A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
null-message-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * Copyright 2013. Lawrence Livermore National Security, LLC.
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 * Author: Steven Smith <smith84@llnl.gov>
7 *
8 */
9
10/**
11 * \file
12 * \ingroup mpi
13 * Implementation of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
14 */
15
17
18#include "mpi-receiver.h"
22
23#include "ns3/log.h"
24#include "ns3/net-device.h"
25#include "ns3/node-list.h"
26#include "ns3/node.h"
27#include "ns3/nstime.h"
28#include "ns3/simulator.h"
29
30#include <iomanip>
31#include <iostream>
32#include <list>
33#include <mpi.h>
34
35namespace ns3
36{
37
38NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface");
39
40NS_OBJECT_ENSURE_REGISTERED(NullMessageMpiInterface);
41
42/**
43 * \ingroup mpi
44 *
45 * \brief Non-blocking send buffers for Null Message implementation.
46 *
47 * One buffer is allocated for each non-blocking send.
48 */
50{
51 public:
54
55 /**
56 * \return pointer to sent buffer
57 */
58 uint8_t* GetBuffer();
59 /**
60 * \param buffer pointer to sent buffer
61 */
62 void SetBuffer(uint8_t* buffer);
63 /**
64 * \return MPI request
65 */
66 MPI_Request* GetRequest();
67
68 private:
69 /**
70 * Buffer for send.
71 */
72 uint8_t* m_buffer;
73
74 /**
75 * MPI request posted for the send.
76 */
77 MPI_Request m_request;
78};
79
80/**
81 * maximum MPI message size for easy
82 * buffer creation
83 */
85
87{
88 m_buffer = nullptr;
89 m_request = MPI_REQUEST_NULL;
90}
91
96
97uint8_t*
102
103void
105{
106 m_buffer = buffer;
107}
108
109MPI_Request*
114
120
121std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
122
123MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
127
128TypeId
130{
131 static TypeId tid =
132 TypeId("ns3::NullMessageMpiInterface").SetParent<Object>().SetGroupName("Mpi");
133 return tid;
134}
135
140
145
146void
151
158
165
166MPI_Comm
172
173bool
178
179void
180NullMessageMpiInterface::Enable(int* pargc, char*** pargv)
181{
182 NS_LOG_FUNCTION(this << *pargc);
183
184 NS_ASSERT(g_enabled == false);
185
186 // Initialize the MPI interface
187 MPI_Init(pargc, pargv);
188 Enable(MPI_COMM_WORLD);
189 g_mpiInitCalled = true;
190}
191
192void
194{
195 NS_LOG_FUNCTION(this);
196
197 NS_ASSERT(g_enabled == false);
198
199 // Standard MPI practice is to duplicate the communicator for
200 // library to use. Library communicates in isolated communication
201 // context.
202 MPI_Comm_dup(communicator, &g_communicator);
203 g_freeCommunicator = true;
204
205 // SystemId and Size are unit32_t in interface but MPI uses int so convert.
206 int mpiSystemId;
207 int mpiSize;
208 MPI_Comm_rank(g_communicator, &mpiSystemId);
209 MPI_Comm_size(g_communicator, &mpiSize);
210
211 g_sid = mpiSystemId;
212 g_size = mpiSize;
213
214 g_enabled = true;
215
216 MPI_Barrier(g_communicator);
217}
218
219void
221{
224
226
227 // Post a non-blocking receive for all peers
228 g_requests = new MPI_Request[g_numNeighbors];
229 g_pRxBuffers = new char*[g_numNeighbors];
230 int index = 0;
231 for (uint32_t rank = 0; rank < g_size; ++rank)
232 {
234 if (bundle)
235 {
237 MPI_Irecv(g_pRxBuffers[index],
239 MPI_CHAR,
240 rank,
241 0,
243 &g_requests[index]);
244 ++index;
245 }
246 }
247}
248
249void
251{
252 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
253
255
256 // Find the system id for the destination node
257 Ptr<Node> destNode = NodeList::GetNode(node);
258 uint32_t nodeSysId = destNode->GetSystemId();
259
260 NullMessageSentBuffer sendBuf;
261 g_pendingTx.push_back(sendBuf);
262 auto iter = g_pendingTx.rbegin(); // Points to the last element
263
264 uint32_t serializedSize = p->GetSerializedSize();
265 uint32_t bufferSize = serializedSize + (2 * sizeof(uint64_t)) + (2 * sizeof(uint32_t));
266 auto buffer = new uint8_t[bufferSize];
267 iter->SetBuffer(buffer);
268 // Add the time, dest node and dest device
269 uint64_t t = rxTime.GetInteger();
270 auto pTime = reinterpret_cast<uint64_t*>(buffer);
271 *pTime++ = t;
272
273 Time guarantee_update =
275 *pTime++ = guarantee_update.GetTimeStep();
276
277 auto pData = reinterpret_cast<uint32_t*>(pTime);
278 *pData++ = node;
279 *pData++ = dev;
280 // Serialize the packet
281 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
282
283 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
284 bufferSize,
285 MPI_CHAR,
286 nodeSysId,
287 0,
289 (iter->GetRequest()));
290
292}
293
294void
297{
298 NS_LOG_FUNCTION(guarantee_update.GetTimeStep() << bundle);
299
301
302 NullMessageSentBuffer sendBuf;
303 g_pendingTx.push_back(sendBuf);
304 auto iter = g_pendingTx.rbegin(); // Points to the last element
305
306 uint32_t bufferSize = 2 * sizeof(uint64_t) + 2 * sizeof(uint32_t);
307 auto buffer = new uint8_t[bufferSize];
308 iter->SetBuffer(buffer);
309 // Add the time, dest node and dest device
310 auto pTime = reinterpret_cast<uint64_t*>(buffer);
311 *pTime++ = 0;
312 *pTime++ = guarantee_update.GetInteger();
313 auto pData = reinterpret_cast<uint32_t*>(pTime);
314 *pData++ = 0;
315 *pData++ = 0;
316
317 // Find the system id for the destination MPI rank
318 uint32_t nodeSysId = bundle->GetSystemId();
319
320 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
321 bufferSize,
322 MPI_CHAR,
323 nodeSysId,
324 0,
326 (iter->GetRequest()));
327}
328
329void
336
337void
344
345void
347{
348 NS_LOG_FUNCTION(blocking);
349
351
352 // stop flag set to true when no more messages are found to
353 // process.
354 bool stop = false;
355
356 if (!g_numNeighbors)
357 {
358 // Not communicating with anyone.
359 return;
360 }
361
362 do
363 {
364 int messageReceived = 0;
365 int index = 0;
366 MPI_Status status;
367
368 if (blocking)
369 {
370 MPI_Waitany(g_numNeighbors, g_requests, &index, &status);
371 messageReceived = 1; /* Wait always implies message was received */
372 stop = true;
373 }
374 else
375 {
376 MPI_Testany(g_numNeighbors, g_requests, &index, &messageReceived, &status);
377 }
378
379 if (messageReceived)
380 {
381 int count;
382 MPI_Get_count(&status, MPI_CHAR, &count);
383
384 // Get the meta data first
385 auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
386 uint64_t time = *pTime++;
387 uint64_t guaranteeUpdate = *pTime++;
388
389 auto pData = reinterpret_cast<uint32_t*>(pTime);
390 uint32_t node = *pData++;
391 uint32_t dev = *pData++;
392
393 Time rxTime(time);
394
395 // rxtime == 0 means this is a Null Message
396 if (rxTime > Time(0))
397 {
398 count -= sizeof(time) + sizeof(guaranteeUpdate) + sizeof(node) + sizeof(dev);
399
400 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
401
402 // Find the correct node/device to schedule receive event
403 Ptr<Node> pNode = NodeList::GetNode(node);
404 Ptr<MpiReceiver> pMpiRec = nullptr;
405 uint32_t nDevices = pNode->GetNDevices();
406 for (uint32_t i = 0; i < nDevices; ++i)
407 {
408 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
409 if (pThisDev->GetIfIndex() == dev)
410 {
411 pMpiRec = pThisDev->GetObject<MpiReceiver>();
412 break;
413 }
414 }
415 NS_ASSERT(pNode && pMpiRec);
416
417 // Schedule the rx event
418 Simulator::ScheduleWithContext(pNode->GetId(),
419 rxTime - Simulator::Now(),
421 pMpiRec,
422 p);
423 }
424
425 // Update guarantee time for both packet receives and Null Messages.
427 NS_ASSERT(bundle);
428
429 bundle->SetGuaranteeTime(Time(guaranteeUpdate));
430
431 // Re-queue the next read
432 MPI_Irecv(g_pRxBuffers[index],
434 MPI_CHAR,
435 status.MPI_SOURCE,
436 0,
438 &g_requests[index]);
439 }
440 else
441 {
442 // if non-blocking and no message received in testany then stop message loop
443 stop = true;
444 }
445 } while (!stop);
446}
447
448void
450{
452
454
455 auto iter = g_pendingTx.begin();
456 while (iter != g_pendingTx.end())
457 {
458 MPI_Status status;
459 int flag = 0;
460 MPI_Test(iter->GetRequest(), &flag, &status);
461 auto current = iter; // Save current for erasing
462 ++iter; // Advance to next
463 if (flag)
464 { // This message is complete
465 g_pendingTx.erase(current);
466 }
467 }
468}
469
470void
472{
473 NS_LOG_FUNCTION(this);
474
475 if (g_enabled)
476 {
477 for (auto iter = g_pendingTx.begin(); iter != g_pendingTx.end(); ++iter)
478 {
479 MPI_Cancel(iter->GetRequest());
480 MPI_Request_free(iter->GetRequest());
481 }
482
483 for (uint32_t i = 0; i < g_numNeighbors; ++i)
484 {
485 MPI_Cancel(&g_requests[i]);
486 MPI_Request_free(&g_requests[i]);
487 }
488
489 for (uint32_t i = 0; i < g_numNeighbors; ++i)
490 {
491 delete[] g_pRxBuffers[i];
492 }
493 delete[] g_pRxBuffers;
494 delete[] g_requests;
495
496 g_pendingTx.clear();
497
499 {
500 MPI_Comm_free(&g_communicator);
501 g_freeCommunicator = false;
502 }
503
504 if (g_mpiInitCalled)
505 {
506 int flag = 0;
507 MPI_Initialized(&flag);
508 if (flag)
509 {
510 MPI_Finalize();
511 }
512 else
513 {
514 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
515 }
516 }
517
518 g_enabled = false;
519 g_mpiInitCalled = false;
520 }
521 else
522 {
523 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
524 }
525}
526
527} // namespace ns3
Class to aggregate to a NetDevice if it supports MPI capability.
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static Ptr< Node > GetNode(uint32_t n)
Definition node-list.cc:240
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void Destroy() override
Deletes storage used by the parallel environment.
static void ReceiveMessagesBlocking()
Blocking message receive.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
bool IsEnabled() override
Returns enabled state of parallel environment.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static MPI_Request * g_requests
Pending non-blocking receives.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message to across the specified bundle.
static void TestSendComplete()
Check for completed sends.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
static bool g_enabled
Has this interface been enabled.
static char ** g_pRxBuffers
Data buffers for non-blocking receives.
static void InitializeSendReceiveBuffers()
Initialize send and receive buffers.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_size
Size of the MPI COM_WORLD group.
void Disable() override
Clean up the ns-3 parallel communications interface.
static std::list< NullMessageSentBuffer > g_pendingTx
List of pending non-blocking sends.
static bool g_freeCommunicator
Did we create the communicator? Have to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
static uint32_t g_numNeighbors
Number of neighbor tasks, tasks that this task shares a link with.
Non-blocking send buffers for Null Message implementation.
MPI_Request m_request
MPI request posted for the send.
static NullMessageSimulatorImpl * GetInstance()
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
A base class which provides memory management and object aggregation.
Definition object.h:78
Smart pointer class similar to boost::intrusive_ptr.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
Get the bundle corresponding to a remote rank.
static std::size_t Size()
Get the number of ns-3 channels in this bundle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition simulator.h:577
static Time Now()
Return the current simulation virtual time.
Definition simulator.cc:197
Simulation virtual time values and global simulation resolution.
Definition nstime.h:94
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:444
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition nstime.h:434
a unique identifier for an interface.
Definition type-id.h:48
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition type-id.cc:1001
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition assert.h:55
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition log.h:191
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition object-base.h:35
Ptr< T > Create(Ts &&... args)
Create class instances by constructors with varying numbers of arguments and return them by Ptr.
Definition ptr.h:436
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
Declaration of class ns3::NullMessageSimulatorImpl.
Declaration of class ns3::RemoteChannelBundleManager.
Declaration of class ns3::RemoteChannelBundle.