A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
41 
42 namespace ns3 {
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
45 
47 {
48 }
49 
50 Time
52 {
53  return m_smallestTime;
54 }
55 
56 uint32_t
58 {
59  return m_txCount;
60 }
61 
62 uint32_t
64 {
65  return m_rxCount;
66 }
67 uint32_t
69 {
70  return m_myId;
71 }
72 
73 bool
75 {
76  return m_isFinished;
77 }
78 
80 
81 TypeId
83 {
84  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
85  .SetParent<Object> ()
86  .AddConstructor<DistributedSimulatorImpl> ()
87  ;
88  return tid;
89 }
90 
92 {
93  NS_LOG_FUNCTION (this);
94 
95 #ifdef NS3_MPI
98 
99  // Allocate the LBTS message buffer
101  m_grantedTime = Seconds (0);
102 #else
104  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
105 #endif
106 
107  m_stop = false;
108  m_globalFinished = false;
109  // uids are allocated from 4.
110  // uid 0 is "invalid" events
111  // uid 1 is "now" events
112  // uid 2 is "destroy" events
113  m_uid = 4;
114  // before ::Run is entered, the m_currentUid will be zero
115  m_currentUid = 0;
116  m_currentTs = 0;
117  m_currentContext = 0xffffffff;
119  m_events = 0;
120 }
121 
123 {
124  NS_LOG_FUNCTION (this);
125 }
126 
127 void
129 {
130  NS_LOG_FUNCTION (this);
131 
132  while (!m_events->IsEmpty ())
133  {
134  Scheduler::Event next = m_events->RemoveNext ();
135  next.impl->Unref ();
136  }
137  m_events = 0;
138  delete [] m_pLBTS;
140 }
141 
142 void
144 {
145  NS_LOG_FUNCTION (this);
146 
147  while (!m_destroyEvents.empty ())
148  {
149  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
150  m_destroyEvents.pop_front ();
151  NS_LOG_LOGIC ("handle destroy " << ev);
152  if (!ev->IsCancelled ())
153  {
154  ev->Invoke ();
155  }
156  }
157 
159 }
160 
161 
162 void
164 {
165  NS_LOG_FUNCTION (this);
166 
167 #ifdef NS3_MPI
168  if (MpiInterface::GetSize () <= 1)
169  {
170  m_lookAhead = Seconds (0);
171  }
172  else
173  {
174  if (m_lookAhead == Seconds (-1))
175  {
177  }
178  // else it was already set by SetLookAhead
179 
181  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
182  {
183  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
184  {
185  continue;
186  }
187 
188  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
189  {
190  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
191  // only works for p2p links currently
192  if (!localNetDevice->IsPointToPoint ())
193  {
194  continue;
195  }
196  Ptr<Channel> channel = localNetDevice->GetChannel ();
197  if (channel == 0)
198  {
199  continue;
200  }
201 
202  // grab the adjacent node
203  Ptr<Node> remoteNode;
204  if (channel->GetDevice (0) == localNetDevice)
205  {
206  remoteNode = (channel->GetDevice (1))->GetNode ();
207  }
208  else
209  {
210  remoteNode = (channel->GetDevice (0))->GetNode ();
211  }
212 
213  // if it's not remote, don't consider it
214  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
215  {
216  continue;
217  }
218 
219  // compare delay on the channel with current value of
220  // m_lookAhead. if delay on channel is smaller, make
221  // it the new lookAhead.
222  TimeValue delay;
223  channel->GetAttribute ("Delay", delay);
224 
225  if (delay.Get () < m_lookAhead)
226  {
227  m_lookAhead = delay.Get ();
228  }
229  }
230  }
231  }
232 
233  // m_lookAhead is now set
235 
236  /*
237  * Compute the maximum inter-task latency and use that value
238  * for tasks with no inter-task links.
239  *
240  * Special processing for edge cases. For tasks that have no
241  * nodes need to determine a reasonable lookAhead value. Infinity
242  * would work correctly but introduces a performance issue; tasks
243  * with an infinite lookAhead would execute all their events
244  * before doing an AllGather resulting in very bad load balance
245  * during the first time window. Since all tasks participate in
246  * the AllGather it is desirable to have all the tasks advance in
247  * simulation time at a similar rate assuming roughly equal events
248  * per unit of simulation time in order to equalize the amount of
249  * work per time window.
250  */
251  long sendbuf;
252  long recvbuf;
253 
254  /* Tasks with no inter-task links do not contribute to max */
256  {
257  sendbuf = 0;
258  }
259  else
260  {
261  sendbuf = m_lookAhead.GetInteger ();
262  }
263 
264  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
265 
266  /* For nodes that did not compute a lookahead use max from ranks
267  * that did compute a value. An edge case occurs if all nodes have
268  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
269  * will proceed without synchronization until a single AllGather
270  * occurs when all tasks have finished.
271  */
272  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
273  {
274  m_lookAhead = Time (recvbuf);
276  }
277 
278 #else
279  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
280 #endif
281 }
282 
283 void
285 {
286  if (lookAhead > 0)
287  {
288  NS_LOG_FUNCTION (this << lookAhead);
289  m_lookAhead = lookAhead;
290  }
291  else
292  {
293  NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
294  }
295 }
296 
297 void
299 {
300  NS_LOG_FUNCTION (this << schedulerFactory);
301 
302  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
303 
304  if (m_events != 0)
305  {
306  while (!m_events->IsEmpty ())
307  {
308  Scheduler::Event next = m_events->RemoveNext ();
309  scheduler->Insert (next);
310  }
311  }
312  m_events = scheduler;
313 }
314 
315 void
317 {
318  NS_LOG_FUNCTION (this);
319 
320  Scheduler::Event next = m_events->RemoveNext ();
321 
322  NS_ASSERT (next.key.m_ts >= m_currentTs);
324 
325  NS_LOG_LOGIC ("handle " << next.key.m_ts);
326  m_currentTs = next.key.m_ts;
328  m_currentUid = next.key.m_uid;
329  next.impl->Invoke ();
330  next.impl->Unref ();
331 }
332 
333 bool
335 {
336  return m_globalFinished;
337 }
338 
339 bool
341 {
342  return m_events->IsEmpty () || m_stop;
343 }
344 
345 uint64_t
347 {
348  // If local MPI task is has no more events or stop was called
349  // next event time is infinity.
350  if (IsLocalFinished ())
351  {
353  }
354  else
355  {
356  Scheduler::Event ev = m_events->PeekNext ();
357  return ev.key.m_ts;
358  }
359 }
360 
361 Time
363 {
364  return TimeStep (NextTs ());
365 }
366 
367 void
369 {
370  NS_LOG_FUNCTION (this);
371 
372 #ifdef NS3_MPI
374  m_stop = false;
375  while (!m_globalFinished)
376  {
377  Time nextTime = Next ();
378 
379  // If local event is beyond grantedTime then need to synchronize
380  // with other tasks to determine new time window. If local task
381  // is finished then continue to participate in allgather
382  // synchronizations with other tasks until all tasks have
383  // completed.
384  if (nextTime > m_grantedTime || IsLocalFinished () )
385  {
386  // Can't process next event, calculate a new LBTS
387  // First receive any pending messages
389  // reset next time
390  nextTime = Next ();
391  // And check for send completes
393  // Finally calculate the lbts
395  m_myId, IsLocalFinished (), nextTime);
396  m_pLBTS[m_myId] = lMsg;
397  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
398  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
399  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
400  // The totRx and totTx counts insure there are no transient
401  // messages; If totRx != totTx, there are transients,
402  // so we don't update the granted time.
403  uint32_t totRx = m_pLBTS[0].GetRxCount ();
404  uint32_t totTx = m_pLBTS[0].GetTxCount ();
406 
407  for (uint32_t i = 1; i < m_systemCount; ++i)
408  {
409  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
410  {
411  smallestTime = m_pLBTS[i].GetSmallestTime ();
412  }
413  totRx += m_pLBTS[i].GetRxCount ();
414  totTx += m_pLBTS[i].GetTxCount ();
416  }
417  if (totRx == totTx)
418  {
419  // If lookahead is infinite then granted time should be as well.
420  // Covers the edge case if all the tasks have no inter tasks
421  // links, prevents overflow of granted time.
423  {
425  }
426  else
427  {
428  // Overflow is possible here if near end of representable time.
429  m_grantedTime = smallestTime + m_lookAhead;
430  }
431  }
432  }
433 
434  // Execute next event if it is within the current time window.
435  // Local task may be completed.
436  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
437  { // Safe to process
438  ProcessOneEvent ();
439  }
440  }
441 
442  // If the simulator stopped naturally by lack of events, make a
443  // consistency test to check that we didn't lose any events along the way.
444  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
445 #else
446  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
447 #endif
448 }
449 
451 {
452  return m_myId;
453 }
454 
455 void
457 {
458  NS_LOG_FUNCTION (this);
459 
460  m_stop = true;
461 }
462 
463 void
465 {
466  NS_LOG_FUNCTION (this << time.GetTimeStep ());
467 
469 }
470 
471 //
472 // Schedule an event for a _relative_ time in the future.
473 //
474 EventId
476 {
477  NS_LOG_FUNCTION (this << time.GetTimeStep () << event);
478 
479  Time tAbsolute = time + TimeStep (m_currentTs);
480 
481  NS_ASSERT (tAbsolute.IsPositive ());
482  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
483  Scheduler::Event ev;
484  ev.impl = event;
485  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
486  ev.key.m_context = GetContext ();
487  ev.key.m_uid = m_uid;
488  m_uid++;
490  m_events->Insert (ev);
491  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
492 }
493 
494 void
495 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event)
496 {
497  NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
498 
499  Scheduler::Event ev;
500  ev.impl = event;
501  ev.key.m_ts = m_currentTs + time.GetTimeStep ();
502  ev.key.m_context = context;
503  ev.key.m_uid = m_uid;
504  m_uid++;
506  m_events->Insert (ev);
507 }
508 
509 EventId
511 {
512  NS_LOG_FUNCTION (this << event);
513 
514  Scheduler::Event ev;
515  ev.impl = event;
516  ev.key.m_ts = m_currentTs;
517  ev.key.m_context = GetContext ();
518  ev.key.m_uid = m_uid;
519  m_uid++;
521  m_events->Insert (ev);
522  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
523 }
524 
525 EventId
527 {
528  NS_LOG_FUNCTION (this << event);
529 
530  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
531  m_destroyEvents.push_back (id);
532  m_uid++;
533  return id;
534 }
535 
536 Time
538 {
539  return TimeStep (m_currentTs);
540 }
541 
542 Time
544 {
545  if (IsExpired (id))
546  {
547  return TimeStep (0);
548  }
549  else
550  {
551  return TimeStep (id.GetTs () - m_currentTs);
552  }
553 }
554 
555 void
557 {
558  if (id.GetUid () == 2)
559  {
560  // destroy events.
561  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
562  {
563  if (*i == id)
564  {
565  m_destroyEvents.erase (i);
566  break;
567  }
568  }
569  return;
570  }
571  if (IsExpired (id))
572  {
573  return;
574  }
575  Scheduler::Event event;
576  event.impl = id.PeekEventImpl ();
577  event.key.m_ts = id.GetTs ();
578  event.key.m_context = id.GetContext ();
579  event.key.m_uid = id.GetUid ();
580  m_events->Remove (event);
581  event.impl->Cancel ();
582  // whenever we remove an event from the event list, we have to unref it.
583  event.impl->Unref ();
584 
586 }
587 
588 void
590 {
591  if (!IsExpired (id))
592  {
593  id.PeekEventImpl ()->Cancel ();
594  }
595 }
596 
597 bool
599 {
600  if (ev.GetUid () == 2)
601  {
602  if (ev.PeekEventImpl () == 0
603  || ev.PeekEventImpl ()->IsCancelled ())
604  {
605  return true;
606  }
607  // destroy events.
608  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
609  {
610  if (*i == ev)
611  {
612  return false;
613  }
614  }
615  return true;
616  }
617  if (ev.PeekEventImpl () == 0
618  || ev.GetTs () < m_currentTs
619  || (ev.GetTs () == m_currentTs
620  && ev.GetUid () <= m_currentUid)
621  || ev.PeekEventImpl ()->IsCancelled ())
622  {
623  return true;
624  }
625  else
626  {
627  return false;
628  }
629 }
630 
631 Time
633 {
636  return TimeStep (0x7fffffffffffffffLL);
637 }
638 
639 uint32_t
641 {
642  return m_currentContext;
643 }
644 
645 } // namespace ns3
Time Get(void) const
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:95
smart pointer class similar to boost::intrusive_ptr
Definition: ptr.h:60
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register the class in the ns-3 factory.
Definition: object-base.h:38
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
EventImpl * impl
Definition: scheduler.h:73
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:61
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:170
virtual EventId ScheduleDestroy(EventImpl *event)
virtual Time GetMaximumSimulationTime(void) const
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
bool IsCancelled(void)
Definition: event-impl.cc:57
virtual void DoDispose(void)
This method is called by Object::Dispose or by the object's destructor, whichever comes first...
Definition: object.cc:335
#define NS_FATAL_ERROR(msg)
fatal error handling
Definition: fatal-error.h:95
virtual void SetMaximumLookAhead(const Time lookAhead)
static EventId Schedule(Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event to expire at the relative time "time" is reached.
Definition: simulator.h:825
EventImpl * PeekEventImpl(void) const
Definition: event-id.cc:65
uint32_t GetSystemId(void) const
Definition: node.cc:113
virtual Time Now(void) const
Return the "current simulation time".
virtual void DoDispose(void)
This method is called by Object::Dispose or by the object's destructor, whichever comes first...
void Invoke(void)
Called by the simulation engine to notify the event that it has expired.
Definition: event-impl.cc:40
static void TestSendComplete()
Check for completed sends.
virtual Time GetDelayLeft(const EventId &id) const
static void Destroy()
Deletes storage used by the parallel environment.
Attribute for objects of type ns3::Time.
Definition: nstime.h:912
Ptr< Object > Create(void) const
virtual EventId ScheduleNow(EventImpl *event)
void Unref(void) const
Decrement the reference count.
uint32_t GetUid(void) const
Definition: event-id.cc:83
Maintain the event list.
Definition: scheduler.h:58
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:233
virtual void Remove(const EventId &ev)
Remove an event from the event list.
keep track of a set of node pointers.
virtual void Destroy()
This method is typically invoked at the end of a simulation to avoid false-positive reports by a leak...
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
virtual void ScheduleWithContext(uint32_t context, Time const &time, EventImpl *event)
int64_t GetTimeStep(void) const
Definition: nstime.h:352
Time TimeStep(uint64_t ts)
Definition: nstime.h:902
int64_t GetInteger(void) const
Definition: nstime.h:360
virtual uint32_t GetSystemId(void) const
virtual uint32_t GetContext(void) const
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Cancel(const EventId &ev)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
instantiate subclasses of ns3::Object.
a simulation event
Definition: event-impl.h:39
virtual EventId Schedule(Time const &time, EventImpl *event)
static void ReceiveMessages()
Check for received messages complete.
an identifier for simulation events.
Definition: event-id.h:46
static uint32_t GetSystemId()
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:203
static void Stop(void)
If an event invokes this method, it will be the last event scheduled by the Simulator::run method bef...
Definition: simulator.cc:165
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:845
#define NS_UNUSED(x)
Definition: unused.h:5
a base class which provides memory management and object aggregation
Definition: object.h:64
virtual void Run(void)
Run the simulation until one of:
virtual void Stop(void)
If an event invokes this method, it will be the last event scheduled by the Simulator::Run method bef...
virtual bool IsFinished(void) const
If there are no more events lefts to be scheduled, or if simulation time has already reached the "sto...
a unique identifier for an interface.
Definition: type-id.h:49
TypeId SetParent(TypeId tid)
Definition: type-id.cc:610
static uint32_t GetSize()
uint64_t GetTs(void) const
Definition: event-id.cc:71
virtual bool IsExpired(const EventId &ev) const
This method has O(1) complexity.