A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
distributed-simulator-impl.cc
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License version 2 as
4 * published by the Free Software Foundation;
5 *
6 * This program is distributed in the hope that it will be useful,
7 * but WITHOUT ANY WARRANTY; without even the implied warranty of
8 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 * GNU General Public License for more details.
10 *
11 * You should have received a copy of the GNU General Public License
12 * along with this program; if not, write to the Free Software
13 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 *
15 * Author: George Riley <riley@ece.gatech.edu>
16 *
17 */
18
26
28#include "mpi-interface.h"
29
30#include "ns3/assert.h"
31#include "ns3/channel.h"
32#include "ns3/event-impl.h"
33#include "ns3/log.h"
34#include "ns3/node-container.h"
35#include "ns3/pointer.h"
36#include "ns3/ptr.h"
37#include "ns3/scheduler.h"
38#include "ns3/simulator.h"
39
40#include <cmath>
41#include <mpi.h>
42
43namespace ns3
44{
45
46NS_LOG_COMPONENT_DEFINE("DistributedSimulatorImpl");
47
48NS_OBJECT_ENSURE_REGISTERED(DistributedSimulatorImpl);
49
// Empty function body.
// NOTE(review): the signature line that precedes this body was dropped from
// the listing; presumably an LbtsMessage special member -- TODO confirm.
51{
52}
53
// Accessor: returns the stored m_smallestTime by value.
// NOTE(review): the line carrying the qualified function name is missing from
// this listing; presumably LbtsMessage::GetSmallestTime() -- confirm.
54Time
56{
57 return m_smallestTime;
58}
59
// Accessor: returns the stored m_txCount.
// NOTE(review): the return-type and name lines are missing from this listing;
// presumably uint32_t LbtsMessage::GetTxCount() -- confirm.
62{
63 return m_txCount;
64}
65
// Accessor: returns the stored m_rxCount.
// NOTE(review): the return-type and name lines are missing from this listing;
// presumably uint32_t LbtsMessage::GetRxCount() -- confirm.
68{
69 return m_rxCount;
70}
71
// Accessor: returns the stored m_myId.
// NOTE(review): the return-type and name lines are missing from this listing;
// presumably an LbtsMessage id getter (e.g. GetMyId()) -- confirm.
74{
75 return m_myId;
76}
77
// Accessor: returns the stored m_isFinished flag.
// NOTE(review): the line carrying the qualified function name is missing from
// this listing; presumably LbtsMessage::IsFinished() -- confirm.
78bool
80{
81 return m_isFinished;
82}
83
90
// Returns the TypeId for "ns3::DistributedSimulatorImpl".
// The TypeId is held in a function-local static, so it is built on the first
// call and the same object is returned afterwards. It is placed in group
// "Mpi" and registers DistributedSimulatorImpl's constructor.
// NOTE(review): the signature line and one chained call (listing line 95)
// are missing from this listing.
93{
94 static TypeId tid = TypeId("ns3::DistributedSimulatorImpl")
96 .SetGroupName("Mpi")
97 .AddConstructor<DistributedSimulatorImpl>();
98 return tid;
99}
100
// Initialises simulator state: clears the stop and global-finished flags,
// zeroes the current timestamp and the executed-event counter, and leaves the
// event list unset (m_events == nullptr) until a scheduler is installed.
// NOTE(review): the signature line and several statements (including the
// LBTS buffer allocation announced by the comment below) are missing from
// this listing; presumably this is the DistributedSimulatorImpl constructor.
102{
103 NS_LOG_FUNCTION(this);
104
107
108 // Allocate the LBTS message buffer
111
112 m_stop = false;
113 m_globalFinished = false;
116 m_currentTs = 0;
119 m_eventCount = 0;
120 m_events = nullptr;
121}
122
// Only logs the call; no cleanup is performed in this body.
// NOTE(review): the signature line is missing from this listing; presumably
// the DistributedSimulatorImpl destructor -- confirm.
124{
125 NS_LOG_FUNCTION(this);
126}
127
// Releases simulator resources: drains the event list, dropping one reference
// on every pending event, then clears the event list pointer and frees the
// m_pLBTS array.
// NOTE(review): m_events is dereferenced without a null check, while the
// constructor body sets it to nullptr and SetScheduler() guards with
// `if (m_events)`; confirm a scheduler is always installed before this runs.
// NOTE(review): the signature line and listing line 140 are missing here;
// presumably DistributedSimulatorImpl::DoDispose().
128void
130{
131 NS_LOG_FUNCTION(this);
132
133 while (!m_events->IsEmpty())
134 {
135 Scheduler::Event next = m_events->RemoveNext();
// Each event removed from the list gives up the reference the list held.
136 next.impl->Unref();
137 }
138 m_events = nullptr;
139 delete[] m_pLBTS;
141}
142
// Runs the queued destroy events in FIFO order: each entry is popped from the
// front of m_destroyEvents and invoked unless it has been cancelled.
// NOTE(review): the signature line and listing line 159 are missing here;
// presumably DistributedSimulatorImpl::Destroy().
143void
145{
146 NS_LOG_FUNCTION(this);
147
148 while (!m_destroyEvents.empty())
149 {
// Take a reference to the implementation before the id is popped.
150 Ptr<EventImpl> ev = m_destroyEvents.front().PeekEventImpl();
151 m_destroyEvents.pop_front();
152 NS_LOG_LOGIC("handle destroy " << ev);
153 if (!ev->IsCancelled())
154 {
155 ev->Invoke();
156 }
157 }
158
160}
161
// Derives m_lookAhead from the topology, then reconciles it across ranks.
//
// With a single rank the lookahead is set to zero seconds. Otherwise every
// local node's point-to-point devices are scanned; for each channel whose
// peer node lives on a different system id, the channel's "Delay" attribute
// lowers m_lookAhead if it is smaller than the current value.
// Afterwards an MPI_Allreduce(MPI_MAX) is performed over a per-rank value and
// ranks whose lookahead still equals GetMaximumSimulationTime() adopt the
// non-zero maximum.
//
// NOTE(review): the signature line, the declaration of `c` (listing line 174),
// the condition of the `if` at listing line 249 and lines 228/269 are missing
// from this listing.
// NOTE(review): the channel is indexed with GetDevice(0)/GetDevice(1) only,
// i.e. exactly two attached devices are assumed.
162void
164{
165 NS_LOG_FUNCTION(this);
166
167 /* If running sequential simulation can ignore lookahead */
168 if (MpiInterface::GetSize() <= 1)
169 {
170 m_lookAhead = Seconds(0);
171 }
172 else
173 {
175 for (auto iter = c.Begin(); iter != c.End(); ++iter)
176 {
// Skip nodes that belong to another rank.
177 if ((*iter)->GetSystemId() != MpiInterface::GetSystemId())
178 {
179 continue;
180 }
181
182 for (uint32_t i = 0; i < (*iter)->GetNDevices(); ++i)
183 {
184 Ptr<NetDevice> localNetDevice = (*iter)->GetDevice(i);
185 // only point-to-point links are supported currently
186 if (!localNetDevice->IsPointToPoint())
187 {
188 continue;
189 }
190 Ptr<Channel> channel = localNetDevice->GetChannel();
// Devices without an attached channel cannot constrain the lookahead.
191 if (!channel)
192 {
193 continue;
194 }
195
196 // grab the adjacent node: the device on the channel that is not ours
197 Ptr<Node> remoteNode;
198 if (channel->GetDevice(0) == localNetDevice)
199 {
200 remoteNode = (channel->GetDevice(1))->GetNode();
201 }
202 else
203 {
204 remoteNode = (channel->GetDevice(0))->GetNode();
205 }
206
207 // if the peer is on this rank too, the link is not inter-task
208 if (remoteNode->GetSystemId() == MpiInterface::GetSystemId())
209 {
210 continue;
211 }
212
213 // Compare the delay on the channel with the current value of
214 // m_lookAhead. If the delay on the channel is smaller, make
215 // it the new lookahead.
216 TimeValue delay;
217 channel->GetAttribute("Delay", delay);
218
219 if (delay.Get() < m_lookAhead)
220 {
221 m_lookAhead = delay.Get();
222 }
223 }
224 }
225 }
226
227 // m_lookAhead is now set
229
230 /*
231 * Compute the maximum inter-task latency and use that value
232 * for tasks with no inter-task links.
233 *
234 * Special processing for edge cases. For tasks that have no
235 * nodes need to determine a reasonable lookAhead value. Infinity
236 * would work correctly but introduces a performance issue; tasks
237 * with an infinite lookAhead would execute all their events
238 * before doing an AllGather resulting in very bad load balance
239 * during the first time window. Since all tasks participate in
240 * the AllGather it is desirable to have all the tasks advance in
241 * simulation time at a similar rate assuming roughly equal events
242 * per unit of simulation time in order to equalize the amount of
243 * work per time window.
244 */
// NOTE(review): GetInteger() yields int64_t but the buffers are `long` and are
// reduced as MPI_LONG; this may truncate where `long` is 32 bits -- confirm
// the supported platforms.
245 long sendbuf;
246 long recvbuf;
247
248 /* Tasks with no inter-task links do not contribute to max */
250 {
251 sendbuf = 0;
252 }
253 else
254 {
255 sendbuf = m_lookAhead.GetInteger();
256 }
257
// Collective call: every rank of the communicator must reach this point.
258 MPI_Allreduce(&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator());
259
260 /* For nodes that did not compute a lookahead use max from ranks
261 * that did compute a value. An edge case occurs if all nodes have
262 * no inter-task links (max will be 0 in this case). Use infinity so all tasks
263 * will proceed without synchronization until a single AllGather
264 * occurs when all tasks have finished.
265 */
266 if (m_lookAhead == GetMaximumSimulationTime() && recvbuf != 0)
267 {
268 m_lookAhead = Time(recvbuf);
270 }
271}
272
// Tightens m_lookAhead with an additional upper bound.
// A strictly positive lookAhead replaces m_lookAhead only when it is smaller
// (Min of the two), so this call can never enlarge the window. Any other
// value leaves m_lookAhead untouched and logs a warning.
// NOTE(review): the warning text says "negative", but the else branch is also
// taken for a lookAhead of exactly zero.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::BoundLookAhead(const Time lookAhead).
273void
275{
276 if (lookAhead > Time(0))
277 {
278 NS_LOG_FUNCTION(this << lookAhead);
279 m_lookAhead = Min(m_lookAhead, lookAhead);
280 }
281 else
282 {
283 NS_LOG_WARN("attempted to set lookahead to a negative time: " << lookAhead);
284 }
285}
286
// Installs a new scheduler created from schedulerFactory.
// If an event list already exists, every pending event is moved from the old
// scheduler into the new one before m_events is switched over, so no
// scheduled event is lost by the swap.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::SetScheduler(ObjectFactory schedulerFactory).
287void
289{
290 NS_LOG_FUNCTION(this << schedulerFactory);
291
292 Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler>();
293
// m_events is null until the first scheduler has been installed.
294 if (m_events)
295 {
296 while (!m_events->IsEmpty())
297 {
298 Scheduler::Event next = m_events->RemoveNext();
299 scheduler->Insert(next);
300 }
301 }
302 m_events = scheduler;
303}
304
// Removes the earliest event from the event list and executes it.
// Order of operations: call PreEventHook with an EventId for the event,
// assert that time does not move backwards, bump the executed-event counter,
// advance m_currentTs / m_currentUid to the event's key, invoke the event and
// finally drop the reference the event list held.
// NOTE(review): the signature line and listing lines 315 and 320 are missing
// here; presumably DistributedSimulatorImpl::ProcessOneEvent().
305void
307{
308 NS_LOG_FUNCTION(this);
309
310 Scheduler::Event next = m_events->RemoveNext();
311
312 PreEventHook(EventId(next.impl, next.key.m_ts, next.key.m_context, next.key.m_uid));
313
// Events must never be scheduled in the past relative to the current time.
314 NS_ASSERT(next.key.m_ts >= m_currentTs);
316 m_eventCount++;
317
318 NS_LOG_LOGIC("handle " << next.key.m_ts);
319 m_currentTs = next.key.m_ts;
321 m_currentUid = next.key.m_uid;
322 next.impl->Invoke();
323 next.impl->Unref();
324}
325
// Returns the m_globalFinished flag.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::IsFinished() const.
326bool
328{
329 return m_globalFinished;
330}
331
// Returns true when the event list is empty or the stop flag has been set.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::IsLocalFinished() const.
332bool
334{
335 return m_events->IsEmpty() || m_stop;
336}
337
// Returns the timestamp of the next event without removing it from the list.
// NOTE(review): the signature line and the return statement of the
// "finished" branch (listing line 345) are missing from this listing;
// presumably DistributedSimulatorImpl::NextTs() const.
338uint64_t
340{
341 // If the local MPI task has no more events or stop was called,
342 // the next event time is infinity.
343 if (IsLocalFinished())
344 {
346 }
347 else
348 {
349 Scheduler::Event ev = m_events->PeekNext();
350 return ev.key.m_ts;
351 }
352}
353
// Returns the value of NextTs() converted to a Time via TimeStep().
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::Next() const.
354Time
356{
357 return TimeStep(NextTs());
358}
359
// Main simulation loop; repeats until m_globalFinished becomes true.
//
// Each iteration looks at the next local event time. If it lies beyond
// m_grantedTime, or this rank has nothing left to do, the rank exchanges one
// LbtsMessage with every other rank through MPI_Allgather, computes the
// smallest reported time and the summed rx/tx counts, and -- only when the
// sums match -- derives a new m_grantedTime. An event is then processed if it
// falls inside the granted window.
//
// NOTE(review): many lines of this body are missing from the listing: the
// signature (presumably DistributedSimulatorImpl::Run()), the
// receive / send-complete calls, the construction of lMsg, the communicator
// argument of MPI_Allgather, the statements that seed m_globalFinished from
// the gathered messages, the infinite-lookahead branch and the call that
// processes the event. Comments below describe only what is visible.
360void
362{
363 NS_LOG_FUNCTION(this);
364
366 m_stop = false;
367 m_globalFinished = false;
368 while (!m_globalFinished)
369 {
370 Time nextTime = Next();
371
372 // If the local event is beyond grantedTime then we need to synchronize
373 // with other tasks to determine a new time window. If the local task
374 // is finished then continue to participate in allgather
375 // synchronizations with other tasks until all tasks have
376 // completed.
377 if (nextTime > m_grantedTime || IsLocalFinished())
378 {
379 // Can't process next event, calculate a new LBTS
380 // First receive any pending messages
382 // reset next time
383 nextTime = Next();
384 // And check for send completes
386 // Finally calculate the lbts
389 m_myId,
391 nextTime);
392 m_pLBTS[m_myId] = lMsg;
// The message is exchanged as raw bytes: sizeof(LbtsMessage) MPI_BYTEs per rank.
393 MPI_Allgather(&lMsg,
394 sizeof(LbtsMessage),
395 MPI_BYTE,
396 m_pLBTS,
397 sizeof(LbtsMessage),
398 MPI_BYTE,
400 Time smallestTime = m_pLBTS[0].GetSmallestTime();
401 // The totRx and totTx counts ensure there are no transient
402 // messages; if totRx != totTx, there are transients,
403 // so we don't update the granted time.
404 uint32_t totRx = m_pLBTS[0].GetRxCount();
405 uint32_t totTx = m_pLBTS[0].GetTxCount();
407
// Fold the remaining ranks: minimum of the times, sums of the counters.
408 for (uint32_t i = 1; i < m_systemCount; ++i)
409 {
410 if (m_pLBTS[i].GetSmallestTime() < smallestTime)
411 {
412 smallestTime = m_pLBTS[i].GetSmallestTime();
413 }
414 totRx += m_pLBTS[i].GetRxCount();
415 totTx += m_pLBTS[i].GetTxCount();
417 }
418
419 // Global halting condition is all nodes have empty queues and
420 // no messages are in-flight.
421 m_globalFinished &= totRx == totTx;
422
423 if (totRx == totTx)
424 {
425 // If lookahead is infinite then granted time should be as well.
426 // Covers the edge case if all the tasks have no inter-task
427 // links, prevents overflow of granted time.
429 {
431 }
432 else
433 {
434 // Overflow is possible here if near end of representable time.
435 m_grantedTime = smallestTime + m_lookAhead;
436 }
437 }
438 }
439
440 // Execute next event if it is within the current time window.
441 // Local task may be completed.
442 if ((nextTime <= m_grantedTime) && (!IsLocalFinished()))
443 { // Safe to process
445 }
446 }
447
448 // If the simulator stopped naturally by lack of events, make a
449 // consistency test to check that we didn't lose any events along the way.
450 NS_ASSERT(!m_events->IsEmpty() || m_unscheduledEvents == 0);
451}
452
// Returns the stored m_myId.
// NOTE(review): the return-type and name lines are missing from this listing;
// presumably uint32_t DistributedSimulatorImpl::GetSystemId() const.
455{
456 return m_myId;
457}
458
// Sets the m_stop flag; IsLocalFinished() reports true from then on.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::Stop().
459void
461{
462 NS_LOG_FUNCTION(this);
463
464 m_stop = true;
465}
466
// Schedules Simulator::Stop to run after `delay` and returns the value
// produced by Simulator::Schedule for that event.
// NOTE(review): the return-type and signature lines are missing from this
// listing; presumably the Stop overload taking a `delay` Time -- confirm.
469{
470 NS_LOG_FUNCTION(this << delay.GetTimeStep());
471
472 return Simulator::Schedule(delay, &Simulator::Stop);
473}
474
475//
476// Schedule an event for a _relative_ time in the future.
477//
// The absolute time is the current timestamp plus `delay`; it is asserted to
// be non-negative and not earlier than the current time. The event is keyed
// with the current context and the next unique id (m_uid, then incremented),
// inserted into the event list, and an EventId with the same key is returned.
// NOTE(review): the signature line, the declaration of `ev` (listing line
// 487) and listing line 493 are missing from this listing; presumably
// EventId DistributedSimulatorImpl::Schedule(const Time& delay, EventImpl* event).
480{
481 NS_LOG_FUNCTION(this << delay.GetTimeStep() << event);
482
483 Time tAbsolute = delay + TimeStep(m_currentTs);
484
485 NS_ASSERT(tAbsolute.IsPositive());
486 NS_ASSERT(tAbsolute >= TimeStep(m_currentTs));
488 ev.impl = event;
489 ev.key.m_ts = static_cast<uint64_t>(tAbsolute.GetTimeStep());
490 ev.key.m_context = GetContext();
491 ev.key.m_uid = m_uid;
492 m_uid++;
494 m_events->Insert(ev);
495 return EventId(event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
496}
497
// Schedules `event` at current time + delay under an explicit `context`
// instead of the current one, using the next unique id (m_uid, then
// incremented). Nothing is returned to the caller.
// NOTE(review): unlike the relative Schedule() body, no assertion guards the
// computed timestamp here; a negative delay would be added to the unsigned
// m_currentTs unchecked -- confirm callers never pass one.
// NOTE(review): the signature line, the declaration of `ev` (listing line
// 503) and listing line 509 are missing from this listing.
498void
500{
501 NS_LOG_FUNCTION(this << context << delay.GetTimeStep() << m_currentTs << event);
502
504 ev.impl = event;
505 ev.key.m_ts = m_currentTs + delay.GetTimeStep();
506 ev.key.m_context = context;
507 ev.key.m_uid = m_uid;
508 m_uid++;
510 m_events->Insert(ev);
511}
512
// Schedules `event` with a zero delay by delegating to Schedule().
// NOTE(review): the return-type and signature lines are missing from this
// listing; presumably EventId DistributedSimulatorImpl::ScheduleNow(EventImpl* event).
515{
516 NS_LOG_FUNCTION(this << event);
517 return Schedule(Time(0), event);
518}
519
// Queues `event` on m_destroyEvents and returns its EventId.
// The id is built with the current timestamp, context 0xffffffff and uid 2;
// the Ptr is constructed with `false` as its second argument.
// NOTE(review): the literal uid 2 presumably equals EventId::UID::DESTROY,
// which Remove() and IsExpired() compare against -- confirm and prefer the
// named constant.
// NOTE(review): m_uid is incremented although the id built here does not use it.
// NOTE(review): the return-type and signature lines are missing from this
// listing; presumably EventId DistributedSimulatorImpl::ScheduleDestroy(EventImpl* event).
522{
523 NS_LOG_FUNCTION(this << event);
524
525 EventId id(Ptr<EventImpl>(event, false), m_currentTs, 0xffffffff, 2);
526 m_destroyEvents.push_back(id);
527 m_uid++;
528 return id;
529}
530
// Returns the current timestamp (m_currentTs) as a Time.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::Now() const.
531Time
533{
534 return TimeStep(m_currentTs);
535}
536
// Returns the time remaining until event `id` fires: zero if the event is
// expired, otherwise its timestamp minus the current timestamp.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::GetDelayLeft(const EventId& id) const.
537Time
539{
540 if (IsExpired(id))
541 {
542 return TimeStep(0);
543 }
544 else
545 {
546 return TimeStep(id.GetTs() - m_currentTs);
547 }
548}
549
// Removes the event identified by `id`.
// Destroy events are erased from m_destroyEvents (first match only).
// For any other event that is not expired, the matching key is removed from
// the event list, the implementation is cancelled and one reference dropped.
// Expired events are ignored.
// NOTE(review): the signature line and listing line 580 are missing from this
// listing; presumably DistributedSimulatorImpl::Remove(const EventId& id).
550void
552{
553 if (id.GetUid() == EventId::UID::DESTROY)
554 {
555 // destroy events.
556 for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
557 {
558 if (*i == id)
559 {
// Stop right after erasing: `i` must not be used again.
560 m_destroyEvents.erase(i);
561 break;
562 }
563 }
564 return;
565 }
566 if (IsExpired(id))
567 {
568 return;
569 }
// Rebuild the scheduler key from the id so the scheduler can locate the entry.
570 Scheduler::Event event;
571 event.impl = id.PeekEventImpl();
572 event.key.m_ts = id.GetTs();
573 event.key.m_context = id.GetContext();
574 event.key.m_uid = id.GetUid();
575 m_events->Remove(event);
576 event.impl->Cancel();
577 // whenever we remove an event from the event list, we have to unref it.
578 event.impl->Unref();
579
581}
582
// Marks the event behind `id` as cancelled unless it is already expired.
// The event is not removed from any list here; only Cancel() is called on its
// implementation.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::Cancel(const EventId& id).
583void
585{
586 if (!IsExpired(id))
587 {
588 id.PeekEventImpl()->Cancel();
589 }
590}
591
// Reports whether the event identified by `id` can no longer run.
// Destroy events: expired if the implementation is null or cancelled, or if
// the id is no longer present in m_destroyEvents.
// Other events: expired if the implementation is null, the timestamp is
// before the current one, the timestamp equals the current one and the uid is
// not greater than the current uid, or the implementation is cancelled.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::IsExpired(const EventId& id) const.
592bool
594{
595 if (id.GetUid() == EventId::UID::DESTROY)
596 {
597 if (id.PeekEventImpl() == nullptr || id.PeekEventImpl()->IsCancelled())
598 {
599 return true;
600 }
601 // destroy events.
602 for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
603 {
604 if (*i == id)
605 {
606 return false;
607 }
608 }
609 return true;
610 }
// The null check comes first so the later PeekEventImpl() dereference is safe.
611 return id.PeekEventImpl() == nullptr || id.GetTs() < m_currentTs ||
612 (id.GetTs() == m_currentTs && id.GetUid() <= m_currentUid) ||
613 id.PeekEventImpl()->IsCancelled();
614}
615
// Returns the Time built from time step 0x7fffffffffffffff, the largest
// positive value of a signed 64-bit integer.
// NOTE(review): the signature line and listing lines 619-620 are missing from
// this listing; presumably DistributedSimulatorImpl::GetMaximumSimulationTime() const.
616Time
618{
621 return TimeStep(0x7fffffffffffffffLL);
622}
623
// Returns the stored m_currentContext.
// NOTE(review): the return-type and signature lines are missing from this
// listing; presumably uint32_t DistributedSimulatorImpl::GetContext() const.
626{
627 return m_currentContext;
628}
629
// Returns m_eventCount, the counter incremented once per processed event.
// NOTE(review): the signature line is missing from this listing; presumably
// DistributedSimulatorImpl::GetEventCount() const.
630uint64_t
632{
633 return m_eventCount;
634}
635
636} // namespace ns3
#define Min(a, b)
Distributed simulator implementation using lookahead.
EventId Schedule(const Time &delay, EventImpl *event) override
Schedule a future event execution (in the same context).
uint64_t GetEventCount() const override
Get the number of events executed.
void Remove(const EventId &id) override
Remove an event from the event list.
static TypeId GetTypeId()
Register this type.
DestroyEvents m_destroyEvents
The container of events to run at Destroy()
void ScheduleWithContext(uint32_t context, const Time &delay, EventImpl *event) override
Schedule a future event execution (in a different context).
EventId ScheduleNow(EventImpl *event) override
Schedule an event to run at the current virtual time.
uint64_t NextTs() const
Get the timestep of the next event.
EventId ScheduleDestroy(EventImpl *event) override
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
Time m_grantedTime
End of current window.
uint32_t m_currentContext
Execution context of the current event.
Time GetMaximumSimulationTime() const override
Get the maximum representable simulation time.
LbtsMessage * m_pLBTS
Container for Lbts messages, one per rank.
uint64_t m_currentTs
Timestamp of the current event.
uint32_t GetSystemId() const override
Get the system id of this simulator.
void SetScheduler(ObjectFactory schedulerFactory) override
Set the Scheduler to be used to manage the event list.
bool IsFinished() const override
Check if the simulation should finish.
Ptr< Scheduler > m_events
The event priority queue.
Time Next() const
Get the time of the next event, as returned by NextTs().
void CalculateLookAhead()
Calculate lookahead constraint based on network latency.
uint32_t GetContext() const override
Get the current simulation context.
bool m_globalFinished
Are all parallel instances completed.
uint32_t m_uid
Next event unique id.
void Run() override
Run the simulation.
void ProcessOneEvent()
Process the next event.
void Cancel(const EventId &id) override
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
void Stop() override
Tell the Simulator the calling event should be the last one executed.
int m_unscheduledEvents
Number of events that have been inserted but not yet scheduled, not counting the "destroy" events; th...
void Destroy() override
Execute the events scheduled with ScheduleDestroy().
Time GetDelayLeft(const EventId &id) const override
Get the remaining time until this event will execute.
uint32_t m_systemCount
MPI communicator size.
virtual void BoundLookAhead(const Time lookAhead)
Add additional bound to lookahead constraints.
bool IsExpired(const EventId &id) const override
Check if an event has already run or been cancelled.
Time Now() const override
Return the current simulation virtual time.
bool IsLocalFinished() const
Check if this rank is finished.
bool m_stop
Flag calling for the end of the simulation.
static Time m_lookAhead
Current window size.
uint32_t m_currentUid
Unique id of the current event.
void DoDispose() override
Destructor implementation.
An identifier for simulation events.
Definition: event-id.h:55
@ INVALID
Invalid UID value.
Definition: event-id.h:61
@ VALID
Schedule(), etc.
Definition: event-id.h:69
@ DESTROY
ScheduleDestroy() events.
Definition: event-id.h:65
A simulation event.
Definition: event-impl.h:46
void Invoke()
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:47
static void ReceiveMessages()
Check for received messages complete.
static void TestSendComplete()
Check for completed sends.
Structure used for all-reduce LBTS computation.
uint32_t m_txCount
Count of transmitted messages.
uint32_t m_rxCount
Count of received messages.
uint32_t m_myId
System Id of the rank sending this LBTS.
Time m_smallestTime
Earliest next event timestamp.
bool m_isFinished
true when this rank has no more events.
static MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
static void Destroy()
Deletes storage used by the parallel environment.
static uint32_t GetSystemId()
Get the id number of this rank.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
keep track of a set of node pointers.
Iterator End() const
Get an iterator which indicates past-the-last Node in the container.
static NodeContainer GetGlobal()
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
Iterator Begin() const
Get an iterator which refers to the first Node in the container.
Instantiate subclasses of ns3::Object.
Ptr< Object > Create() const
Create an Object instance of the configured TypeId.
virtual void DoDispose()
Destructor implementation.
Definition: object.cc:444
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:77
Maintain the event list.
Definition: scheduler.h:157
void Unref() const
Decrement the reference count.
static EventId Schedule(const Time &delay, FUNC f, Ts &&... args)
Schedule an event to expire after delay.
Definition: simulator.h:571
@ NO_CONTEXT
Flag for events not associated with any particular context.
Definition: simulator.h:210
static void Stop()
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:186
The SimulatorImpl base class.
virtual void PreEventHook(const EventId &id)
Hook called before processing each event.
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
bool IsPositive() const
Exactly equivalent to t >= 0.
Definition: nstime.h:333
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
static Time Max()
Maximum representable Time Not to be confused with Max(Time,Time).
Definition: nstime.h:297
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
AttributeValue implementation for Time.
Definition: nstime.h:1413
Time Get() const
Definition: time.cc:530
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:932
Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:282
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:261
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1326
Declaration of class ns3::MpiInterface.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
Scheduler event.
Definition: scheduler.h:184
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:186
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:185
uint32_t m_context
Event context.
Definition: scheduler.h:173
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:171
uint32_t m_uid
Event unique id.
Definition: scheduler.h:172