A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * Copyright 2013. Lawrence Livermore National Security, LLC.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation;
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 *
17 * Author: Steven Smith <smith84@llnl.gov>
18 *
19 */
20
28
32
33#include "ns3/log.h"
34#include "ns3/mpi-receiver.h"
35#include "ns3/net-device.h"
36#include "ns3/node-list.h"
37#include "ns3/node.h"
38#include "ns3/nstime.h"
39#include "ns3/simulator.h"
40
41#include <iomanip>
42#include <iostream>
43#include <list>
44#include <mpi.h>
45
46namespace ns3
47{
48
// Defines the log component used by the NS_LOG_* macros in this file.
49NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface");
50
// Registers the NullMessageMpiInterface Object subclass with the ns-3 TypeId system.
51NS_OBJECT_ENSURE_REGISTERED(NullMessageMpiInterface);
52
// Body of class NullMessageSentBuffer: owns one heap buffer that is handed to
// an outstanding MPI_Isend, plus the MPI_Request that tracks that send.
61{
62 public:
65
// Returns the owned send buffer (nullptr until SetBuffer is called).
69 uint8_t* GetBuffer();
// Takes ownership of a buffer allocated with new[]; the destructor delete[]s it.
73 void SetBuffer(uint8_t* buffer);
// Returns the address of the request handle, for MPI_Isend / MPI_Test / MPI_Cancel.
77 MPI_Request* GetRequest();
78
79 private:
// Owned send buffer, released with delete[] in the destructor.
83 uint8_t* m_buffer;
84
// Request handle of the non-blocking send that uses m_buffer.
88 MPI_Request m_request;
// NOTE(review): the destructor delete[]s m_buffer, but this view shows no
// deleted copy constructor/assignment. Copying an instance whose buffer is set
// would double-delete it. Today instances are only copied by
// g_pendingTx.push_back() before SetBuffer is called. Consider
// std::unique_ptr<uint8_t[]> or deleting the copy operations.
89};
90
96
98{
99 m_buffer = nullptr;
100 m_request = nullptr;
101}
102
// Destructor body: releases the buffer installed by SetBuffer
// (delete[] of the initial nullptr is a no-op).
104{
105 delete[] m_buffer;
106}
107
108uint8_t*
// GetBuffer body: returns the owned send buffer, or nullptr if none was set.
110{
111 return m_buffer;
112}
113
114void
// SetBuffer body: stores the buffer and takes ownership of it. A previously
// stored buffer is overwritten without being freed.
116{
117 m_buffer = buffer;
118}
119
120MPI_Request*
// GetRequest body: exposes the request handle so MPI_Isend can fill it in and
// MPI_Test / MPI_Cancel / MPI_Request_free can act on it.
122{
123 return &m_request;
124}
125
131
// Sends posted with MPI_Isend. Each entry owns its buffer, so the buffer lives
// until TestSendComplete erases the entry on completion or Disable clears the list.
132std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
133
// Starts as MPI_COMM_WORLD. Enable(MPI_Comm) replaces it with a duplicate
// (MPI_Comm_dup), which Disable later frees.
134MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
138
// GetTypeId body: registers "ns3::NullMessageMpiInterface" as an Object
// subclass in the "Mpi" group; the function-local static is built on first call.
139TypeId
141{
142 static TypeId tid =
143 TypeId("ns3::NullMessageMpiInterface").SetParent<Object>().SetGroupName("Mpi");
144 return tid;
145}
146
// Presumably the constructor (its signature line is not shown in this listing):
// only logs. The interface state is held in static g_* members.
148{
149 NS_LOG_FUNCTION(this);
150}
151
// Presumably the destructor (signature line not shown): only logs.
153{
154 NS_LOG_FUNCTION(this);
155}
156
157void
// Destroy body: in this implementation it only logs. Receive buffers, pending
// sends and the communicator are released by Disable().
159{
160 NS_LOG_FUNCTION(this);
161}
162
// GetSystemId body: this task's rank in g_communicator, cached by Enable(MPI_Comm).
// (A line of this function is not shown in this listing.)
165{
167 return g_sid;
168}
169
// GetSize body: number of ranks in g_communicator, cached by Enable(MPI_Comm).
172{
174 return g_size;
175}
176
177MPI_Comm
// GetCommunicator body: the communicator used for all ns-3 traffic. After
// Enable(MPI_Comm) this is the private duplicate, not the caller's communicator.
179{
181 return g_communicator;
182}
183
184bool
// IsEnabled body: true between a successful Enable and the matching Disable.
186{
187 return g_enabled;
188}
189
190void
191NullMessageMpiInterface::Enable(int* pargc, char*** pargv)
192{
193 NS_LOG_FUNCTION(this << *pargc);
194
195 NS_ASSERT(g_enabled == false);
196
197 // Initialize the MPI interface
198 MPI_Init(pargc, pargv);
199 Enable(MPI_COMM_WORLD);
200 g_mpiInitCalled = true;
201}
202
// Enable(MPI_Comm) body: enables the interface on a private duplicate of
// `communicator`, caches this task's rank and the group size, then
// synchronizes all ranks with a barrier. The caller keeps ownership of
// `communicator`; only the duplicate is freed (in Disable()).
// MPI_Comm_dup and MPI_Barrier are collective, so every rank of
// `communicator` must make this call.
203void
205{
206 NS_LOG_FUNCTION(this);
207
208 NS_ASSERT(g_enabled == false);
209
210 // Standard MPI practice is to duplicate the communicator for the
211 // library's own use, so the library communicates in an isolated
212 // communication context.
213 MPI_Comm_dup(communicator, &g_communicator);
214 g_freeCommunicator = true;
215
216 // SystemId and Size are uint32_t in the interface but MPI uses int, so convert.
217 int mpiSystemId;
218 int mpiSize;
219 MPI_Comm_rank(g_communicator, &mpiSystemId);
220 MPI_Comm_size(g_communicator, &mpiSize);
221
222 g_sid = mpiSystemId;
223 g_size = mpiSize;
224
225 g_enabled = true;
226
227 MPI_Barrier(g_communicator);
228}
229
// InitializeSendReceiveBuffers body: allocates one MPI_Request and one
// receive-buffer slot per neighbor. It then posts a non-blocking receive
// (MPI_CHAR, tag 0) from every rank for which a `bundle` is found.
// NOTE(review): several lines are not shown in this listing: the assignment of
// g_numNeighbors, the lookup of `bundle`, and the allocation of
// g_pRxBuffers[index]. `index` must never exceed g_numNeighbors, so
// g_numNeighbors is presumably the number of remote bundles — confirm.
230void
232{
235
237
238 // Post a non-blocking receive for all peers
239 g_requests = new MPI_Request[g_numNeighbors];
240 g_pRxBuffers = new char*[g_numNeighbors];
241 int index = 0;
242 for (uint32_t rank = 0; rank < g_size; ++rank)
243 {
245 if (bundle)
246 {
248 MPI_Irecv(g_pRxBuffers[index],
250 MPI_CHAR,
251 rank,
252 0,
254 &g_requests[index]);
255 ++index;
256 }
257 }
258}
259
// SendPacket body: sends packet `p` to the rank that owns destination node
// `node`, to be received at `rxTime` on the device with if-index `dev`.
// Wire format (the same layout ReceiveMessages decodes):
//   uint64 rx time | uint64 guarantee time | uint32 node | uint32 dev | serialized packet
// The send buffer is owned by a NullMessageSentBuffer stored in g_pendingTx,
// so it outlives the non-blocking MPI_Isend. TestSendComplete or Disable
// releases it later.
260void
262{
263 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
264
266
267 // Find the system id for the destination node
268 Ptr<Node> destNode = NodeList::GetNode(node);
269 uint32_t nodeSysId = destNode->GetSystemId();
270
// Append an empty entry first and attach the buffer to the list element
// afterwards, so the copy made by push_back never owns memory.
271 NullMessageSentBuffer sendBuf;
272 g_pendingTx.push_back(sendBuf);
273 std::list<NullMessageSentBuffer>::reverse_iterator iter =
274 g_pendingTx.rbegin(); // Points to the last element
275
276 uint32_t serializedSize = p->GetSerializedSize();
// NOTE(review): no visible check that bufferSize fits the receiver's buffer
// (presumably NULL_MESSAGE_MAX_MPI_MSG_SIZE bytes — confirm). An oversized
// packet would not fit the posted receive.
277 uint32_t bufferSize = serializedSize + (2 * sizeof(uint64_t)) + (2 * sizeof(uint32_t));
278 uint8_t* buffer = new uint8_t[bufferSize];
279 iter->SetBuffer(buffer);
280 // Add the time, dest node and dest device
// NOTE(review): the receiver only delivers a packet when the decoded rx time
// is > 0 (0 marks a Null Message), so this path relies on rxTime being positive.
281 uint64_t t = rxTime.GetInteger();
282 uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
283 *pTime++ = t;
284
// Guarantee time for the destination rank; the right-hand side of this
// initialization is not shown in this listing.
285 Time guarantee_update =
287 *pTime++ = guarantee_update.GetTimeStep();
288
289 uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
290 *pData++ = node;
291 *pData++ = dev;
292 // Serialize the packet
293 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
294
295 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
296 bufferSize,
297 MPI_CHAR,
298 nodeSysId,
299 0,
301 (iter->GetRequest()));
302
304}
305
// SendNullMessage body: sends a header-only message to the rank of `bundle`,
// carrying only a new guarantee time. It uses the same layout as SendPacket
// with rx time, node and dev all 0 and no payload. The buffer is kept alive in
// g_pendingTx until the MPI_Isend completes.
306void
309{
310 NS_LOG_FUNCTION(guarantee_update.GetTimeStep() << bundle);
311
313
314 NullMessageSentBuffer sendBuf;
315 g_pendingTx.push_back(sendBuf);
316 std::list<NullMessageSentBuffer>::reverse_iterator iter =
317 g_pendingTx.rbegin(); // Points to the last element
318
319 uint32_t bufferSize = 2 * sizeof(uint64_t) + 2 * sizeof(uint32_t);
320 uint8_t* buffer = new uint8_t[bufferSize];
321 iter->SetBuffer(buffer);
322 // Header: rx time 0 marks a Null Message, then the guarantee time; node and device are unused (0).
323 uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
324 *pTime++ = 0;
325 *pTime++ = guarantee_update.GetInteger();
326 uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
327 *pData++ = 0;
328 *pData++ = 0;
329
330 // Find the system id for the destination MPI rank
331 uint32_t nodeSysId = bundle->GetSystemId();
332
333 MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
334 bufferSize,
335 MPI_CHAR,
336 nodeSysId,
337 0,
339 (iter->GetRequest()));
340}
341
// ReceiveMessagesBlocking body: waits for and processes at most one incoming
// message, via ReceiveMessages(true), which stops after a single MPI_Waitany.
// It returns immediately when this rank has no neighbors.
342void
344{
346
347 ReceiveMessages(true);
348}
349
// ReceiveMessagesNonBlocking body: processes every message that has already
// arrived, and returns as soon as MPI_Testany reports nothing pending.
350void
352{
354
355 ReceiveMessages(false);
356}
357
// ReceiveMessages body: drains completed receives posted on g_requests.
// Blocking mode waits for exactly one message and then stops.
// Non-blocking mode loops until MPI_Testany finds nothing complete.
// For each message it:
// - decodes the header written by SendPacket / SendNullMessage;
// - if the rx time is > 0, rebuilds the packet and schedules MpiReceiver::Receive
//   on the destination node at that time;
// - always updates the sender bundle's guarantee time;
// - re-posts the receive into the same slot.
358void
360{
361 NS_LOG_FUNCTION(blocking);
362
364
365 // stop flag set to true when no more messages are found to
366 // process.
367 bool stop = false;
368
369 if (!g_numNeighbors)
370 {
371 // Not communicating with anyone.
372 return;
373 }
374
375 do
376 {
377 int messageReceived = 0;
378 int index = 0;
379 MPI_Status status;
380
381 if (blocking)
382 {
383 MPI_Waitany(g_numNeighbors, g_requests, &index, &status);
384 messageReceived = 1; /* Wait always implies message was received */
385 stop = true;
386 }
387 else
388 {
389 MPI_Testany(g_numNeighbors, g_requests, &index, &messageReceived, &status);
390 }
391
392 if (messageReceived)
393 {
394 int count;
395 MPI_Get_count(&status, MPI_CHAR, &count);
396
397 // Get the meta data first
398 uint64_t* pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
399 uint64_t time = *pTime++;
400 uint64_t guaranteeUpdate = *pTime++;
401
402 uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
403 uint32_t node = *pData++;
404 uint32_t dev = *pData++;
405
406 Time rxTime(time);
407
408 // An rx time of 0 marks a Null Message; only a positive rx time carries a packet.
409 if (rxTime > Time(0))
410 {
// Remaining bytes after the fixed header are the serialized packet.
411 count -= sizeof(time) + sizeof(guaranteeUpdate) + sizeof(node) + sizeof(dev);
412
413 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
414
415 // Find the correct node/device to schedule receive event
416 Ptr<Node> pNode = NodeList::GetNode(node);
417 Ptr<MpiReceiver> pMpiRec = nullptr;
418 uint32_t nDevices = pNode->GetNDevices();
419 for (uint32_t i = 0; i < nDevices; ++i)
420 {
421 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
422 if (pThisDev->GetIfIndex() == dev)
423 {
424 pMpiRec = pThisDev->GetObject<MpiReceiver>();
425 break;
426 }
427 }
// NOTE(review): pNode was already dereferenced above, so asserting it here
// is too late. NS_ASSERT is only active in debug builds, so in optimized
// builds a missing MpiReceiver is not caught before scheduling.
428 NS_ASSERT(pNode && pMpiRec);
429
430 // Schedule the rx event
432 rxTime - Simulator::Now(),
434 pMpiRec,
435 p);
436 }
437
438 // Update guarantee time for both packet receives and Null Messages.
440 NS_ASSERT(bundle);
441
442 bundle->SetGuaranteeTime(Time(guaranteeUpdate));
443
444 // Re-queue the next read
445 MPI_Irecv(g_pRxBuffers[index],
447 MPI_CHAR,
448 status.MPI_SOURCE,
449 0,
451 &g_requests[index]);
452 }
453 else
454 {
455 // if non-blocking and no message received in testany then stop message loop
456 stop = true;
457 }
458 } while (!stop);
459}
460
// TestSendComplete body: polls every pending send with MPI_Test and erases the
// completed ones. Erasing destroys the NullMessageSentBuffer, whose destructor
// delete[]s the send buffer; this happens only after MPI reports the send done.
461void
463{
465
467
468 std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin();
469 while (iter != g_pendingTx.end())
470 {
471 MPI_Status status;
472 int flag = 0;
473 MPI_Test(iter->GetRequest(), &flag, &status);
474 std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
475 ++iter; // Advance to next
476 if (flag)
477 { // This message is complete
478 g_pendingTx.erase(current);
479 }
480 }
481}
482
// Disable body: tears down the interface.
// - Cancels and frees pending sends and posted receives.
// - Deletes the receive buffers and request array, and drops pending send buffers.
// - Frees the duplicated communicator.
// - Calls MPI_Finalize if this interface called MPI_Init.
// Calling it while not enabled is a fatal error.
// NOTE(review): g_pRxBuffers / g_requests are not reset to nullptr and
// g_numNeighbors is not reset. A later Disable after re-enabling, without
// InitializeSendReceiveBuffers running again, would delete them twice.
// NOTE(review): MPI_Request_free on a still-active send does not wait for it.
// g_pendingTx.clear() below then delete[]s buffers MPI may still be reading.
// Cancelling sends is also deprecated as of MPI-4.0.
483void
485{
486 NS_LOG_FUNCTION(this);
487
488 if (g_enabled)
489 {
490 for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin();
491 iter != g_pendingTx.end();
492 ++iter)
493 {
494 MPI_Cancel(iter->GetRequest());
495 MPI_Request_free(iter->GetRequest());
496 }
497
498 for (uint32_t i = 0; i < g_numNeighbors; ++i)
499 {
500 MPI_Cancel(&g_requests[i]);
501 MPI_Request_free(&g_requests[i]);
502 }
503
504 for (uint32_t i = 0; i < g_numNeighbors; ++i)
505 {
506 delete[] g_pRxBuffers[i];
507 }
508 delete[] g_pRxBuffers;
509 delete[] g_requests;
510
// Destroying each entry delete[]s its send buffer.
511 g_pendingTx.clear();
512
// The guard line for this block is not shown in this listing;
// presumably it tests g_freeCommunicator — confirm.
514 {
515 MPI_Comm_free(&g_communicator);
516 g_freeCommunicator = false;
517 }
518
519 if (g_mpiInitCalled)
520 {
521 int flag = 0;
522 MPI_Initialized(&flag);
523 if (flag)
524 {
525 MPI_Finalize();
526 }
527 else
528 {
529 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
530 }
531 }
532
533 g_enabled = false;
534 g_mpiInitCalled = false;
535 }
536 else
537 {
538 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
539 }
540}
541
542} // namespace ns3
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
uint32_t GetSystemId() const
Definition: node.cc:131
uint32_t GetNDevices() const
Definition: node.cc:162
uint32_t GetId() const
Definition: node.cc:117
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:152
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void Destroy() override
Deletes storage used by the parallel environment.
static void ReceiveMessagesBlocking()
Blocking message receive.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
bool IsEnabled() override
Returns enabled state of parallel environment.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static MPI_Request * g_requests
Pending non-blocking receives.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
static void TestSendComplete()
Check for completed sends.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
static bool g_enabled
Has this interface been enabled.
static char ** g_pRxBuffers
Data buffers for non-blocking receives.
static void InitializeSendReceiveBuffers()
Initialize send and receive buffers.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_size
Number of ranks in the communicator used by ns-3 (a duplicate of MPI_COMM_WORLD unless another communicator is supplied).
void Disable() override
Clean up the ns-3 parallel communications interface.
static std::list< NullMessageSentBuffer > g_pendingTx
List of pending non-blocking sends.
static bool g_freeCommunicator
Did we create the communicator? Have to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
static uint32_t g_numNeighbors
Number of neighbor tasks, tasks that this task shares a link with.
Non-blocking send buffers for Null Message implementation.
MPI_Request m_request
MPI request posted for the send.
uint8_t * m_buffer
Buffer for send.
static NullMessageSimulatorImpl * GetInstance()
Time CalculateGuaranteeTime(uint32_t systemId)
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
A base class which provides memory management and object aggregation.
Definition: object.h:89
uint32_t GetSerializedSize() const
Returns number of bytes required for packet serialization.
Definition: packet.cc:610
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:663
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:78
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
Get the bundle corresponding to a remote rank.
static std::size_t Size()
Get the number of ns-3 channels in this bundle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:587
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:199
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:454
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:444
a unique identifier for an interface.
Definition: type-id.h:60
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:935
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:160
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:848
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
Declaration of class ns3::NullMessageSimulatorImpl.
Declaration of class ns3::RemoteChannelBundleManager.
Declaration of class ns3::RemoteChannelBundle.