A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License version 2 as
4 * published by the Free Software Foundation;
5 *
6 * This program is distributed in the hope that it will be useful,
7 * but WITHOUT ANY WARRANTY; without even the implied warranty of
8 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 * GNU General Public License for more details.
10 *
11 * You should have received a copy of the GNU General Public License
12 * along with this program; if not, write to the Free Software
13 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 *
15 * Author: George Riley <riley@ece.gatech.edu>
16 */
17
24// This object contains static methods that provide an easy interface
25// to the necessary MPI information.
26
28
29#include "mpi-interface.h"
30#include "mpi-receiver.h"
31
32#include "ns3/log.h"
33#include "ns3/net-device.h"
34#include "ns3/node-list.h"
35#include "ns3/node.h"
36#include "ns3/nstime.h"
37#include "ns3/simulator-impl.h"
38#include "ns3/simulator.h"
39
40#include <iomanip>
41#include <iostream>
42#include <list>
43#include <mpi.h>
44
45namespace ns3
46{
47
48NS_LOG_COMPONENT_DEFINE("GrantedTimeWindowMpiInterface");
49
50NS_OBJECT_ENSURE_REGISTERED(GrantedTimeWindowMpiInterface);
51
53{
54 m_buffer = nullptr;
55 m_request = nullptr;
56}
57
59{
60 delete[] m_buffer;
61}
62
63uint8_t*
65{
66 return m_buffer;
67}
68
// Record the buffer that this SentBuffer tracks.
//
// The pointer is stored in m_buffer as-is: nothing is copied, and whatever
// pointer m_buffer held before this call is overwritten without being released.
// NOTE(review): m_buffer is released with delete[] elsewhere in this file
// (presumably the SentBuffer destructor, whose signature is not visible in this
// listing), so callers are expected to pass an array allocated with new[] -
// confirm against the class declaration.
//
// \param buffer Pointer to the buffer to store.
69void
70SentBuffer::SetBuffer(uint8_t* buffer)
71{
72 m_buffer = buffer;
73}
74
75MPI_Request*
77{
78 return &m_request;
79}
80
88
91MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
93;
94
97{
98 static TypeId tid =
99 TypeId("ns3::GrantedTimeWindowMpiInterface").SetParent<Object>().SetGroupName("Mpi");
100 return tid;
101}
102
103void
105{
106 NS_LOG_FUNCTION(this);
107
108 for (uint32_t i = 0; i < GetSize(); ++i)
109 {
110 delete[] g_pRxBuffers[i];
111 }
112 delete[] g_pRxBuffers;
113 delete[] g_requests;
114
115 g_pendingTx.clear();
116}
117
120{
122 return g_rxCount;
123}
124
127{
129 return g_txCount;
130}
131
134{
136 return g_sid;
137}
138
141{
143 return g_size;
144}
145
146bool
148{
149 return g_enabled;
150}
151
152MPI_Comm
154{
156 return g_communicator;
157}
158
159void
161{
162 NS_LOG_FUNCTION(this << pargc << pargv);
163
164 NS_ASSERT(g_enabled == false);
165
166 // Initialize the MPI interface
167 MPI_Init(pargc, pargv);
168 Enable(MPI_COMM_WORLD);
169 g_mpiInitCalled = true;
170 g_enabled = true;
171}
172
173void
175{
176 NS_LOG_FUNCTION(this);
177
178 NS_ASSERT(g_enabled == false);
179
180 // Standard MPI practice is to duplicate the communicator for
181 // library to use. Library communicates in isolated communication
182 // context.
183 MPI_Comm_dup(communicator, &g_communicator);
184 g_freeCommunicator = true;
185
186 MPI_Barrier(g_communicator);
187
188 int mpiSystemId;
189 int mpiSize;
190 MPI_Comm_rank(g_communicator, &mpiSystemId);
191 MPI_Comm_size(g_communicator, &mpiSize);
192 g_sid = mpiSystemId;
193 g_size = mpiSize;
194
195 g_enabled = true;
196 // Post a non-blocking receive for all peers
197 g_pRxBuffers = new char*[g_size];
198 g_requests = new MPI_Request[g_size];
199 for (uint32_t i = 0; i < GetSize(); ++i)
200 {
201 g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
202 MPI_Irecv(g_pRxBuffers[i],
204 MPI_CHAR,
205 MPI_ANY_SOURCE,
206 0,
208 &g_requests[i]);
209 }
210}
211
212void
214 const Time& rxTime,
215 uint32_t node,
216 uint32_t dev)
217{
218 NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
219
220 SentBuffer sendBuf;
221 g_pendingTx.push_back(sendBuf);
222 std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin(); // Points to the last element
223
224 uint32_t serializedSize = p->GetSerializedSize();
225 uint8_t* buffer = new uint8_t[serializedSize + 16];
226 i->SetBuffer(buffer);
227 // Add the time, dest node and dest device
228 uint64_t t = rxTime.GetInteger();
229 uint64_t* pTime = reinterpret_cast<uint64_t*>(buffer);
230 *pTime++ = t;
231 uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
232 *pData++ = node;
233 *pData++ = dev;
234 // Serialize the packet
235 p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
236
237 // Find the system id for the destination node
238 Ptr<Node> destNode = NodeList::GetNode(node);
239 uint32_t nodeSysId = destNode->GetSystemId();
240
241 MPI_Isend(reinterpret_cast<void*>(i->GetBuffer()),
242 serializedSize + 16,
243 MPI_CHAR,
244 nodeSysId,
245 0,
247 (i->GetRequest()));
248 g_txCount++;
249}
250
251void
253{
255
256 // Poll the non-block reads to see if data arrived
257 while (true)
258 {
259 int flag = 0;
260 int index = 0;
261 MPI_Status status;
262
263 MPI_Testany(MpiInterface::GetSize(), g_requests, &index, &flag, &status);
264 if (!flag)
265 {
266 break; // No more messages
267 }
268 int count;
269 MPI_Get_count(&status, MPI_CHAR, &count);
270 g_rxCount++; // Count this receive
271
272 // Get the meta data first
273 uint64_t* pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
274 uint64_t time = *pTime++;
275 uint32_t* pData = reinterpret_cast<uint32_t*>(pTime);
276 uint32_t node = *pData++;
277 uint32_t dev = *pData++;
278
279 Time rxTime(time);
280
281 count -= sizeof(time) + sizeof(node) + sizeof(dev);
282
283 Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
284
285 // Find the correct node/device to schedule receive event
286 Ptr<Node> pNode = NodeList::GetNode(node);
287 Ptr<MpiReceiver> pMpiRec = nullptr;
288 uint32_t nDevices = pNode->GetNDevices();
289 for (uint32_t i = 0; i < nDevices; ++i)
290 {
291 Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
292 if (pThisDev->GetIfIndex() == dev)
293 {
294 pMpiRec = pThisDev->GetObject<MpiReceiver>();
295 break;
296 }
297 }
298
299 NS_ASSERT(pNode && pMpiRec);
300
301 // Schedule the rx event
302 Simulator::ScheduleWithContext(pNode->GetId(),
303 rxTime - Simulator::Now(),
305 pMpiRec,
306 p);
307
308 // Re-queue the next read
309 MPI_Irecv(g_pRxBuffers[index],
311 MPI_CHAR,
312 MPI_ANY_SOURCE,
313 0,
315 &g_requests[index]);
316 }
317}
318
319void
321{
323
324 std::list<SentBuffer>::iterator i = g_pendingTx.begin();
325 while (i != g_pendingTx.end())
326 {
327 MPI_Status status;
328 int flag = 0;
329 MPI_Test(i->GetRequest(), &flag, &status);
330 std::list<SentBuffer>::iterator current = i; // Save current for erasing
331 i++; // Advance to next
332 if (flag)
333 { // This message is complete
334 g_pendingTx.erase(current);
335 }
336 }
337}
338
339void
341{
343
345 {
346 MPI_Comm_free(&g_communicator);
347 g_freeCommunicator = false;
348 }
349
350 // ns-3 should MPI finalize only if ns-3 was used to initialize
351 if (g_mpiInitCalled)
352 {
353 int flag = 0;
354 MPI_Initialized(&flag);
355 if (flag)
356 {
357 MPI_Finalize();
358 }
359 else
360 {
361 NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
362 }
363 g_mpiInitCalled = false;
364 }
365
366 g_enabled = false;
367}
368
369} // namespace ns3
static void ReceiveMessages()
Check for completed message receives.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static bool g_freeCommunicator
Did ns-3 create the communicator? If so, ns-3 has to free it.
uint32_t GetSystemId() override
Get the id number of this rank.
void Disable() override
Clean up the ns-3 parallel communications interface.
static void TestSendComplete()
Check for completed sends.
static bool g_mpiInitCalled
Has MPI_Init been called by this interface?
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
static uint32_t g_size
Size of the MPI communicator group used by ns-3 (MPI_COMM_WORLD by default).
static bool g_enabled
Has this interface been enabled?
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
bool IsEnabled() override
Returns enabled state of parallel environment.
static MPI_Request * g_requests
Pending non-blocking receives.
static uint32_t g_rxCount
Total packets received.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
void Destroy() override
Deletes storage used by the parallel environment.
static uint32_t g_txCount
Total packets sent.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
A base class which provides memory management and object aggregation.
Definition: object.h:89
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:78
Tracks non-blocking sends.
MPI_Request m_request
The MPI request handle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:587
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:199
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:454
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:444
A unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:936
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t MAX_MPI_MSG_SIZE
Maximum MPI message size, for easy buffer creation.