A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  */
18 
20 #include "mpi-interface.h"
21 
22 #include "ns3/simulator.h"
23 #include "ns3/scheduler.h"
24 #include "ns3/event-impl.h"
25 #include "ns3/channel.h"
26 #include "ns3/node-container.h"
27 #include "ns3/ptr.h"
28 #include "ns3/pointer.h"
29 #include "ns3/assert.h"
30 #include "ns3/log.h"
31 
32 #include <cmath>
33 
34 #ifdef NS3_MPI
35 #include <mpi.h>
36 #endif
37 
38 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
39 
40 namespace ns3 {
41 
42 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
43 
45 {
46 }
47 
48 Time
50 {
51  return m_smallestTime;
52 }
53 
54 uint32_t
56 {
57  return m_txCount;
58 }
59 
60 uint32_t
62 {
63  return m_rxCount;
64 }
65 uint32_t
67 {
68  return m_myId;
69 }
70 
71 bool
73 {
74  return m_isFinished;
75 }
76 
78 
79 TypeId
81 {
82  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
83  .SetParent<Object> ()
84  .AddConstructor<DistributedSimulatorImpl> ()
85  ;
86  return tid;
87 }
88 
90 {
91 #ifdef NS3_MPI
94 
95  // Allocate the LBTS message buffer
97  m_grantedTime = Seconds (0);
98 #else
100  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
101 #endif
102 
103  m_stop = false;
104  // uids are allocated from 4.
105  // uid 0 is "invalid" events
106  // uid 1 is "now" events
107  // uid 2 is "destroy" events
108  m_uid = 4;
109  // before ::Run is entered, the m_currentUid will be zero
110  m_currentUid = 0;
111  m_currentTs = 0;
112  m_currentContext = 0xffffffff;
114  m_events = 0;
115 }
116 
118 {
119 }
120 
121 void
123 {
124  while (!m_events->IsEmpty ())
125  {
126  Scheduler::Event next = m_events->RemoveNext ();
127  next.impl->Unref ();
128  }
129  m_events = 0;
130  delete [] m_pLBTS;
132 }
133 
134 void
136 {
137  while (!m_destroyEvents.empty ())
138  {
139  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
140  m_destroyEvents.pop_front ();
141  NS_LOG_LOGIC ("handle destroy " << ev);
142  if (!ev->IsCancelled ())
143  {
144  ev->Invoke ();
145  }
146  }
147 
149 }
150 
151 
152 void
154 {
155 #ifdef NS3_MPI
156  if (MpiInterface::GetSize () <= 1)
157  {
159  m_grantedTime = Seconds (0);
160  }
161  else
162  {
165 
167  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
168  {
169  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
170  {
171  continue;
172  }
173 
174  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
175  {
176  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
177  // only works for p2p links currently
178  if (!localNetDevice->IsPointToPoint ())
179  {
180  continue;
181  }
182  Ptr<Channel> channel = localNetDevice->GetChannel ();
183  if (channel == 0)
184  {
185  continue;
186  }
187 
188  // grab the adjacent node
189  Ptr<Node> remoteNode;
190  if (channel->GetDevice (0) == localNetDevice)
191  {
192  remoteNode = (channel->GetDevice (1))->GetNode ();
193  }
194  else
195  {
196  remoteNode = (channel->GetDevice (0))->GetNode ();
197  }
198 
199  // if it's not remote, don't consider it
200  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
201  {
202  continue;
203  }
204 
205  // compare delay on the channel with current value of
206  // m_lookAhead. if delay on channel is smaller, make
207  // it the new lookAhead.
208  TimeValue delay;
209  channel->GetAttribute ("Delay", delay);
210 
212  {
214  m_grantedTime = delay.Get ();
215  }
216  }
217  }
218  }
219 
220  /*
221  * Compute the maximum inter-task latency and use that value
222  * for tasks with no inter-task links.
223  *
224  * Special processing for edge cases. For tasks that have no
225  * nodes need to determine a reasonable lookAhead value. Infinity
226  * would work correctly but introduces a performance issue; tasks
227  * with an infinite lookAhead would execute all their events
228  * before doing an AllGather resulting in very bad load balance
229  * during the first time window. Since all tasks participate in
230  * the AllGather it is desirable to have all the tasks advance in
231  * simulation time at a similar rate assuming roughly equal events
232  * per unit of simulation time in order to equalize the amount of
233  * work per time window.
234  */
235  long sendbuf;
236  long recvbuf;
237 
238  /* Tasks with no inter-task links do not contribute to max */
240  {
241  sendbuf = 0;
242  }
243  else
244  {
245  sendbuf = m_lookAhead.GetInteger ();
246  }
247 
248  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
249 
250  /* For nodes that did not compute a lookahead use max from ranks
251  * that did compute a value. An edge case occurs if all nodes have
252  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
253  * will proceed without synchronization until a single AllGather
254  * occurs when all tasks have finished.
255  */
256  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
257  {
258  m_lookAhead = Time (recvbuf);
260  }
261 
262 #else
263  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
264 #endif
265 }
266 
267 void
269 {
270  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
271 
272  if (m_events != 0)
273  {
274  while (!m_events->IsEmpty ())
275  {
276  Scheduler::Event next = m_events->RemoveNext ();
277  scheduler->Insert (next);
278  }
279  }
280  m_events = scheduler;
281 }
282 
283 void
285 {
286  Scheduler::Event next = m_events->RemoveNext ();
287 
288  NS_ASSERT (next.key.m_ts >= m_currentTs);
290 
291  NS_LOG_LOGIC ("handle " << next.key.m_ts);
292  m_currentTs = next.key.m_ts;
294  m_currentUid = next.key.m_uid;
295  next.impl->Invoke ();
296  next.impl->Unref ();
297 }
298 
299 bool
301 {
302  return m_globalFinished;
303 }
304 
305 bool
307 {
308  return m_events->IsEmpty () || m_stop;
309 }
310 
311 uint64_t
313 {
314  // If the local MPI task has no more events or stop was called,
315  // the next event time is infinity.
316  if (IsLocalFinished ()) {
318  } else {
319  Scheduler::Event ev = m_events->PeekNext ();
320  return ev.key.m_ts;
321  }
322 }
323 
324 Time
326 {
327  return TimeStep (NextTs ());
328 }
329 
330 void
332 {
333 #ifdef NS3_MPI
335  m_stop = false;
336  while (!m_globalFinished)
337  {
338  Time nextTime = Next ();
339 
340  // If local event is beyond grantedTime then need to synchronize
341  // with other tasks to determine new time window. If local task
342  // is finished then continue to participate in allgather
343  // synchronizations with other tasks until all tasks have
344  // completed.
345  if (nextTime > m_grantedTime || IsLocalFinished () )
346  {
347 
348  // Can't process next event, calculate a new LBTS
349  // First receive any pending messages
351  // reset next time
352  nextTime = Next ();
353  // And check for send completes
355  // Finally calculate the lbts
357  m_pLBTS[m_myId] = lMsg;
358  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
359  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
360  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
361  // The totRx and totTx counts ensure there are no transient
362  // messages; if totRx != totTx, there are transients,
363  // so we don't update the granted time.
364  uint32_t totRx = m_pLBTS[0].GetRxCount ();
365  uint32_t totTx = m_pLBTS[0].GetTxCount ();
367 
368  for (uint32_t i = 1; i < m_systemCount; ++i)
369  {
370  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
371  {
372  smallestTime = m_pLBTS[i].GetSmallestTime ();
373  }
374  totRx += m_pLBTS[i].GetRxCount ();
375  totTx += m_pLBTS[i].GetTxCount ();
377  }
378  if (totRx == totTx)
379  {
380  // If lookahead is infinite then granted time should be as well.
381  // Covers the edge case if all the tasks have no inter tasks
382  // links, prevents overflow of granted time.
384  {
386  }
387  else
388  {
389  // Overflow is possible here if near end of representable time.
390  m_grantedTime = smallestTime + m_lookAhead;
391  }
392  }
393  }
394 
395  // Execute next event if it is within the current time window.
396  // Local task may be completed.
397  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
398  { // Safe to process
399  ProcessOneEvent ();
400  }
401  }
402 
403  // If the simulator stopped naturally by lack of events, make a
404  // consistency test to check that we didn't lose any events along the way.
405  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
406 #else
407  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
408 #endif
409 }
410 
412 {
413  return m_myId;
414 }
415 
416 void
418 {
419  m_stop = true;
420 }
421 
422 void
424 {
426 }
427 
428 //
429 // Schedule an event for a _relative_ time in the future.
430 //
431 EventId
433 {
434  Time tAbsolute = time + TimeStep (m_currentTs);
435 
436  NS_ASSERT (tAbsolute.IsPositive ());
437  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
438  Scheduler::Event ev;
439  ev.impl = event;
440  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
441  ev.key.m_context = GetContext ();
442  ev.key.m_uid = m_uid;
443  m_uid++;
445  m_events->Insert (ev);
446  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
447 }
448 
449 void
450 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event)
451 {
452  NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
453 
454  Scheduler::Event ev;
455  ev.impl = event;
456  ev.key.m_ts = m_currentTs + time.GetTimeStep ();
457  ev.key.m_context = context;
458  ev.key.m_uid = m_uid;
459  m_uid++;
461  m_events->Insert (ev);
462 }
463 
464 EventId
466 {
467  Scheduler::Event ev;
468  ev.impl = event;
469  ev.key.m_ts = m_currentTs;
470  ev.key.m_context = GetContext ();
471  ev.key.m_uid = m_uid;
472  m_uid++;
474  m_events->Insert (ev);
475  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
476 }
477 
478 EventId
480 {
481  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
482  m_destroyEvents.push_back (id);
483  m_uid++;
484  return id;
485 }
486 
487 Time
489 {
490  return TimeStep (m_currentTs);
491 }
492 
493 Time
495 {
496  if (IsExpired (id))
497  {
498  return TimeStep (0);
499  }
500  else
501  {
502  return TimeStep (id.GetTs () - m_currentTs);
503  }
504 }
505 
506 void
508 {
509  if (id.GetUid () == 2)
510  {
511  // destroy events.
512  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
513  {
514  if (*i == id)
515  {
516  m_destroyEvents.erase (i);
517  break;
518  }
519  }
520  return;
521  }
522  if (IsExpired (id))
523  {
524  return;
525  }
526  Scheduler::Event event;
527  event.impl = id.PeekEventImpl ();
528  event.key.m_ts = id.GetTs ();
529  event.key.m_context = id.GetContext ();
530  event.key.m_uid = id.GetUid ();
531  m_events->Remove (event);
532  event.impl->Cancel ();
533  // whenever we remove an event from the event list, we have to unref it.
534  event.impl->Unref ();
535 
537 }
538 
539 void
541 {
542  if (!IsExpired (id))
543  {
544  id.PeekEventImpl ()->Cancel ();
545  }
546 }
547 
548 bool
550 {
551  if (ev.GetUid () == 2)
552  {
553  if (ev.PeekEventImpl () == 0
554  || ev.PeekEventImpl ()->IsCancelled ())
555  {
556  return true;
557  }
558  // destroy events.
559  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
560  {
561  if (*i == ev)
562  {
563  return false;
564  }
565  }
566  return true;
567  }
568  if (ev.PeekEventImpl () == 0
569  || ev.GetTs () < m_currentTs
570  || (ev.GetTs () == m_currentTs
571  && ev.GetUid () <= m_currentUid)
572  || ev.PeekEventImpl ()->IsCancelled ())
573  {
574  return true;
575  }
576  else
577  {
578  return false;
579  }
580 }
581 
582 Time
584 {
587  return TimeStep (0x7fffffffffffffffLL);
588 }
589 
590 uint32_t
592 {
593  return m_currentContext;
594 }
595 
596 } // namespace ns3
Time Get(void) const
keep track of time values and allow control of global simulation resolution
Definition: nstime.h:81
smart pointer class similar to boost::intrusive_ptr
Definition: ptr.h:59
bool IsPositive(void) const
Definition: nstime.h:236
#define NS_LOG_FUNCTION(parameters)
Definition: log.h:311
virtual void SetScheduler(ObjectFactory schedulerFactory)
std::vector< Ptr< Node > >::const_iterator Iterator
Time TimeStep(uint64_t ts)
Definition: nstime.h:817
EventImpl * impl
Definition: scheduler.h:72
#define NS_ASSERT(condition)
Definition: assert.h:64
virtual EventId ScheduleDestroy(EventImpl *event)
virtual Time GetMaximumSimulationTime(void) const
NS_LOG_COMPONENT_DEFINE("DistributedSimulatorImpl")
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
bool IsCancelled(void)
Definition: event-impl.cc:57
virtual void DoDispose(void)
Definition: object.cc:335
static EventId Schedule(Time const &time, MEM mem_ptr, OBJ obj)
Definition: simulator.h:824
EventImpl * PeekEventImpl(void) const
Definition: event-id.cc:65
uint32_t GetSystemId(void) const
Definition: node.cc:110
#define NS_FATAL_ERROR(msg)
fatal error handling
Definition: fatal-error.h:72
void Invoke(void)
Definition: event-impl.cc:40
double GetSeconds(void) const
Definition: nstime.h:266
static uint32_t GetTxCount()
virtual Time GetDelayLeft(const EventId &id) const
static void Destroy()
hold objects of type ns3::Time
Definition: nstime.h:828
Ptr< Object > Create(void) const
virtual EventId ScheduleNow(EventImpl *event)
void Unref(void) const
NS_OBJECT_ENSURE_REGISTERED(AntennaModel)
static uint32_t GetRxCount()
uint32_t GetUid(void) const
Definition: event-id.cc:83
Maintain the event list.
Definition: scheduler.h:57
static void ReceiveMessages()
static void TestSendComplete()
#define NS_LOG_LOGIC(msg)
Definition: log.h:334
virtual void Remove(const EventId &ev)
keep track of a set of node pointers.
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
virtual void ScheduleWithContext(uint32_t context, Time const &time, EventImpl *event)
int64_t GetTimeStep(void) const
Definition: nstime.h:314
int64_t GetInteger(void) const
Definition: nstime.h:322
virtual uint32_t GetSystemId(void) const
virtual uint32_t GetContext(void) const
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Cancel(const EventId &ev)
instantiate subclasses of ns3::Object.
a simulation event
Definition: event-impl.h:39
virtual EventId Schedule(Time const &time, EventImpl *event)
an identifier for simulation events.
Definition: event-id.h:46
static uint32_t GetSystemId()
static void Stop(void)
Definition: simulator.cc:165
#define NS_UNUSED(x)
Definition: unused.h:5
a base class which provides memory management and object aggregation
Definition: object.h:63
a unique identifier for an interface.
Definition: type-id.h:49
TypeId SetParent(TypeId tid)
Definition: type-id.cc:610
static uint32_t GetSize()
uint64_t GetTs(void) const
Definition: event-id.cc:71
virtual bool IsExpired(const EventId &ev) const