A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
null-message-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * Copyright 2013. Lawrence Livermore National Security, LLC.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation;
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 *
17 * Author: Steven Smith <smith84@llnl.gov>
18 *
19 */
20
21/**
22 * \file
23 * \ingroup mpi
24 * Implementation of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
25 */
26
28
29#include "mpi-receiver.h"
33
34#include "ns3/log.h"
35#include "ns3/net-device.h"
36#include "ns3/node-list.h"
37#include "ns3/node.h"
38#include "ns3/nstime.h"
39#include "ns3/simulator.h"
40
41#include <iomanip>
42#include <iostream>
43#include <list>
44#include <mpi.h>
45
46namespace ns3
47{
48
49NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface");
50
51NS_OBJECT_ENSURE_REGISTERED(NullMessageMpiInterface);
52
53/**
54 * \ingroup mpi
55 *
56 * \brief Non-blocking send buffers for Null Message implementation.
57 *
58 * One buffer is allocated for each non-blocking send. Each object pairs
 * the raw byte buffer handed to MPI_Isend with the MPI_Request that
 * tracks that send.
59 */
// NOTE(review): the class-head line and the two declarations following
// `public:` are absent from this listing (the numbering skips 60, 63 and 64).
61{
62 public:
65
66 /**
67 * \return pointer to the send buffer currently stored in this object
68 */
69 uint8_t* GetBuffer();
70 /**
71 * \param buffer pointer to the send buffer to store in this object
72 */
73 void SetBuffer(uint8_t* buffer);
74 /**
75 * \return pointer to the MPI request stored in this object
76 */
77 MPI_Request* GetRequest();
78
79 private:
80 /**
81 * Buffer for send.
82 */
83 uint8_t* m_buffer;
84
85 /**
86 * MPI request posted for the send.
87 */
88 MPI_Request m_request;
89};
90
91/**
92 * maximum MPI message size for easy
93 * buffer creation
94 */
96
// Starts with no buffer attached and a null MPI request.
// NOTE(review): the line naming this function (97) is absent from the
// listing; presumably the NullMessageSentBuffer constructor - confirm.
98{
99 m_buffer = nullptr;
100 m_request = MPI_REQUEST_NULL;
101}
102
// Releases the array that m_buffer points to.
// NOTE(review): the line naming this function (103) is absent from the
// listing; presumably the NullMessageSentBuffer destructor - confirm.
104{
105 delete[] m_buffer;
106}
107
// Returns the stored buffer pointer unchanged.
// NOTE(review): signature line (109) is absent from the listing;
// presumably NullMessageSentBuffer::GetBuffer() - confirm.
108uint8_t*
110{
111 return m_buffer;
112}
113
// Stores the given pointer in m_buffer; any previously stored pointer is
// simply overwritten here.
// NOTE(review): signature line (115) is absent from the listing;
// presumably NullMessageSentBuffer::SetBuffer(uint8_t* buffer) - confirm.
114void
116{
117 m_buffer = buffer;
118}
119
// Returns the address of the request member so MPI calls can fill in or
// test the request in place.
// NOTE(review): signature line (121) is absent from the listing;
// presumably NullMessageSentBuffer::GetRequest() - confirm.
120MPI_Request*
122{
123 return &m_request;
124}
125
131
// Sends that have been posted with MPI_Isend; entries are erased once
// MPI_Test reports them complete, and the list is cleared on Disable().
132std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
133
// Communicator used for the MPI calls in this file. Starts as
// MPI_COMM_WORLD and is overwritten by MPI_Comm_dup() when enabling.
134MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
138
// Returns the TypeId "ns3::NullMessageMpiInterface" (parent Object, group
// "Mpi"). The TypeId is a function-local static, so it is built once and
// the same value is returned on every call.
// NOTE(review): signature line (140) is absent from the listing;
// presumably NullMessageMpiInterface::GetTypeId() - confirm.
139TypeId
141{
142 static TypeId tid =
143 TypeId("ns3::NullMessageMpiInterface").SetParent<Object>().SetGroupName("Mpi");
144 return tid;
145}
146
// Only logs the call; no state is set up in this body.
// NOTE(review): the line naming this function (147) is absent from the
// listing; presumably the NullMessageMpiInterface constructor - confirm.
148{
149 NS_LOG_FUNCTION(this);
150}
151
// Only logs the call; no cleanup is performed in this body.
// NOTE(review): the line naming this function (152) is absent from the
// listing; presumably the NullMessageMpiInterface destructor - confirm.
153{
154 NS_LOG_FUNCTION(this);
155}
156
// Only logs the call; nothing is released in this body.
// NOTE(review): signature line (158) is absent from the listing;
// presumably NullMessageMpiInterface::Destroy() - confirm.
157void
159{
160 NS_LOG_FUNCTION(this);
161}
162
// Returns g_sid, which Enable() sets from MPI_Comm_rank().
// NOTE(review): the return-type/signature lines (163-164) and line 166 are
// absent from the listing; presumably NullMessageMpiInterface::GetSystemId()
// - confirm.
165{
167 return g_sid;
168}
169
// Returns g_size, which Enable() sets from MPI_Comm_size().
// NOTE(review): the return-type/signature lines (170-171) and line 173 are
// absent from the listing; presumably NullMessageMpiInterface::GetSize()
// - confirm.
172{
174 return g_size;
175}
176
// Returns the communicator currently stored in g_communicator.
// NOTE(review): the signature line (178) and line 180 are absent from the
// listing; presumably NullMessageMpiInterface::GetCommunicator() - confirm.
177MPI_Comm
179{
181 return g_communicator;
182}
183
// Returns g_enabled: set to true at the end of Enable() and back to false
// in Disable().
// NOTE(review): signature line (185) is absent from the listing;
// presumably NullMessageMpiInterface::IsEnabled() - confirm.
184bool
186{
187 return g_enabled;
188}
189
/**
 * Initialize MPI from the program arguments, then enable this interface
 * on MPI_COMM_WORLD.
 *
 * The interface must not already be enabled (checked with NS_ASSERT).
 *
 * \param pargc pointer to the argument count; passed to MPI_Init
 * \param pargv pointer to the argument vector; passed to MPI_Init
 */
190void
191NullMessageMpiInterface::Enable(int* pargc, char*** pargv)
192{
193 NS_LOG_FUNCTION(this << *pargc);
194
195 NS_ASSERT(g_enabled == false);
196
197 // Initialize the MPI interface
198 MPI_Init(pargc, pargv);
199 Enable(MPI_COMM_WORLD);
 // Remember that this interface called MPI_Init, so that Disable() knows
 // it is also responsible for calling MPI_Finalize.
200 g_mpiInitCalled = true;
201}
202
// Enables the interface on a caller-supplied communicator: duplicates it,
// records this task's rank and the communicator size, marks the interface
// enabled, and then waits on a barrier over the duplicated communicator.
// The interface must not already be enabled (checked with NS_ASSERT).
// NOTE(review): signature line (204) is absent from the listing; the body
// reads a parameter named `communicator`, presumably
// NullMessageMpiInterface::Enable(MPI_Comm communicator) - confirm.
203void
205{
206 NS_LOG_FUNCTION(this);
207
208 NS_ASSERT(g_enabled == false);
209
210 // Standard MPI practice is to duplicate the communicator for a
211 // library to use, so that the library communicates in its own
212 // isolated communication context.
213 MPI_Comm_dup(communicator, &g_communicator);
 // The duplicate was created here; this flag is cleared again in Disable()
 // right after MPI_Comm_free().
214 g_freeCommunicator = true;
215
216 // SystemId and Size are uint32_t in the interface but MPI uses int, so convert.
217 int mpiSystemId;
218 int mpiSize;
219 MPI_Comm_rank(g_communicator, &mpiSystemId);
220 MPI_Comm_size(g_communicator, &mpiSize);
221
222 g_sid = mpiSystemId;
223 g_size = mpiSize;
224
225 g_enabled = true;
226
227 MPI_Barrier(g_communicator);
228}
229
// Allocates one MPI_Request and one receive-buffer slot per neighbor, then
// walks every rank and posts a non-blocking receive (tag 0) for each rank
// that has a bundle. Slots are filled in increasing rank order via `index`.
// NOTE(review): several lines are absent from this listing (signature 231,
// 233-234, 236, 244, 247, 249, 253). Where g_numNeighbors is set, how
// `bundle` is obtained, how each g_pRxBuffers[index] is allocated, and the
// count/communicator arguments of MPI_Irecv are therefore not visible here.
230void
232{
235
237
238 // Post a non-blocking receive for all peers
239 g_requests = new MPI_Request[g_numNeighbors];
240 g_pRxBuffers = new char*[g_numNeighbors];
241 int index = 0;
242 for (uint32_t rank = 0; rank < g_size; ++rank)
243 {
245 if (bundle)
246 {
248 MPI_Irecv(g_pRxBuffers[index],
250 MPI_CHAR,
251 rank,
252 0,
254 &g_requests[index]);
255 ++index;
256 }
257 }
258}
259
// Serializes a packet together with a fixed header and posts it with a
// non-blocking MPI_Isend (tag 0) to the rank that owns the destination node.
//
// Message layout, in order:
//   uint64_t  receive time            (rxTime.GetInteger())
//   uint64_t  guarantee update        (guarantee_update.GetTimeStep())
//   uint32_t  destination node id     (node)
//   uint32_t  destination device id   (dev)
//   bytes     serialized packet       (p->GetSerializedSize() bytes)
//
// The send buffer is owned by the entry appended to g_pendingTx, so it stays
// alive until that entry is erased.
// NOTE(review): several lines are absent from this listing (signature 261,
// 265, 285, 299, 302); in particular the expression that initializes
// `guarantee_update` and the communicator argument of MPI_Isend are not
// visible here. Parameters used by the body: p, rxTime, node, dev.
260void
262{
263 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
264
266
267 // Find the system id for the destination node
268 Ptr<Node> destNode = NodeList::GetNode(node);
269 uint32_t nodeSysId = destNode->GetSystemId();
270
 // Append the entry while its buffer is still unset, then attach the
 // buffer to the copy that lives in the list.
271 NullMessageSentBuffer sendBuf;
272 g_pendingTx.push_back(sendBuf);
273 auto iter = g_pendingTx.rbegin(); // Points to the last element
274
 // Header is two 64-bit times plus two 32-bit ids, followed by the packet.
275 uint32_t serializedSize = p->GetSerializedSize();
276 uint32_t bufferSize = serializedSize + (2 * sizeof(uint64_t)) + (2 * sizeof(uint32_t));
277 auto buffer = new uint8_t[bufferSize];
278 iter->SetBuffer(buffer);
279 // Add the time, guarantee update, dest node and dest device
280 uint64_t t = rxTime.GetInteger();
281 auto pTime = reinterpret_cast<uint64_t*>(buffer);
282 *pTime++ = t;
283
284 Time guarantee_update =
286 *pTime++ = guarantee_update.GetTimeStep();
287
288 auto pData = reinterpret_cast<uint32_t*>(pTime);
289 *pData++ = node;
290 *pData++ = dev;
291 // Serialize the packet
292 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
293
294 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
295 bufferSize,
296 MPI_CHAR,
297 nodeSysId,
298 0,
300 (iter->GetRequest()));
301
303}
304
// Posts a header-only message (no packet payload) with a non-blocking
// MPI_Isend (tag 0) to the rank returned by bundle->GetSystemId().
//
// The header has the same layout SendPacket() writes, but the receive time,
// node id and device id are all written as 0; only the guarantee update
// carries information. ReceiveMessages() treats a receive time of 0 as a
// Null Message.
// NOTE(review): the signature lines (306-307), line 311 and the communicator
// argument of MPI_Isend (336) are absent from this listing. Parameters used
// by the body: guarantee_update, bundle.
305void
308{
309 NS_LOG_FUNCTION(guarantee_update.GetTimeStep() << bundle);
310
312
 // Append the entry while its buffer is still unset, then attach the
 // buffer to the copy that lives in the list.
313 NullMessageSentBuffer sendBuf;
314 g_pendingTx.push_back(sendBuf);
315 auto iter = g_pendingTx.rbegin(); // Points to the last element
316
317 uint32_t bufferSize = 2 * sizeof(uint64_t) + 2 * sizeof(uint32_t);
318 auto buffer = new uint8_t[bufferSize];
319 iter->SetBuffer(buffer);
320 // Write the header: time = 0, guarantee update, dest node = 0, dest device = 0
321 auto pTime = reinterpret_cast<uint64_t*>(buffer);
322 *pTime++ = 0;
323 *pTime++ = guarantee_update.GetInteger();
324 auto pData = reinterpret_cast<uint32_t*>(pTime);
325 *pData++ = 0;
326 *pData++ = 0;
327
328 // Find the system id for the destination MPI rank
329 uint32_t nodeSysId = bundle->GetSystemId();
330
331 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
332 bufferSize,
333 MPI_CHAR,
334 nodeSysId,
335 0,
337 (iter->GetRequest()));
338}
339
// Convenience wrapper: runs ReceiveMessages() in blocking mode.
// NOTE(review): the signature line (341) and line 343 are absent from the
// listing; presumably NullMessageMpiInterface::ReceiveMessagesBlocking()
// - confirm.
340void
342{
344
345 ReceiveMessages(true);
346}
347
// Convenience wrapper: runs ReceiveMessages() in non-blocking mode.
// NOTE(review): the signature line (349) and line 351 are absent from the
// listing; presumably NullMessageMpiInterface::ReceiveMessagesNonBlocking()
// - confirm.
348void
350{
352
353 ReceiveMessages(false);
354}
355
// Processes messages arriving on the posted non-blocking receives.
//
// blocking == true : waits (MPI_Waitany) for one message, handles it and
//                    returns; the loop body runs exactly once.
// blocking == false: polls (MPI_Testany) and handles messages until a poll
//                    finds none completed.
//
// Returns immediately when g_numNeighbors is 0.
//
// For each message the header is decoded in the layout SendPacket() writes
// (uint64 time, uint64 guarantee update, uint32 node, uint32 device). A
// non-zero time means a packet follows the header: it is rebuilt and a
// receive event is scheduled on the matching device's MpiReceiver. In all
// cases the guarantee time is updated on `bundle` and the receive for that
// slot is posted again.
// NOTE(review): several lines are absent from this listing (signature 357,
// 361, 431, 437, 444, 448): the callable passed to ScheduleWithContext, how
// `bundle` is obtained, and the count/communicator arguments of MPI_Irecv
// are not visible here.
356void
358{
359 NS_LOG_FUNCTION(blocking);
360
362
363 // stop flag set to true when no more messages are found to
364 // process.
365 bool stop = false;
366
367 if (!g_numNeighbors)
368 {
369 // Not communicating with anyone.
370 return;
371 }
372
373 do
374 {
375 int messageReceived = 0;
376 int index = 0;
377 MPI_Status status;
378
379 if (blocking)
380 {
381 MPI_Waitany(g_numNeighbors, g_requests, &index, &status);
382 messageReceived = 1; /* Wait always implies message was received */
383 stop = true;
384 }
385 else
386 {
387 MPI_Testany(g_numNeighbors, g_requests, &index, &messageReceived, &status);
388 }
389
390 if (messageReceived)
391 {
 // Total number of MPI_CHAR elements received, header included.
392 int count;
393 MPI_Get_count(&status, MPI_CHAR, &count);
394
395 // Get the meta data first
396 auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
397 uint64_t time = *pTime++;
398 uint64_t guaranteeUpdate = *pTime++;
399
400 auto pData = reinterpret_cast<uint32_t*>(pTime);
401 uint32_t node = *pData++;
402 uint32_t dev = *pData++;
403
404 Time rxTime(time);
405
406 // rxtime == 0 means this is a Null Message
407 if (rxTime > Time(0))
408 {
 // Strip the header size so count is the serialized packet length.
409 count -= sizeof(time) + sizeof(guaranteeUpdate) + sizeof(node) + sizeof(dev);
410
411 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
412
413 // Find the correct node/device to schedule receive event
414 Ptr<Node> pNode = NodeList::GetNode(node);
415 Ptr<MpiReceiver> pMpiRec = nullptr;
416 uint32_t nDevices = pNode->GetNDevices();
417 for (uint32_t i = 0; i < nDevices; ++i)
418 {
419 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
420 if (pThisDev->GetIfIndex() == dev)
421 {
422 pMpiRec = pThisDev->GetObject<MpiReceiver>();
423 break;
424 }
425 }
 // NOTE(review): pNode has already been dereferenced above, so this
 // assert cannot catch a null pNode; it effectively only checks pMpiRec.
426 NS_ASSERT(pNode && pMpiRec);
427
428 // Schedule the rx event; the delay is relative to the current simulation time
429 Simulator::ScheduleWithContext(pNode->GetId(),
430 rxTime - Simulator::Now(),
432 pMpiRec,
433 p);
434 }
435
436 // Update guarantee time for both packet receives and Null Messages.
438 NS_ASSERT(bundle);
439
440 bundle->SetGuaranteeTime(Time(guaranteeUpdate));
441
442 // Re-queue the next read from the same source into the same slot
443 MPI_Irecv(g_pRxBuffers[index],
445 MPI_CHAR,
446 status.MPI_SOURCE,
447 0,
449 &g_requests[index]);
450 }
451 else
452 {
453 // if non-blocking and no message received in testany then stop message loop
454 stop = true;
455 }
456 } while (!stop);
457}
458
// Walks the list of pending sends, tests each one with MPI_Test, and erases
// the entries whose send has completed. Sends still in flight stay in the
// list for a later call.
// NOTE(review): the signature line (460) and lines 462 and 464 are absent
// from the listing; presumably NullMessageMpiInterface::TestSendComplete()
// - confirm.
459void
461{
463
465
466 auto iter = g_pendingTx.begin();
467 while (iter != g_pendingTx.end())
468 {
469 MPI_Status status;
470 int flag = 0;
471 MPI_Test(iter->GetRequest(), &flag, &status);
 // Step past the element before a possible erase so `iter` stays valid.
472 auto current = iter; // Save current for erasing
473 ++iter; // Advance to next
474 if (flag)
475 { // This message is complete
476 g_pendingTx.erase(current);
477 }
478 }
479}
480
// Shuts the interface down. When enabled, in order:
//   1. cancels and frees every pending send request,
//   2. cancels and frees every posted receive request,
//   3. deletes the receive buffers and the request array,
//   4. clears the pending-send list,
//   5. frees the communicator (inside a conditional block),
//   6. calls MPI_Finalize if this interface was the one that called MPI_Init,
//   7. resets g_enabled and g_mpiInitCalled.
// Calling it while not enabled is a fatal error.
// NOTE(review): the signature line (482) and the condition guarding the
// MPI_Comm_free block (509) are absent from this listing; the block clears
// g_freeCommunicator, so the guard is presumably that flag - confirm.
481void
483{
484 NS_LOG_FUNCTION(this);
485
486 if (g_enabled)
487 {
488 for (auto iter = g_pendingTx.begin(); iter != g_pendingTx.end(); ++iter)
489 {
490 MPI_Cancel(iter->GetRequest());
491 MPI_Request_free(iter->GetRequest());
492 }
493
494 for (uint32_t i = 0; i < g_numNeighbors; ++i)
495 {
496 MPI_Cancel(&g_requests[i]);
497 MPI_Request_free(&g_requests[i]);
498 }
499
 // Requests are cancelled above before the buffers they refer to are deleted.
500 for (uint32_t i = 0; i < g_numNeighbors; ++i)
501 {
502 delete[] g_pRxBuffers[i];
503 }
504 delete[] g_pRxBuffers;
505 delete[] g_requests;
506
507 g_pendingTx.clear();
508
510 {
511 MPI_Comm_free(&g_communicator);
512 g_freeCommunicator = false;
513 }
514
 // Only finalize MPI if Enable(int*, char***) initialized it.
515 if (g_mpiInitCalled)
516 {
517 int flag = 0;
518 MPI_Initialized(&flag);
519 if (flag)
520 {
521 MPI_Finalize();
522 }
523 else
524 {
525 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
526 }
527 }
528
529 g_enabled = false;
530 g_mpiInitCalled = false;
531 }
532 else
533 {
534 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
535 }
536}
537
538} // namespace ns3
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void Destroy() override
Deletes storage used by the parallel environment.
static void ReceiveMessagesBlocking()
Blocking message receive.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
bool IsEnabled() override
Returns enabled state of parallel environment.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static MPI_Request * g_requests
Pending non-blocking receives.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
static void TestSendComplete()
Check for completed sends.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
static bool g_enabled
Has this interface been enabled.
static char ** g_pRxBuffers
Data buffers for non-blocking receives.
static void InitializeSendReceiveBuffers()
Initialize send and receive buffers.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_size
Number of ranks in the MPI communicator used by ns-3 (MPI_COMM_WORLD by default).
void Disable() override
Clean up the ns-3 parallel communications interface.
static std::list< NullMessageSentBuffer > g_pendingTx
List of pending non-blocking sends.
static bool g_freeCommunicator
Did we create the communicator? Have to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
static uint32_t g_numNeighbors
Number of neighbor tasks, tasks that this task shares a link with.
Non-blocking send buffers for Null Message implementation.
MPI_Request m_request
MPI request posted for the send.
uint8_t * m_buffer
Buffer for send.
static NullMessageSimulatorImpl * GetInstance()
Time CalculateGuaranteeTime(uint32_t systemId)
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
A base class which provides memory management and object aggregation.
Definition: object.h:89
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:77
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
Get the bundle corresponding to a remote rank.
static std::size_t Size()
Get the number of ns-3 channels in this bundle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:588
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:208
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:932
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
Declaration of class ns3::NullMessageSimulatorImpl.
Declaration of class ns3::RemoteChannelBundleManager.
Declaration of class ns3::RemoteChannelBundle.