A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright 2013. Lawrence Livermore National Security, LLC.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 as
7  * published by the Free Software Foundation;
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Author: Steven Smith <smith84@llnl.gov>
19  *
20  */
21 
23 
26 #include "remote-channel-bundle.h"
27 
28 #include "ns3/mpi-receiver.h"
29 #include "ns3/node.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
34 #include "ns3/log.h"
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 #include <iostream>
41 #include <iomanip>
42 #include <list>
43 
44 namespace ns3 {
45 
// Log component used by the NS_LOG_* macros in this file.
46 NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface");
47
// Size in bytes of every receive buffer, and the count passed to each
// MPI_Irecv. A sent message is a 24-byte header (2 x uint64_t + 2 x uint32_t)
// plus the serialized packet, so it has to fit within this bound; a larger
// message would not fit the posted receive (MPI truncation error).
52 #ifdef NS3_MPI
53 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
54 #endif
55 
57 {
58  m_buffer = 0;
59  m_request = 0;
60 }
61 
63 {
64  delete [] m_buffer;
65 }
66 
67 uint8_t*
69 {
70  return m_buffer;
71 }
72 
73 void
75 {
76  m_buffer = buffer;
77 }
78 
81 {
82  return &m_request;
83 }
84 
// Non-blocking sends posted with MPI_Isend that have not yet been observed
// complete. Each element owns its send buffer and MPI request. Entries are
// appended by SendPacket/SendNullMessage, erased by TestSendComplete, and
// cancelled/cleared by Disable.
90 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
91 
94 
96 {
97  NS_LOG_FUNCTION (this);
98 
99 #ifndef NS3_MPI
100  /*
101  * This class can only be constructed if MPI is available. Fail if an
102  * attempt is made to instantiate this class without MPI.
103  */
104  NS_FATAL_ERROR ("Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
105 #endif
106 }
107 
109 {
110  NS_LOG_FUNCTION (this);
111 }
112 
113 void
115 {
116  NS_LOG_FUNCTION (this);
117 }
118 
119 uint32_t
121 {
// Rank of this process in MPI_COMM_WORLD, as stored by Enable ().
123  return g_sid;
124 }
125 
126 uint32_t
128 {
// Number of processes in MPI_COMM_WORLD, as stored by Enable ().
130  return g_size;
131 }
132 
// Report whether MPI has been enabled (g_enabled, set to true by Enable ()).
// The first query made before any initialization also marks the interface as
// initialized; presumably the elided statement above that assignment triggers
// simulator setup — confirm against the full source.
133 bool
135 {
136  if (!g_initialized)
137  {
139  g_initialized = true;
140  }
141  return g_enabled;
142 }
143 
144 void
145 NullMessageMpiInterface::Enable (int* pargc, char*** pargv)
146 {
147  NS_LOG_FUNCTION (this << *pargc);
148 #ifdef NS3_MPI
149 
150  // Initialize the MPI interface
151  MPI_Init (pargc, pargv);
152  MPI_Barrier (MPI_COMM_WORLD);
153 
154  // SystemId and Size are unit32_t in interface but MPI uses int so convert.
155  int mpiSystemId;
156  int mpiSize;
157  MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
158  MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
159 
160  g_sid = mpiSystemId;
161  g_size = mpiSize;
162 
163  g_enabled = true;
164  g_initialized = true;
165 
166 #endif
167 }
168 
// Allocate one receive buffer of NULL_MESSAGE_MAX_MPI_MSG_SIZE bytes per
// neighbor rank and post a non-blocking MPI_Irecv (tag 0) into it. A rank is
// treated as a neighbor when `bundle` (presumably looked up per rank by an
// elided line) is non-null. Slots in g_pRxBuffers/g_requests are filled
// densely in rank order, so a slot index is not the same as a rank.
169 void
171 {
173 #ifdef NS3_MPI
175 
177 
178  // Post a non-blocking receive for all peers
180  g_pRxBuffers = new char*[g_numNeighbors];
181  int index = 0;
// NOTE(review): assumes the number of ranks with a bundle equals
// g_numNeighbors (the array size above); otherwise `index` would overrun.
182  for (uint32_t rank = 0; rank < g_size; ++rank)
183  {
185  if (bundle)
186  {
187  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
188  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
189  MPI_COMM_WORLD, &g_requests[index]);
190  ++index;
191  }
192  }
193 #endif
194 }
195 
// Send packet `p` to the rank that owns node `node`, to be received on
// device `dev` at simulation time `rxTime`.
//
// Wire format, written in native byte order with no padding:
//   uint64 rxTime | uint64 guarantee time | uint32 node | uint32 dev | packet
// The send is non-blocking (MPI_Isend, tag 0); the buffer stays alive in
// g_pendingTx until TestSendComplete () observes the request finish.
// NOTE(review): bufferSize is never checked against
// NULL_MESSAGE_MAX_MPI_MSG_SIZE, the size of every posted receive; a large
// packet would not fit on the receiving rank — confirm callers bound it.
196 void
197 NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
198 {
199  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
200 
202 
203 #ifdef NS3_MPI
204 
205  // Find the system id for the destination node
206  Ptr<Node> destNode = NodeList::GetNode (node);
207  uint32_t nodeSysId = destNode->GetSystemId ();
208 
// Append an empty record; the list element (not the local copy) then owns
// the send buffer and MPI request for this send.
209  NullMessageSentBuffer sendBuf;
210  g_pendingTx.push_back (sendBuf);
211  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
212 
213  uint32_t serializedSize = p->GetSerializedSize ();
214  uint32_t bufferSize = serializedSize + ( 2 * sizeof (uint64_t) ) + ( 2 * sizeof (uint32_t) );
215  uint8_t* buffer = new uint8_t[bufferSize];
216  iter->SetBuffer (buffer);
217  // Add the time, dest node and dest device
218  uint64_t t = rxTime.GetInteger ();
219  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
220  *pTime++ = t;
221 
// Piggy-back the current guarantee time for the destination rank; the
// receiver applies it to the sender's bundle for packets and Null Messages alike.
222  Time guarantee_update = NullMessageSimulatorImpl::GetInstance ()->CalculateGuaranteeTime (nodeSysId);
223  *pTime++ = guarantee_update.GetTimeStep ();
224 
225  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
226  *pData++ = node;
227  *pData++ = dev;
228  // Serialize the packet
229  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
230 
231  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
232  0, MPI_COMM_WORLD, (iter->GetRequest ()));
233 
235 
236 #endif
237 }
238 
// Send a Null Message carrying `guarantee_update` to the rank of `bundle`.
// Uses the same 24-byte header as SendPacket with rxTime = 0 (which the
// receiver treats as "no packet") and node/dev = 0. The send is non-blocking;
// the buffer is tracked in g_pendingTx until TestSendComplete () reaps it.
239 void
241 {
242  NS_LOG_FUNCTION (guarantee_update.GetTimeStep () << bundle);
243 
245 
246 #ifdef NS3_MPI
247 
248  NullMessageSentBuffer sendBuf;
249  g_pendingTx.push_back (sendBuf);
250  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
251 
252  uint32_t bufferSize = 2 * sizeof (uint64_t) + 2 * sizeof (uint32_t);
253  uint8_t* buffer = new uint8_t[bufferSize];
254  iter->SetBuffer (buffer);
255  // Header only: rxTime 0 marks a Null Message, then the guarantee time; node and device are unused and sent as 0
256  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
257  *pTime++ = 0;
258  *pTime++ = guarantee_update.GetInteger ();
259  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
260  *pData++ = 0;
261  *pData++ = 0;
262 
263  // Find the system id for the destination MPI rank
264  uint32_t nodeSysId = bundle->GetSystemId ();
265 
266  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
267  0, MPI_COMM_WORLD, (iter->GetRequest ()));
268 #endif
269 }
270 
// Wait for one message from any neighbor rank and process it.
271 void
273 {
275 
276  ReceiveMessages(true);
277 }
278 
279 
// Process every message that has already arrived, without waiting.
280 void
282 {
284 
285  ReceiveMessages(false);
286 }
287 
288 
// Process messages that have arrived from neighbor ranks.
//
// blocking == true: wait in MPI_Waitany for one message, process it, return.
// blocking == false: poll with MPI_Testany, processing messages until none
// is ready.
//
// Each message starts with the header written by SendPacket/SendNullMessage:
//   uint64 rxTime | uint64 guarantee time | uint32 node | uint32 dev
// rxTime > 0 means a serialized packet follows; it is scheduled for delivery
// to the matching device's MpiReceiver at rxTime. rxTime == 0 is a Null
// Message. In both cases the sending rank's bundle guarantee time is updated
// and the receive slot is re-posted.
289 void
291 {
292  NS_LOG_FUNCTION (blocking);
293 
295 
296 #ifdef NS3_MPI
297 
298  // stop flag set to true when no more messages are found to
299  // process.
300  bool stop = false;
301 
302 
303  if (!g_numNeighbors) {
304  // Not communicating with anyone.
305  return;
306  }
307 
308  do
309  {
310  int messageReceived = 0;
311  int index = 0;
312  MPI_Status status;
313 
// NOTE(review): relies on every g_requests slot being active (each is
// re-posted below); MPI_Waitany/MPI_Testany report index MPI_UNDEFINED when
// no handle is active.
314  if (blocking)
315  {
316  MPI_Waitany (g_numNeighbors, g_requests, &index, &status);
317  messageReceived = 1; /* Wait always implies message was received */
318  stop = true;
319  }
320  else
321  {
322  MPI_Testany (g_numNeighbors, g_requests, &index, &messageReceived, &status);
323  }
324 
325  if (messageReceived)
326  {
327  int count;
328  MPI_Get_count (&status, MPI_CHAR, &count);
329 
330  // Get the meta data first
331  uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
332  uint64_t time = *pTime++;
333  uint64_t guaranteeUpdate = *pTime++;
334 
335  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
336  uint32_t node = *pData++;
337  uint32_t dev = *pData++;
338 
339  Time rxTime (time);
340 
341  // rxtime == 0 means this is a Null Message
342  if (rxTime > Time (0))
343  {
// Strip the header; the remaining bytes are the serialized packet.
344  count -= sizeof (time) + sizeof (guaranteeUpdate) + sizeof (node) + sizeof (dev);
345 
346  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
347 
348  // Find the correct node/device to schedule receive event
349  Ptr<Node> pNode = NodeList::GetNode (node);
350  Ptr<MpiReceiver> pMpiRec = 0;
351  uint32_t nDevices = pNode->GetNDevices ();
352  for (uint32_t i = 0; i < nDevices; ++i)
353  {
354  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
355  if (pThisDev->GetIfIndex () == dev)
356  {
357  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
358  break;
359  }
360  }
// NOTE(review): pNode was already dereferenced above (GetNDevices), so in
// practice this only guards pMpiRec (no device with that index, or no
// MpiReceiver aggregated to it).
361  NS_ASSERT (pNode && pMpiRec);
362 
363  // Schedule the rx event
// The delay is relative to Now; presumably the null-message protocol keeps
// rxTime >= Now on arrival — confirm.
364  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
365  &MpiReceiver::Receive, pMpiRec, p);
366 
367  }
368 
369  // Update guarantee time for both packet receives and Null Messages.
370  Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (status.MPI_SOURCE);
371  NS_ASSERT (bundle);
372 
373  bundle->SetGuaranteeTime (Time (guaranteeUpdate));
374 
375  // Re-queue the next read
376  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
377  MPI_COMM_WORLD, &g_requests[index]);
378 
379  }
380  else
381  {
382  // if non-blocking and no message received in testany then stop message loop
383  stop = true;
384  }
385  }
386  while (!stop);
387 #endif
388 }
389 
// Poll each pending non-blocking send once with MPI_Test and erase the ones
// MPI reports complete. Erasing destroys the NullMessageSentBuffer, whose
// destructor frees the send buffer, so memory is released only after MPI has
// finished with it.
390 void
392 {
394 
396 
397 #ifdef NS3_MPI
398  std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
399  while (iter != g_pendingTx.end ())
400  {
401  MPI_Status status;
402  int flag = 0;
403  MPI_Test (iter->GetRequest (), &flag, &status);
404  std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
405  ++iter; // Advance to next
406  if (flag)
407  { // This message is complete
408  g_pendingTx.erase (current);
409  }
410  }
411 #endif
412 }
413 
414 void
416 {
417  NS_LOG_FUNCTION (this);
418 
419 #ifdef NS3_MPI
420  int flag = 0;
421  MPI_Initialized (&flag);
422  if (flag)
423  {
424 
425  for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
426  iter != g_pendingTx.end ();
427  ++iter)
428  {
429  MPI_Cancel (iter->GetRequest ());
430  MPI_Request_free (iter->GetRequest ());
431  }
432 
433  for (uint32_t i = 0; i < g_numNeighbors; ++i)
434  {
435  MPI_Cancel (&g_requests[i]);
436  MPI_Request_free (&g_requests[i]);
437  }
438 
439  MPI_Finalize ();
440 
441  for (uint32_t i = 0; i < g_numNeighbors; ++i)
442  {
443  delete [] g_pRxBuffers[i];
444  }
445  delete [] g_pRxBuffers;
446  delete [] g_requests;
447 
448  g_pendingTx.clear ();
449 
450  g_enabled = false;
451  g_initialized = false;
452 
453  }
454  else
455  {
456  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
457  }
458 #endif
459 }
460 
461 } // namespace ns3
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
Definition: simulator.cc:400
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:241
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:591
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:201
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
uint32_t GetSystemId(void) const
Definition: node.cc:121
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:44
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:142
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy ()...
uint32_t GetNDevices(void) const
Definition: node.cc:150
Every class exported by the ns3 library is enclosed in the ns3 namespace.
static void ReceiveMessagesBlocking()
Blocking message receive.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:377
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:385
NullMessageSentBuffer()
maximum MPI message size for easy buffer creation
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:224
uint8_t * m_buffer
Buffer for send.
static void ScheduleWithContext(uint32_t context, Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
Definition: simulator.h:1319
uint32_t GetId(void) const
Definition: node.cc:107
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:548
Non-blocking send buffers for Null Message implementation.
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static std::list< NullMessageSentBuffer > g_pendingTx
MPI_Request m_request
MPI request posted for the send.