A Discrete-Event Network Simulator
API
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
20 // This object contains static methods that provide an easy interface
21 // to the necessary MPI information.
22 
23 #include <iostream>
24 #include <iomanip>
25 #include <list>
26 
28 #include "mpi-receiver.h"
29 #include "mpi-interface.h"
30 
31 #include "ns3/node.h"
32 #include "ns3/node-list.h"
33 #include "ns3/net-device.h"
34 #include "ns3/simulator.h"
35 #include "ns3/simulator-impl.h"
36 #include "ns3/nstime.h"
37 #include "ns3/log.h"
38 
39 #ifdef NS3_MPI
40 #include <mpi.h>
41 #endif
42 
43 namespace ns3 {
44 
45 NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");
46 
// NOTE(review): the signature line for this body is missing from this listing.
// It only puts m_buffer and m_request into an "empty" state, so it is
// presumably the SentBuffer constructor -- confirm against the header.
48 {
// No payload array is attached yet.
49  m_buffer = 0;
50  m_request = 0;
51 }
52 
// NOTE(review): the signature line for this body is missing from this listing;
// presumably the SentBuffer destructor -- confirm against the header.
// Frees the array m_buffer points to. m_buffer may still be 0 here, in which
// case delete [] does nothing.
54 {
55  delete [] m_buffer;
56 }
57 
// Returns the stored m_buffer pointer unchanged (may be 0 if none was set).
// NOTE(review): the signature line is missing from this listing; SendPacket
// calls GetBuffer () on a SentBuffer, so this is presumably that accessor.
58 uint8_t*
60 {
61  return m_buffer;
62 }
63 
64 void
65 SentBuffer::SetBuffer (uint8_t* buffer)
66 {
67  m_buffer = buffer;
68 }
69 
// Only compiled when MPI support is built in.
70 #ifdef NS3_MPI
// NOTE(review): the return-type and signature lines are missing from this
// listing; SendPacket passes GetRequest () to MPI_Isend, so this is
// presumably that accessor -- confirm against the header.
// Returns the address of the m_request member, so callers operate on the
// stored request itself rather than on a copy.
73 {
74  return &m_request;
75 }
76 #endif
77 
// Sends that have been started but not yet observed as complete: SendPacket
// appends an entry per MPI_Isend, and entries are erased once MPI_Test
// reports completion.
84 std::list<SentBuffer> GrantedTimeWindowMpiInterface::m_pendingTx;
85 
86 #ifdef NS3_MPI
89 #endif
90 
// Returns the TypeId registered under the name
// "ns3::GrantedTimeWindowMpiInterface" with Object as its parent.
// NOTE(review): the signature line is missing from this listing; presumably
// GrantedTimeWindowMpiInterface::GetTypeId -- confirm against the header.
91 TypeId
93 {
// Function-local static: the TypeId is built on the first call and the same
// object is returned on every later call.
94  static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface")
95  .SetParent<Object> ()
96  ;
97  return tid;
98 }
99 
// Releases the receive-side storage and drops all tracked sends.
// NOTE(review): the signature line is missing from this listing; presumably
// GrantedTimeWindowMpiInterface::Destroy -- confirm against the header.
100 void
102 {
103  NS_LOG_FUNCTION (this);
104 
105 #ifdef NS3_MPI
// One receive buffer per index below GetSize (), matching the allocation
// loop in Enable.
106  for (uint32_t i = 0; i < GetSize (); ++i)
107  {
108  delete [] m_pRxBuffers[i];
109  }
110  delete [] m_pRxBuffers;
111  delete [] m_requests;
// NOTE(review): the pointers are not reset after delete [], so running this
// body twice would free the same memory twice -- confirm callers invoke it
// at most once.
112 
// Destroys every SentBuffer still in the pending list.
113  m_pendingTx.clear ();
114 #endif
115 }
116 
// Returns m_rxCount, which the receive loop in this file increments once per
// completed receive.
// NOTE(review): the signature line is missing from this listing; the name
// GetRxCount is an assumption.
117 uint32_t
119 {
120  return m_rxCount;
121 }
122 
// Returns m_txCount, which SendPacket increments once per MPI_Isend.
// NOTE(review): the signature line is missing from this listing; the name
// GetTxCount is an assumption.
123 uint32_t
125 {
126  return m_txCount;
127 }
128 
// Returns m_sid, the value Enable fills in from MPI_Comm_rank.
// NOTE(review): the signature line is missing from this listing; the name
// GetSystemId is an assumption.
129 uint32_t
131 {
132  if (!m_initialized)
133  {
// NOTE(review): one listing line inside this branch is missing, so whatever
// ran before the flag is set is not visible here.
135  m_initialized = true;
136  }
137  return m_sid;
138 }
139 
// Returns m_size, the value Enable fills in from MPI_Comm_size.
// NOTE(review): the signature line is missing from this listing; other code
// in this file calls GetSize (), so this is presumably that function.
140 uint32_t
142 {
143  if (!m_initialized)
144  {
// NOTE(review): one listing line inside this branch is missing, so whatever
// ran before the flag is set is not visible here.
146  m_initialized = true;
147  }
148  return m_size;
149 }
150 
// Returns m_enabled, which Enable sets to true and the MPI shutdown body in
// this file sets back to false.
// NOTE(review): the signature line is missing from this listing; the name
// IsEnabled is an assumption.
151 bool
153 {
154  if (!m_initialized)
155  {
// NOTE(review): one listing line inside this branch is missing, so whatever
// ran before the flag is set is not visible here.
157  m_initialized = true;
158  }
159  return m_enabled;
160 }
161 
// Starts MPI, records this process's rank and the communicator size, and
// posts the initial set of non-blocking receives.
//
// \param pargc  pointer to the argument count, forwarded to MPI_Init
// \param pargv  pointer to the argument vector, forwarded to MPI_Init
//
// Without NS3_MPI defined this aborts through NS_FATAL_ERROR.
162 void
163 GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
164 {
165  NS_LOG_FUNCTION (this << pargc << pargv);
166 
167 #ifdef NS3_MPI
168  // Initialize the MPI interface
169  MPI_Init (pargc, pargv);
170  MPI_Barrier (MPI_COMM_WORLD);
// MPI writes an int; m_sid and m_size are passed through reinterpret_cast
// because they are not declared as int.
171  MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid));
172  MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size));
173  m_enabled = true;
174  m_initialized = true;
175  // Post a non-blocking receive for all peers
176  m_pRxBuffers = new char*[m_size];
// NOTE(review): m_requests is indexed in the loop below, but the line that
// allocates it is not present in this listing (one listing line is missing
// here) -- confirm it is sized for at least GetSize () entries.
178  for (uint32_t i = 0; i < GetSize (); ++i)
179  {
// Each slot gets a fixed MAX_MPI_MSG_SIZE buffer and accepts a message from
// any source rank with tag 0.
180  m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
181  MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
182  MPI_COMM_WORLD, &m_requests[i]);
183  }
184 #else
185  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
186 #endif
187 }
188 
189 void
190 GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
191 {
192  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
193 
194 #ifdef NS3_MPI
195  SentBuffer sendBuf;
196  m_pendingTx.push_back (sendBuf);
197  std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element
198 
199  uint32_t serializedSize = p->GetSerializedSize ();
200  uint8_t* buffer = new uint8_t[serializedSize + 16];
201  i->SetBuffer (buffer);
202  // Add the time, dest node and dest device
203  uint64_t t = rxTime.GetInteger ();
204  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
205  *pTime++ = t;
206  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
207  *pData++ = node;
208  *pData++ = dev;
209  // Serialize the packet
210  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
211 
212  // Find the system id for the destination node
213  Ptr<Node> destNode = NodeList::GetNode (node);
214  uint32_t nodeSysId = destNode->GetSystemId ();
215 
216  MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
217  0, MPI_COMM_WORLD, (i->GetRequest ()));
218  m_txCount++;
219 #else
220  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
221 #endif
222 }
223 
// Drains every receive that has completed, turns each message back into a
// packet, and schedules its delivery on the destination device.
// NOTE(review): the signature line and the first statement of this body are
// missing from this listing; presumably
// GrantedTimeWindowMpiInterface::ReceiveMessages -- confirm against the header.
224 void
226 {
228 
229 #ifdef NS3_MPI
230  // Poll the non-block reads to see if data arrived
231  while (true)
232  {
233  int flag = 0;
234  int index = 0;
235  MPI_Status status;
236 
// NOTE(review): this call uses MpiInterface::GetSize () while the other loops
// in this file call the unqualified GetSize () -- confirm both yield the
// number of entries in m_requests.
237  MPI_Testany (MpiInterface::GetSize (), m_requests, &index, &flag, &status);
238  if (!flag)
239  {
240  break; // No more messages
241  }
242  int count;
243  MPI_Get_count (&status, MPI_CHAR, &count);
244  m_rxCount++; // Count this receive
245 
// The header is read in the same order SendPacket writes it: 64-bit time,
// then 32-bit node id, then 32-bit device index.
246  // Get the meta data first
247  uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]);
248  uint64_t time = *pTime++;
249  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
250  uint32_t node = *pData++;
251  uint32_t dev = *pData++;
252 
253  Time rxTime (time);
254 
// Whatever remains after the header is the serialized packet.
255  count -= sizeof (time) + sizeof (node) + sizeof (dev);
256 
257  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
258 
259  // Find the correct node/device to schedule receive event
260  Ptr<Node> pNode = NodeList::GetNode (node);
261  Ptr<MpiReceiver> pMpiRec = 0;
// NOTE(review): pNode is dereferenced on the next line, before the NS_ASSERT
// below checks it, so the pNode half of that assert cannot catch a null node.
// Consider asserting on pNode right after the lookup.
262  uint32_t nDevices = pNode->GetNDevices ();
263  for (uint32_t i = 0; i < nDevices; ++i)
264  {
265  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
266  if (pThisDev->GetIfIndex () == dev)
267  {
268  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
269  break;
270  }
271  }
272 
// pMpiRec is still 0 here if no device matched dev.
273  NS_ASSERT (pNode && pMpiRec);
274 
// The delay passed to the scheduler is relative: receive time minus now.
275  // Schedule the rx event
276  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
277  &MpiReceiver::Receive, pMpiRec, p);
278 
// Reuse the slot that just completed so the number of outstanding receives
// stays constant.
279  // Re-queue the next read
280  MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
281  MPI_COMM_WORLD, &m_requests[index]);
282  }
283 #else
284  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
285 #endif
286 }
287 
// Walks the pending-send list and removes every entry whose MPI send has
// completed.
// NOTE(review): the signature line and the first statement of this body are
// missing from this listing; presumably
// GrantedTimeWindowMpiInterface::TestSendComplete -- confirm against the header.
288 void
290 {
292 
293 #ifdef NS3_MPI
294  std::list<SentBuffer>::iterator i = m_pendingTx.begin ();
295  while (i != m_pendingTx.end ())
296  {
297  MPI_Status status;
298  int flag = 0;
299  MPI_Test (i->GetRequest (), &flag, &status);
// Step past the element before it may be erased: erase invalidates only the
// iterator to the removed element, so i stays usable.
300  std::list<SentBuffer>::iterator current = i; // Save current for erasing
301  i++; // Advance to next
302  if (flag)
303  { // This message is complete
// Erasing destroys the SentBuffer stored in the list.
304  m_pendingTx.erase (current);
305  }
306  }
307 #else
308  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
309 #endif
310 }
311 
// Shuts MPI down if it was initialized and clears the enabled/initialized
// flags; otherwise aborts through NS_FATAL_ERROR.
// NOTE(review): the signature line and the first statement of this body are
// missing from this listing; presumably
// GrantedTimeWindowMpiInterface::Disable -- confirm against the header.
312 void
314 {
316 
317 #ifdef NS3_MPI
// Ask MPI itself whether MPI_Init has run, rather than trusting m_enabled.
318  int flag = 0;
319  MPI_Initialized (&flag);
320  if (flag)
321  {
322  MPI_Finalize ();
323  m_enabled = false;
324  m_initialized = false;
325  }
326  else
327  {
328  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
329  }
330 #else
331  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
332 #endif
333 }
334 
335 
336 } // namespace ns3
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:95
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
Definition: simulator.cc:390
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:240
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:603
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:61
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:201
#define NS_FATAL_ERROR(msg)
Fatal error handling.
Definition: fatal-error.h:100
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
virtual void Enable(int *pargc, char ***pargv)
uint32_t GetSystemId(void) const
Definition: node.cc:113
static void TestSendComplete()
Check for completed sends.
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:43
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:134
Tracks non-blocking sends.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
static void ScheduleWithContext(uint32_t context, Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
Definition: simulator.h:899
uint32_t GetNDevices(void) const
Definition: node.cc:142
Every class exported by the ns3 library is enclosed in the ns3 namespace.
int64_t GetTimeStep(void) const
Definition: nstime.h:357
int64_t GetInteger(void) const
Definition: nstime.h:365
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:223
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy () ...
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
MPI_Request * GetRequest()
uint32_t GetId(void) const
Definition: node.cc:106
static void ReceiveMessages()
Check for completed message receives.
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:560
const uint32_t MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
A base class which provides memory management and object aggregation.
Definition: object.h:87
A unique identifier for an interface.
Definition: type-id.h:51
TypeId SetParent(TypeId tid)
Definition: type-id.cc:631
static uint32_t GetSize()