A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License version 2 as
4 * published by the Free Software Foundation;
5 *
6 * This program is distributed in the hope that it will be useful,
7 * but WITHOUT ANY WARRANTY; without even the implied warranty of
8 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 * GNU General Public License for more details.
10 *
11 * You should have received a copy of the GNU General Public License
12 * along with this program; if not, write to the Free Software
13 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 *
15 * Author: George Riley <riley@ece.gatech.edu>
16 */
17
18/**
19 * \file
20 * \ingroup mpi
21 * Implementation of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
22 */
23
24// This object contains static methods that provide an easy interface
25// to the necessary MPI information.
26
28
29#include "mpi-interface.h"
30#include "mpi-receiver.h"
31
32#include "ns3/log.h"
33#include "ns3/net-device.h"
34#include "ns3/node-list.h"
35#include "ns3/node.h"
36#include "ns3/nstime.h"
37#include "ns3/simulator-impl.h"
38#include "ns3/simulator.h"
39
40#include <iomanip>
41#include <iostream>
42#include <list>
43#include <mpi.h>
44
45namespace ns3
46{
47
48NS_LOG_COMPONENT_DEFINE("GrantedTimeWindowMpiInterface");
49
50NS_OBJECT_ENSURE_REGISTERED(GrantedTimeWindowMpiInterface);
51
// Presumably the body of the SentBuffer constructor (the signature line is
// absent from this listing -- confirm against the header).
// Starts the object with no payload buffer and a null MPI request handle.
53{
54 m_buffer = nullptr;
55 m_request = MPI_REQUEST_NULL;
56}
57
// Presumably the body of the SentBuffer destructor (the signature line is
// absent from this listing -- confirm against the header).
// Releases the array pointed to by m_buffer; delete[] on nullptr is a no-op.
// NOTE(review): the object frees a raw pointer here, so copying an instance
// whose m_buffer is non-null would lead to a double delete unless the class
// defines or deletes its copy operations -- verify in the header.
59{
60 delete[] m_buffer;
61}
62
// Accessor returning the raw pointer currently stored in m_buffer
// (the function-name line is absent from this listing).
63uint8_t*
65{
66 return m_buffer;
67}
68
69void
70SentBuffer::SetBuffer(uint8_t* buffer)
71{
72 m_buffer = buffer;
73}
74
// Accessor returning the address of the m_request member, so that MPI calls
// can write to and test the handle in place
// (the function-name line is absent from this listing).
75MPI_Request*
77{
78 return &m_request;
79}
80
88
// Communicator used for ns-3 traffic. Initialised to MPI_COMM_WORLD here;
// the communicator-based Enable overwrites it with the result of MPI_Comm_dup.
91MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
93
// Presumably the body of GetTypeId() (the signature line is absent from this
// listing). Builds the TypeId "ns3::GrantedTimeWindowMpiInterface" with Object
// as parent and group name "Mpi" in a function-local static, and returns it.
96{
97 static TypeId tid =
98 TypeId("ns3::GrantedTimeWindowMpiInterface").SetParent<Object>().SetGroupName("Mpi");
99 return tid;
100}
101
// Storage teardown, presumably GrantedTimeWindowMpiInterface::Destroy()
// (the function-name line is absent from this listing).
// Frees every per-rank receive buffer, the array holding them, and the array
// of receive request handles, then empties the list of pending sends.
// NOTE(review): the receives posted with MPI_Irecv on these buffers are not
// cancelled or completed here before the memory is released -- confirm the
// call order relative to shutting MPI down makes this safe.
// NOTE(review): the pointers are not reset after deletion, so a second call
// would delete them again.
102void
104{
105 NS_LOG_FUNCTION(this);
106
// One buffer was allocated per rank, so the loop is bounded by GetSize().
107 for (uint32_t i = 0; i < GetSize(); ++i)
108 {
109 delete[] g_pRxBuffers[i];
110 }
111 delete[] g_pRxBuffers;
112 delete[] g_requests;
113
114 g_pendingTx.clear();
115}
116
// Getter body returning the static receive counter g_rxCount, which is
// incremented once per message handled in the receive loop
// (the signature lines are absent from this listing).
119{
121 return g_rxCount;
122}
123
// Getter body returning the static send counter g_txCount, which is
// incremented once per message handed to MPI_Isend
// (the signature lines are absent from this listing).
126{
128 return g_txCount;
129}
130
// Getter body returning g_sid, the rank obtained from MPI_Comm_rank when the
// interface was enabled (the signature lines are absent from this listing;
// presumably GetSystemId()).
133{
135 return g_sid;
136}
137
// Getter body returning g_size, the rank count obtained from MPI_Comm_size
// when the interface was enabled (the signature lines are absent from this
// listing; presumably GetSize()).
140{
142 return g_size;
143}
144
// Reports whether the interface is currently enabled by returning g_enabled
// (the function-name line is absent from this listing; presumably IsEnabled()).
145bool
147{
148 return g_enabled;
149}
150
// Returns the communicator stored in g_communicator (the function-name line
// is absent from this listing; presumably GetCommunicator()).
151MPI_Comm
153{
155 return g_communicator;
156}
157
// Enable variant that initialises MPI itself from the program arguments
// (the function-name line is absent from this listing).
// pargc / pargv are forwarded unchanged to MPI_Init. The interface must not
// already be enabled.
158void
160{
161 NS_LOG_FUNCTION(this << pargc << pargv);
162
163 NS_ASSERT(g_enabled == false);
164
165 // Initialize the MPI interface
// NOTE(review): the return code of MPI_Init is not checked.
166 MPI_Init(pargc, pargv);
167 Enable(MPI_COMM_WORLD);
// Remember that this interface called MPI_Init, so that the shutdown path
// knows it is also responsible for calling MPI_Finalize.
168 g_mpiInitCalled = true;
// NOTE(review): the communicator-based Enable called above already sets
// g_enabled to true, so this assignment appears redundant.
169 g_enabled = true;
170}
171
// Enable variant that takes an existing communicator (the function-name line
// is absent from this listing). Duplicates the communicator, records this
// process's rank and the rank count, and posts one non-blocking receive per
// rank. The interface must not already be enabled.
172void
174{
175 NS_LOG_FUNCTION(this);
176
177 NS_ASSERT(g_enabled == false);
178
179 // Standard MPI practice is to duplicate the communicator for the
180 // library's own use, so that the library communicates in an isolated
181 // communication context.
182 MPI_Comm_dup(communicator, &g_communicator);
// The duplicate belongs to this interface and must be freed on shutdown.
183 g_freeCommunicator = true;
184
185 MPI_Barrier(g_communicator);
186
187 int mpiSystemId;
188 int mpiSize;
189 MPI_Comm_rank(g_communicator, &mpiSystemId);
190 MPI_Comm_size(g_communicator, &mpiSize);
191 g_sid = mpiSystemId;
192 g_size = mpiSize;
193
194 g_enabled = true;
195 // Post a non-blocking receive for all peers
// One buffer and one request handle are allocated per rank. Each receive
// accepts a message from any source with tag 0.
196 g_pRxBuffers = new char*[g_size];
197 g_requests = new MPI_Request[g_size];
198 for (uint32_t i = 0; i < GetSize(); ++i)
199 {
200 g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
// NOTE(review): two argument lines of this call (listing lines 202 and 206,
// presumably the element count and the communicator) are absent from this
// listing.
201 MPI_Irecv(g_pRxBuffers[i],
203 MPI_CHAR,
204 MPI_ANY_SOURCE,
205 0,
207 &g_requests[i]);
208 }
209}
210
// Sends a serialized packet to the rank that owns the destination node
// (the first signature line, carrying the function name and the packet
// parameter p, is absent from this listing).
//   rxTime - receive time; its raw integer value is written into the message
//   node   - destination node id, also used to look up the destination rank
//   dev    - destination device index
// Message layout: 8-byte time, 4-byte node, 4-byte dev, then the serialized
// packet -- hence the 16 extra bytes added to the serialized size.
211void
213 const Time& rxTime,
214 uint32_t node,
215 uint32_t dev)
216{
217 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
218
// An empty SentBuffer is copied into the pending list first; the data buffer
// is then attached to the list element, so the element is what holds the
// buffer while the non-blocking send is in flight.
219 SentBuffer sendBuf;
220 g_pendingTx.push_back(sendBuf);
221 auto i = g_pendingTx.rbegin(); // Points to the last element
222
223 uint32_t serializedSize = p->GetSerializedSize();
// NOTE(review): no check here that serializedSize + 16 fits in the receive
// buffers, which are allocated with MAX_MPI_MSG_SIZE chars -- confirm whether
// an upper bound is enforced elsewhere.
224 auto buffer = new uint8_t[serializedSize + 16];
225 i->SetBuffer(buffer);
226 // Add the time, dest node and dest device
227 uint64_t t = rxTime.GetInteger();
228 auto pTime = reinterpret_cast<uint64_t*>(buffer);
229 *pTime++ = t;
230 auto pData = reinterpret_cast<uint32_t*>(pTime);
231 *pData++ = node;
232 *pData++ = dev;
233 // Serialize the packet
234 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
235
236 // Find the system id for the destination node
237 Ptr<Node> destNode = NodeList::GetNode(node);
238 uint32_t nodeSysId = destNode->GetSystemId();
239
// Non-blocking send with tag 0; the request handle lives in the list element
// so completion can be tested later.
// NOTE(review): one argument line of this call (listing line 245, presumably
// the communicator) is absent from this listing.
240 MPI_Isend(reinterpret_cast<void*>(i->GetBuffer()),
241 serializedSize + 16,
242 MPI_CHAR,
243 nodeSysId,
244 0,
246 (i->GetRequest()));
247 g_txCount++;
248}
249
// Drains all receives that have completed (the function-name line is absent
// from this listing; presumably ReceiveMessages()). For each completed
// receive: decode the header, rebuild the packet, locate the MpiReceiver on
// the destination device, schedule the receive event, and re-post the receive
// on the same buffer.
250void
252{
254
255 // Poll the non-block reads to see if data arrived
256 while (true)
257 {
258 int flag = 0;
259 int index = 0;
260 MPI_Status status;
261
// index identifies which posted request completed, and therefore which
// receive buffer holds the message.
262 MPI_Testany(MpiInterface::GetSize(), g_requests, &index, &flag, &status);
263 if (!flag)
264 {
265 break; // No more messages
266 }
267 int count;
268 MPI_Get_count(&status, MPI_CHAR, &count);
269 g_rxCount++; // Count this receive
270
271 // Get the meta data first
// Header layout mirrors the sender: 8-byte time, 4-byte node, 4-byte device.
272 auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
273 uint64_t time = *pTime++;
274 auto pData = reinterpret_cast<uint32_t*>(pTime);
275 uint32_t node = *pData++;
276 uint32_t dev = *pData++;
277
278 Time rxTime(time);
279
// Subtract the header size so count is the serialized packet length.
// NOTE(review): count is a signed int and is not validated against the header
// size before the subtraction.
280 count -= sizeof(time) + sizeof(node) + sizeof(dev);
281
282 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
283
284 // Find the correct node/device to schedule receive event
285 Ptr<Node> pNode = NodeList::GetNode(node);
286 Ptr<MpiReceiver> pMpiRec = nullptr;
287 uint32_t nDevices = pNode->GetNDevices();
288 for (uint32_t i = 0; i < nDevices; ++i)
289 {
290 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
291 if (pThisDev->GetIfIndex() == dev)
292 {
293 pMpiRec = pThisDev->GetObject<MpiReceiver>();
294 break;
295 }
296 }
297
// NOTE(review): pNode has already been dereferenced above (GetNDevices,
// GetDevice), so the pNode half of this assertion cannot catch a null node
// before it is used.
298 NS_ASSERT(pNode && pMpiRec);
299
300 // Schedule the rx event
// The delay is the receive time carried in the message minus the current
// simulation time.
// NOTE(review): one argument line of this call (listing line 303, presumably
// the callback to invoke) is absent from this listing.
301 Simulator::ScheduleWithContext(pNode->GetId(),
302 rxTime - Simulator::Now(),
304 pMpiRec,
305 p);
306
307 // Re-queue the next read
// NOTE(review): two argument lines of this call (listing lines 309 and 313,
// presumably the element count and the communicator) are absent from this
// listing.
308 MPI_Irecv(g_pRxBuffers[index],
310 MPI_CHAR,
311 MPI_ANY_SOURCE,
312 0,
314 &g_requests[index]);
315 }
316}
317
// Walks the list of pending sends and removes every entry whose non-blocking
// send has completed (the function-name line is absent from this listing;
// presumably TestSendComplete()).
318void
320{
322
323 auto i = g_pendingTx.begin();
324 while (i != g_pendingTx.end())
325 {
326 MPI_Status status;
327 int flag = 0;
328 MPI_Test(i->GetRequest(), &flag, &status);
// The iterator is advanced before the erase so that removing the current
// element does not invalidate the iterator used to continue the loop.
329 auto current = i; // Save current for erasing
330 i++; // Advance to next
331 if (flag)
332 { // This message is complete
333 g_pendingTx.erase(current);
334 }
335 }
336}
337
// Shuts the interface down (the function-name line is absent from this
// listing; presumably Disable()). Frees the communicator, finalizes MPI if
// this interface initialised it, and clears the enabled flag.
338void
340{
342
// NOTE(review): the condition guarding this block (listing line 343) is
// absent from this listing; presumably it tests g_freeCommunicator.
344 {
345 MPI_Comm_free(&g_communicator);
346 g_freeCommunicator = false;
347 }
348
349 // ns-3 should MPI finalize only if ns-3 was used to initialize
350 if (g_mpiInitCalled)
351 {
352 int flag = 0;
353 MPI_Initialized(&flag);
354 if (flag)
355 {
356 MPI_Finalize();
357 }
358 else
359 {
360 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
361 }
362 g_mpiInitCalled = false;
363 }
364
365 g_enabled = false;
366}
367
368} // namespace ns3
static void ReceiveMessages()
Check for completed receives and process the received messages.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static bool g_freeCommunicator
Did ns-3 create the communicator? Have to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
void Disable() override
Clean up the ns-3 parallel communications interface.
static void TestSendComplete()
Check for completed sends.
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
static uint32_t g_size
Size of the MPI_COMM_WORLD group.
static bool g_enabled
Has this interface been enabled.
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
bool IsEnabled() override
Returns enabled state of parallel environment.
static MPI_Request * g_requests
Pending non-blocking receives.
static uint32_t g_rxCount
Total packets received.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
void Destroy() override
Deletes storage used by the parallel environment.
static uint32_t g_txCount
Total packets sent.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
A base class which provides memory management and object aggregation.
Definition: object.h:89
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:77
Tracks non-blocking sends.
MPI_Request m_request
The MPI request handle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:588
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:208
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
A unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:932
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t MAX_MPI_MSG_SIZE
Maximum MPI message size, for easy buffer creation.