A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
41 
42 namespace ns3 {
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl)
45  ;
46 
48 {
49 }
50 
51 Time
53 {
54  return m_smallestTime;
55 }
56 
57 uint32_t
59 {
60  return m_txCount;
61 }
62 
63 uint32_t
65 {
66  return m_rxCount;
67 }
68 uint32_t
70 {
71  return m_myId;
72 }
73 
74 bool
76 {
77  return m_isFinished;
78 }
79 
81 
82 TypeId
84 {
85  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
86  .SetParent<Object> ()
87  .AddConstructor<DistributedSimulatorImpl> ()
88  ;
89  return tid;
90 }
91 
93 {
94  NS_LOG_FUNCTION (this);
95 
96 #ifdef NS3_MPI
99 
100  // Allocate the LBTS message buffer
102  m_grantedTime = Seconds (0);
103 #else
105  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
106 #endif
107 
108  m_stop = false;
109  m_globalFinished = false;
110  // uids are allocated from 4.
111  // uid 0 is "invalid" events
112  // uid 1 is "now" events
113  // uid 2 is "destroy" events
114  m_uid = 4;
115  // before ::Run is entered, the m_currentUid will be zero
116  m_currentUid = 0;
117  m_currentTs = 0;
118  m_currentContext = 0xffffffff;
120  m_events = 0;
121 }
122 
124 {
125  NS_LOG_FUNCTION (this);
126 }
127 
128 void
130 {
131  NS_LOG_FUNCTION (this);
132 
133  while (!m_events->IsEmpty ())
134  {
135  Scheduler::Event next = m_events->RemoveNext ();
136  next.impl->Unref ();
137  }
138  m_events = 0;
139  delete [] m_pLBTS;
141 }
142 
143 void
145 {
146  NS_LOG_FUNCTION (this);
147 
148  while (!m_destroyEvents.empty ())
149  {
150  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
151  m_destroyEvents.pop_front ();
152  NS_LOG_LOGIC ("handle destroy " << ev);
153  if (!ev->IsCancelled ())
154  {
155  ev->Invoke ();
156  }
157  }
158 
160 }
161 
162 
163 void
165 {
166  NS_LOG_FUNCTION (this);
167 
168 #ifdef NS3_MPI
169  if (MpiInterface::GetSize () <= 1)
170  {
172  m_grantedTime = Seconds (0);
173  }
174  else
175  {
177 
179 
181  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
182  {
183  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
184  {
185  continue;
186  }
187 
188  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
189  {
190  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
191  // only works for p2p links currently
192  if (!localNetDevice->IsPointToPoint ())
193  {
194  continue;
195  }
196  Ptr<Channel> channel = localNetDevice->GetChannel ();
197  if (channel == 0)
198  {
199  continue;
200  }
201 
202  // grab the adjacent node
203  Ptr<Node> remoteNode;
204  if (channel->GetDevice (0) == localNetDevice)
205  {
206  remoteNode = (channel->GetDevice (1))->GetNode ();
207  }
208  else
209  {
210  remoteNode = (channel->GetDevice (0))->GetNode ();
211  }
212 
213  // if it's not remote, don't consider it
214  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
215  {
216  continue;
217  }
218 
219  // Compare the delay on the channel with the current value of
220  // m_lookAhead. If the delay on the channel is smaller, make
221  // it the new lookAhead.
222  TimeValue delay;
223  channel->GetAttribute ("Delay", delay);
224 
226  {
228  m_grantedTime = delay.Get ();
229  }
230  }
231  }
232  }
233 
234  /*
235  * Compute the maximum inter-task latency and use that value
236  * for tasks with no inter-task links.
237  *
238  * Special processing for edge cases. Tasks that have no nodes
239  * still need a reasonable lookAhead value. Infinity
240  * would work correctly but introduces a performance issue; tasks
241  * with an infinite lookAhead would execute all their events
242  * before doing an AllGather resulting in very bad load balance
243  * during the first time window. Since all tasks participate in
244  * the AllGather it is desirable to have all the tasks advance in
245  * simulation time at a similar rate assuming roughly equal events
246  * per unit of simulation time in order to equalize the amount of
247  * work per time window.
248  */
249  long sendbuf;
250  long recvbuf;
251 
252  /* Tasks with no inter-task links do not contribute to max */
254  {
255  sendbuf = 0;
256  }
257  else
258  {
259  sendbuf = m_lookAhead.GetInteger ();
260  }
261 
262  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
263 
264  /* For nodes that did not compute a lookahead use max from ranks
265  * that did compute a value. An edge case occurs if all nodes have
266  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
267  * will proceed without synchronization until a single AllGather
268  * occurs when all tasks have finished.
269  */
270  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
271  {
272  m_lookAhead = Time (recvbuf);
274  }
275 
276 #else
277  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
278 #endif
279 }
280 
281 void
283 {
284  NS_LOG_FUNCTION (this << schedulerFactory);
285 
286  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
287 
288  if (m_events != 0)
289  {
290  while (!m_events->IsEmpty ())
291  {
292  Scheduler::Event next = m_events->RemoveNext ();
293  scheduler->Insert (next);
294  }
295  }
296  m_events = scheduler;
297 }
298 
299 void
301 {
302  NS_LOG_FUNCTION (this);
303 
304  Scheduler::Event next = m_events->RemoveNext ();
305 
306  NS_ASSERT (next.key.m_ts >= m_currentTs);
308 
309  NS_LOG_LOGIC ("handle " << next.key.m_ts);
310  m_currentTs = next.key.m_ts;
312  m_currentUid = next.key.m_uid;
313  next.impl->Invoke ();
314  next.impl->Unref ();
315 }
316 
317 bool
319 {
320  return m_globalFinished;
321 }
322 
323 bool
325 {
326  return m_events->IsEmpty () || m_stop;
327 }
328 
329 uint64_t
331 {
332  // If the local MPI task has no more events or stop was called,
333  // the next event time is infinity.
334  if (IsLocalFinished ())
335  {
337  }
338  else
339  {
340  Scheduler::Event ev = m_events->PeekNext ();
341  return ev.key.m_ts;
342  }
343 }
344 
345 Time
347 {
348  return TimeStep (NextTs ());
349 }
350 
351 void
353 {
354  NS_LOG_FUNCTION (this);
355 
356 #ifdef NS3_MPI
358  m_stop = false;
359  while (!m_globalFinished)
360  {
361  Time nextTime = Next ();
362 
363  // If local event is beyond grantedTime then need to synchronize
364  // with other tasks to determine new time window. If local task
365  // is finished then continue to participate in allgather
366  // synchronizations with other tasks until all tasks have
367  // completed.
368  if (nextTime > m_grantedTime || IsLocalFinished () )
369  {
370  // Can't process next event, calculate a new LBTS
371  // First receive any pending messages
373  // reset next time
374  nextTime = Next ();
375  // And check for send completes
377  // Finally calculate the lbts
379  m_myId, IsLocalFinished (), nextTime);
380  m_pLBTS[m_myId] = lMsg;
381  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
382  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
383  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
384  // The totRx and totTx counts ensure there are no transient
385  // messages; if totRx != totTx, there are transients,
386  // so we don't update the granted time.
387  uint32_t totRx = m_pLBTS[0].GetRxCount ();
388  uint32_t totTx = m_pLBTS[0].GetTxCount ();
390 
391  for (uint32_t i = 1; i < m_systemCount; ++i)
392  {
393  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
394  {
395  smallestTime = m_pLBTS[i].GetSmallestTime ();
396  }
397  totRx += m_pLBTS[i].GetRxCount ();
398  totTx += m_pLBTS[i].GetTxCount ();
400  }
401  if (totRx == totTx)
402  {
403  // If lookahead is infinite then granted time should be as well.
404  // Covers the edge case where all the tasks have no inter-task
405  // links, and prevents overflow of granted time.
407  {
409  }
410  else
411  {
412  // Overflow is possible here if near end of representable time.
413  m_grantedTime = smallestTime + m_lookAhead;
414  }
415  }
416  }
417 
418  // Execute next event if it is within the current time window.
419  // Local task may be completed.
420  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
421  { // Safe to process
422  ProcessOneEvent ();
423  }
424  }
425 
426  // If the simulator stopped naturally by lack of events, make a
427  // consistency test to check that we didn't lose any events along the way.
428  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
429 #else
430  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
431 #endif
432 }
433 
435 {
436  return m_myId;
437 }
438 
439 void
441 {
442  NS_LOG_FUNCTION (this);
443 
444  m_stop = true;
445 }
446 
447 void
449 {
450  NS_LOG_FUNCTION (this << time.GetTimeStep ());
451 
453 }
454 
455 //
456 // Schedule an event for a _relative_ time in the future.
457 //
458 EventId
460 {
461  NS_LOG_FUNCTION (this << time.GetTimeStep () << event);
462 
463  Time tAbsolute = time + TimeStep (m_currentTs);
464 
465  NS_ASSERT (tAbsolute.IsPositive ());
466  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
467  Scheduler::Event ev;
468  ev.impl = event;
469  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
470  ev.key.m_context = GetContext ();
471  ev.key.m_uid = m_uid;
472  m_uid++;
474  m_events->Insert (ev);
475  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
476 }
477 
//
// Schedule an event under an explicitly supplied context, at a delay
// _relative_ to the current simulation timestamp.
//
//   context  value stored in the event key's m_context (the caller's
//            value is used as-is rather than GetContext ())
//   time     relative delay; its time step is added to m_currentTs
//   event    event implementation stored in the scheduler entry
//
// Returns nothing: no EventId is built for the inserted event here.
//
478 void
479 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event)
480 {
481  NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
482 
483  Scheduler::Event ev;
484  ev.impl = event;
 // Absolute timestamp = current timestamp + relative delay.
 // NOTE(review): no assertion here guards against a negative delay
 // producing a timestamp earlier than m_currentTs -- confirm callers
 // never pass one.
485  ev.key.m_ts = m_currentTs + time.GetTimeStep ();
486  ev.key.m_context = context;
 // Each scheduled event takes the next uid; the counter is then advanced.
487  ev.key.m_uid = m_uid;
488  m_uid++;
 // NOTE(review): listing line 489 is absent from this extract; a
 // statement may be missing between the uid increment and the insert --
 // verify against the original source file.
490  m_events->Insert (ev);
491 }
492 
493 EventId
495 {
496  NS_LOG_FUNCTION (this << event);
497 
498  Scheduler::Event ev;
499  ev.impl = event;
500  ev.key.m_ts = m_currentTs;
501  ev.key.m_context = GetContext ();
502  ev.key.m_uid = m_uid;
503  m_uid++;
505  m_events->Insert (ev);
506  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
507 }
508 
509 EventId
511 {
512  NS_LOG_FUNCTION (this << event);
513 
514  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
515  m_destroyEvents.push_back (id);
516  m_uid++;
517  return id;
518 }
519 
520 Time
522 {
523  return TimeStep (m_currentTs);
524 }
525 
526 Time
528 {
529  if (IsExpired (id))
530  {
531  return TimeStep (0);
532  }
533  else
534  {
535  return TimeStep (id.GetTs () - m_currentTs);
536  }
537 }
538 
539 void
541 {
542  if (id.GetUid () == 2)
543  {
544  // destroy events.
545  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
546  {
547  if (*i == id)
548  {
549  m_destroyEvents.erase (i);
550  break;
551  }
552  }
553  return;
554  }
555  if (IsExpired (id))
556  {
557  return;
558  }
559  Scheduler::Event event;
560  event.impl = id.PeekEventImpl ();
561  event.key.m_ts = id.GetTs ();
562  event.key.m_context = id.GetContext ();
563  event.key.m_uid = id.GetUid ();
564  m_events->Remove (event);
565  event.impl->Cancel ();
566  // whenever we remove an event from the event list, we have to unref it.
567  event.impl->Unref ();
568 
570 }
571 
572 void
574 {
575  if (!IsExpired (id))
576  {
577  id.PeekEventImpl ()->Cancel ();
578  }
579 }
580 
581 bool
583 {
584  if (ev.GetUid () == 2)
585  {
586  if (ev.PeekEventImpl () == 0
587  || ev.PeekEventImpl ()->IsCancelled ())
588  {
589  return true;
590  }
591  // destroy events.
592  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
593  {
594  if (*i == ev)
595  {
596  return false;
597  }
598  }
599  return true;
600  }
601  if (ev.PeekEventImpl () == 0
602  || ev.GetTs () < m_currentTs
603  || (ev.GetTs () == m_currentTs
604  && ev.GetUid () <= m_currentUid)
605  || ev.PeekEventImpl ()->IsCancelled ())
606  {
607  return true;
608  }
609  else
610  {
611  return false;
612  }
613 }
614 
615 Time
617 {
620  return TimeStep (0x7fffffffffffffffLL);
621 }
622 
623 uint32_t
625 {
626  return m_currentContext;
627 }
628 
629 } // namespace ns3
Time Get(void) const
keep track of time values and allow control of global simulation resolution
Definition: nstime.h:81
smart pointer class similar to boost::intrusive_ptr
Definition: ptr.h:59
#define NS_LOG_FUNCTION(parameters)
Definition: log.h:345
virtual void SetScheduler(ObjectFactory schedulerFactory)
std::vector< Ptr< Node > >::const_iterator Iterator
Time TimeStep(uint64_t ts)
Definition: nstime.h:950
EventImpl * impl
Definition: scheduler.h:72
#define NS_ASSERT(condition)
Definition: assert.h:64
NS_OBJECT_ENSURE_REGISTERED(NullMessageSimulatorImpl)
virtual EventId ScheduleDestroy(EventImpl *event)
virtual Time GetMaximumSimulationTime(void) const
NS_LOG_COMPONENT_DEFINE("DistributedSimulatorImpl")
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
bool IsCancelled(void)
Definition: event-impl.cc:57
virtual void DoDispose(void)
This method is called by Object::Dispose or by the object's destructor, whichever comes first...
Definition: object.cc:336
static EventId Schedule(Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event to expire at the relative time "time" is reached.
Definition: simulator.h:824
EventImpl * PeekEventImpl(void) const
Definition: event-id.cc:65
uint32_t GetSystemId(void) const
Definition: node.cc:111
virtual Time Now(void) const
Return the "current simulation time".
#define NS_FATAL_ERROR(msg)
fatal error handling
Definition: fatal-error.h:72
virtual void DoDispose(void)
This method is called by Object::Dispose or by the object's destructor, whichever comes first...
void Invoke(void)
Called by the simulation engine to notify the event that it has expired.
Definition: event-impl.cc:40
static void TestSendComplete()
Check for completed sends.
double GetSeconds(void) const
Definition: nstime.h:274
virtual Time GetDelayLeft(const EventId &id) const
static void Destroy()
Deletes storage used by the parallel environment.
hold objects of type ns3::Time
Definition: nstime.h:961
Ptr< Object > Create(void) const
virtual EventId ScheduleNow(EventImpl *event)
void Unref(void) const
Decrement the reference count.
uint32_t GetUid(void) const
Definition: event-id.cc:83
Maintain the event list.
Definition: scheduler.h:57
#define NS_LOG_LOGIC(msg)
Definition: log.h:368
virtual void Remove(const EventId &ev)
Remove an event from the event list.
keep track of a set of node pointers.
virtual void Destroy()
This method is typically invoked at the end of a simulation to avoid false-positive reports by a leak...
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
virtual void ScheduleWithContext(uint32_t context, Time const &time, EventImpl *event)
int64_t GetTimeStep(void) const
Definition: nstime.h:356
int64_t GetInteger(void) const
Definition: nstime.h:364
virtual uint32_t GetSystemId(void) const
virtual uint32_t GetContext(void) const
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Cancel(const EventId &ev)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
instantiate subclasses of ns3::Object.
a simulation event
Definition: event-impl.h:39
virtual EventId Schedule(Time const &time, EventImpl *event)
static void ReceiveMessages()
Check for received messages complete.
an identifier for simulation events.
Definition: event-id.h:46
static uint32_t GetSystemId()
static void Stop(void)
If an event invokes this method, it will be the last event scheduled by the Simulator::run method bef...
Definition: simulator.cc:165
#define NS_UNUSED(x)
Definition: unused.h:5
a base class which provides memory management and object aggregation
Definition: object.h:63
virtual void Run(void)
Run the simulation until one of:
virtual void Stop(void)
If an event invokes this method, it will be the last event scheduled by the Simulator::Run method bef...
virtual bool IsFinished(void) const
If there are no more events left to be scheduled, or if simulation time has already reached the "sto...
a unique identifier for an interface.
Definition: type-id.h:49
TypeId SetParent(TypeId tid)
Definition: type-id.cc:611
static uint32_t GetSize()
uint64_t GetTs(void) const
Definition: event-id.cc:71
virtual bool IsExpired(const EventId &ev) const
This method has O(1) complexity.