A Discrete-Event Network Simulator
API
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  */
18 
25 // This object contains static methods that provide an easy interface
26 // to the necessary MPI information.
27 
28 #include <iostream>
29 #include <iomanip>
30 #include <list>
31 
33 #include "mpi-receiver.h"
34 #include "mpi-interface.h"
35 
36 #include "ns3/node.h"
37 #include "ns3/node-list.h"
38 #include "ns3/net-device.h"
39 #include "ns3/simulator.h"
40 #include "ns3/simulator-impl.h"
41 #include "ns3/nstime.h"
42 #include "ns3/log.h"
43 
44 #include <mpi.h>
45 
46 namespace ns3 {
47 
48 NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");
49 
50 NS_OBJECT_ENSURE_REGISTERED (GrantedTimeWindowMpiInterface);
51 
53 {
54  m_buffer = 0;
55  m_request = 0;
56 }
57 
59 {
60  delete [] m_buffer;
61 }
62 
63 uint8_t*
65 {
66  return m_buffer;
67 }
68 
69 void
70 SentBuffer::SetBuffer (uint8_t* buffer)
71 {
72  m_buffer = buffer;
73 }
74 
75 MPI_Request*
77 {
78  return &m_request;
79 }
80 
87 std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx;
88 
91 MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
93 
94 TypeId
96 {
97  static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface")
98  .SetParent<Object> ()
99  .SetGroupName ("Mpi")
100  ;
101  return tid;
102 }
103 
104 void
106 {
107  NS_LOG_FUNCTION (this);
108 
109  for (uint32_t i = 0; i < GetSize (); ++i)
110  {
111  delete [] g_pRxBuffers[i];
112  }
113  delete [] g_pRxBuffers;
114  delete [] g_requests;
115 
116  g_pendingTx.clear ();
117 }
118 
119 uint32_t
121 {
123  return g_rxCount;
124 }
125 
126 uint32_t
128 {
130  return g_txCount;
131 }
132 
133 uint32_t
135 {
137  return g_sid;
138 }
139 
140 uint32_t
142 {
144  return g_size;
145 }
146 
147 bool
149 {
150  return g_enabled;
151 }
152 
153 MPI_Comm
155 {
157  return g_communicator;
158 }
159 
160 void
161 GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
162 {
163  NS_LOG_FUNCTION (this << pargc << pargv);
164 
165  NS_ASSERT (g_enabled == false);
166 
167  // Initialize the MPI interface
168  MPI_Init (pargc, pargv);
169  Enable (MPI_COMM_WORLD);
170  g_mpiInitCalled = true;
171  g_enabled = true;
172 }
173 
174 void
176 {
177  NS_LOG_FUNCTION (this);
178 
179  NS_ASSERT (g_enabled == false);
180 
181  // Standard MPI practice is to duplicate the communicator for
182  // library to use. Library communicates in isolated communication
183  // context.
184  MPI_Comm_dup (communicator, &g_communicator);
185  g_freeCommunicator = true;
186 
187  MPI_Barrier (g_communicator);
188 
189  int mpiSystemId;
190  int mpiSize;
191  MPI_Comm_rank (g_communicator, &mpiSystemId);
192  MPI_Comm_size (g_communicator, &mpiSize);
193  g_sid = mpiSystemId;
194  g_size = mpiSize;
195 
196  g_enabled = true;
197  // Post a non-blocking receive for all peers
198  g_pRxBuffers = new char*[g_size];
199  g_requests = new MPI_Request[g_size];
200  for (uint32_t i = 0; i < GetSize (); ++i)
201  {
202  g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
203  MPI_Irecv (g_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
205  }
206 }
207 
208 void
209 GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
210 {
211  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
212 
213  SentBuffer sendBuf;
214  g_pendingTx.push_back (sendBuf);
215  std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element
216 
217  uint32_t serializedSize = p->GetSerializedSize ();
218  uint8_t* buffer = new uint8_t[serializedSize + 16];
219  i->SetBuffer (buffer);
220  // Add the time, dest node and dest device
221  uint64_t t = rxTime.GetInteger ();
222  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
223  *pTime++ = t;
224  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
225  *pData++ = node;
226  *pData++ = dev;
227  // Serialize the packet
228  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
229 
230  // Find the system id for the destination node
231  Ptr<Node> destNode = NodeList::GetNode (node);
232  uint32_t nodeSysId = destNode->GetSystemId ();
233 
234  MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
235  0, g_communicator, (i->GetRequest ()));
236  g_txCount++;
237 }
238 
239 void
241 {
243 
244  // Poll the non-block reads to see if data arrived
245  while (true)
246  {
247  int flag = 0;
248  int index = 0;
249  MPI_Status status;
250 
251  MPI_Testany (MpiInterface::GetSize (), g_requests, &index, &flag, &status);
252  if (!flag)
253  {
254  break; // No more messages
255  }
256  int count;
257  MPI_Get_count (&status, MPI_CHAR, &count);
258  g_rxCount++; // Count this receive
259 
260  // Get the meta data first
261  uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
262  uint64_t time = *pTime++;
263  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
264  uint32_t node = *pData++;
265  uint32_t dev = *pData++;
266 
267  Time rxTime (time);
268 
269  count -= sizeof (time) + sizeof (node) + sizeof (dev);
270 
271  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
272 
273  // Find the correct node/device to schedule receive event
274  Ptr<Node> pNode = NodeList::GetNode (node);
275  Ptr<MpiReceiver> pMpiRec = 0;
276  uint32_t nDevices = pNode->GetNDevices ();
277  for (uint32_t i = 0; i < nDevices; ++i)
278  {
279  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
280  if (pThisDev->GetIfIndex () == dev)
281  {
282  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
283  break;
284  }
285  }
286 
287  NS_ASSERT (pNode && pMpiRec);
288 
289  // Schedule the rx event
290  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
291  &MpiReceiver::Receive, pMpiRec, p);
292 
293  // Re-queue the next read
294  MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
295  g_communicator, &g_requests[index]);
296  }
297 }
298 
299 void
301 {
303 
304  std::list<SentBuffer>::iterator i = g_pendingTx.begin ();
305  while (i != g_pendingTx.end ())
306  {
307  MPI_Status status;
308  int flag = 0;
309  MPI_Test (i->GetRequest (), &flag, &status);
310  std::list<SentBuffer>::iterator current = i; // Save current for erasing
311  i++; // Advance to next
312  if (flag)
313  { // This message is complete
314  g_pendingTx.erase (current);
315  }
316  }
317 }
318 
319 void
321 {
323 
324  if (g_freeCommunicator)
325  {
326  MPI_Comm_free (&g_communicator);
327  g_freeCommunicator = false;
328  }
329 
330  // ns-3 should MPI finalize only if ns-3 was used to initialize
331  if (g_mpiInitCalled)
332  {
333  int flag = 0;
334  MPI_Initialized (&flag);
335  if (flag)
336  {
337  MPI_Finalize ();
338  }
339  else
340  {
341  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
342  }
343  g_mpiInitCalled = false;
344  }
345 
346  g_enabled = false;
347 }
348 
349 
350 } // namespace ns3
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_rxCount
Total packets received.
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:103
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
uint32_t GetId(void) const
Definition: node.cc:109
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:45
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:424
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:241
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:144
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:585
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static uint32_t g_txCount
Total packets sent.
virtual MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
static TypeId GetTypeId(void)
Register this type.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:205
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:165
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
virtual void Enable(int *pargc, char ***pargv)
Setup the parallel communication interface.
virtual uint32_t GetSize()
Get the number of ranks used by ns-3.
static void TestSendComplete()
Check for completed sends.
virtual uint32_t GetSystemId()
Get the id number of this rank.
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices...
uint8_t * m_buffer
The buffer.
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
MPI_Request m_request
The MPI request handle.
Tracks non-blocking sends.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:47
virtual bool IsEnabled()
Returns enabled state of parallel environment.
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
static void ScheduleWithContext(uint32_t context, Time const &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:572
Every class exported by the ns3 library is enclosed in the ns3 namespace.
uint32_t GetSystemId(void) const
Definition: node.cc:123
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:195
static bool g_freeCommunicator
Did ns-3 create the communicator? Have to free it.
virtual void Disable()
Clean up the ns-3 parallel communications interface.
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
Send a packet to a remote node.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
static void ReceiveMessages()
Check for received messages complete.
static bool g_enabled
Has this interface been enabled.
const uint32_t MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
static uint32_t g_size
Number of ranks in the MPI communicator used by ns-3 (MPI_COMM_WORLD by default).
A base class which provides memory management and object aggregation.
Definition: object.h:87
static MPI_Request * g_requests
Pending non-blocking receives.
a unique identifier for an interface.
Definition: type-id.h:58
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:923
static uint32_t GetSize()
Get the number of ranks used by ns-3.
uint32_t GetNDevices(void) const
Definition: node.cc:152
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:638
virtual void Destroy()
Deletes storage used by the parallel environment.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:416