A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright 2013. Lawrence Livermore National Security, LLC.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 as
7  * published by the Free Software Foundation;
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Author: Steven Smith <smith84@llnl.gov>
19  *
20  */
21 
23 
26 #include "remote-channel-bundle.h"
27 
28 #include "ns3/mpi-receiver.h"
29 #include "ns3/node.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
34 #include "ns3/log.h"
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 #include <iostream>
41 #include <iomanip>
42 #include <list>
43 
44 NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface");
45 
46 namespace ns3 {
47 
// Fixed size, in bytes (counted as MPI_CHAR), of every receive buffer posted
// with MPI_Irecv; incoming messages must fit within this many bytes.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE (2000);
53 
54 
56 {
57  m_buffer = 0;
58  m_request = 0;
59 }
60 
62 {
63  delete [] m_buffer;
64 }
65 
66 uint8_t*
68 {
69  return m_buffer;
70 }
71 
72 void
74 {
75  m_buffer = buffer;
76 }
77 
80 {
81  return &m_request;
82 }
83 
89 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
90 
93 
95 {
96  NS_LOG_FUNCTION (this);
97 
98 #ifndef NS3_MPI
99  /*
100  * This class can only be constructed if MPI is available. Fail if an
101  * attempt is made to instantiate this class without MPI.
102  */
103  NS_FATAL_ERROR ("Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
104 #endif
105 }
106 
108 {
109  NS_LOG_FUNCTION (this);
110 }
111 
112 void
114 {
115  NS_LOG_FUNCTION (this);
116 }
117 
118 uint32_t
120 {
122  return g_sid;
123 }
124 
125 uint32_t
127 {
129  return g_size;
130 }
131 
132 bool
134 {
135  if (!g_initialized)
136  {
138  g_initialized = true;
139  }
140  return g_enabled;
141 }
142 
143 void
144 NullMessageMpiInterface::Enable (int* pargc, char*** pargv)
145 {
146  NS_LOG_FUNCTION (this << *pargc);
147 #ifdef NS3_MPI
148 
149  // Initialize the MPI interface
150  MPI_Init (pargc, pargv);
151  MPI_Barrier (MPI_COMM_WORLD);
152 
153  // SystemId and Size are unit32_t in interface but MPI uses int so convert.
154  int mpiSystemId;
155  int mpiSize;
156  MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
157  MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
158 
159  g_sid = mpiSystemId;
160  g_size = mpiSize;
161 
162  g_enabled = true;
163  g_initialized = true;
164 
165 #endif
166 }
167 
168 void
170 {
172 #ifdef NS3_MPI
174 
176 
177  // Post a non-blocking receive for all peers
179  g_pRxBuffers = new char*[g_numNeighbors];
180  int index = 0;
181  for (uint32_t rank = 0; rank < g_size; ++rank)
182  {
184  if (bundle)
185  {
186  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
187  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
188  MPI_COMM_WORLD, &g_requests[index]);
189  ++index;
190  }
191  }
192 #endif
193 }
194 
195 void
196 NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
197 {
198  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
199 
201 
202 #ifdef NS3_MPI
203 
204  // Find the system id for the destination node
205  Ptr<Node> destNode = NodeList::GetNode (node);
206  uint32_t nodeSysId = destNode->GetSystemId ();
207 
208  NullMessageSentBuffer sendBuf;
209  g_pendingTx.push_back (sendBuf);
210  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
211 
212  uint32_t serializedSize = p->GetSerializedSize ();
213  uint32_t bufferSize = serializedSize + ( 2 * sizeof (uint64_t) ) + ( 2 * sizeof (uint32_t) );
214  uint8_t* buffer = new uint8_t[bufferSize];
215  iter->SetBuffer (buffer);
216  // Add the time, dest node and dest device
217  uint64_t t = rxTime.GetInteger ();
218  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
219  *pTime++ = t;
220 
221  Time guarantee_update = NullMessageSimulatorImpl::GetInstance ()->CalculateGuaranteeTime (nodeSysId);
222  *pTime++ = guarantee_update.GetTimeStep ();
223 
224  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
225  *pData++ = node;
226  *pData++ = dev;
227  // Serialize the packet
228  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
229 
230  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
231  0, MPI_COMM_WORLD, (iter->GetRequest ()));
232 
234 
235 #endif
236 }
237 
238 void
240 {
241  NS_LOG_FUNCTION (guarantee_update.GetTimeStep () << bundle);
242 
244 
245 #ifdef NS3_MPI
246 
247  NullMessageSentBuffer sendBuf;
248  g_pendingTx.push_back (sendBuf);
249  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
250 
251  uint32_t bufferSize = 2 * sizeof (uint64_t) + 2 * sizeof (uint32_t);
252  uint8_t* buffer = new uint8_t[bufferSize];
253  iter->SetBuffer (buffer);
254  // Add the time, dest node and dest device
255  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
256  *pTime++ = 0;
257  *pTime++ = guarantee_update.GetInteger ();
258  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
259  *pData++ = 0;
260  *pData++ = 0;
261 
262  // Find the system id for the destination MPI rank
263  uint32_t nodeSysId = bundle->GetSystemId ();
264 
265  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
266  0, MPI_COMM_WORLD, (iter->GetRequest ()));
267 #endif
268 }
269 
270 void
272 {
274 
275  ReceiveMessages(true);
276 }
277 
278 
279 void
281 {
283 
284  ReceiveMessages(false);
285 }
286 
287 
288 void
290 {
291  NS_LOG_FUNCTION (blocking);
292 
294 
295 #ifdef NS3_MPI
296 
297  // stop flag set to true when no more messages are found to
298  // process.
299  bool stop = false;
300 
301 
302  if (!g_numNeighbors) {
303  // Not communicating with anyone.
304  return;
305  }
306 
307  do
308  {
309  int messageReceived = 0;
310  int index = 0;
311  MPI_Status status;
312 
313  if (blocking)
314  {
315  MPI_Waitany (g_numNeighbors, g_requests, &index, &status);
316  messageReceived = 1; /* Wait always implies message was received */
317  stop = true;
318  }
319  else
320  {
321  MPI_Testany (g_numNeighbors, g_requests, &index, &messageReceived, &status);
322  }
323 
324  if (messageReceived)
325  {
326  int count;
327  MPI_Get_count (&status, MPI_CHAR, &count);
328 
329  // Get the meta data first
330  uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
331  uint64_t time = *pTime++;
332  uint64_t guaranteeUpdate = *pTime++;
333 
334  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
335  uint32_t node = *pData++;
336  uint32_t dev = *pData++;
337 
338  Time rxTime (time);
339 
340  // rxtime == 0 means this is a Null Message
341  if (rxTime > 0)
342  {
343  count -= sizeof (time) + sizeof (guaranteeUpdate) + sizeof (node) + sizeof (dev);
344 
345  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
346 
347  // Find the correct node/device to schedule receive event
348  Ptr<Node> pNode = NodeList::GetNode (node);
349  Ptr<MpiReceiver> pMpiRec = 0;
350  uint32_t nDevices = pNode->GetNDevices ();
351  for (uint32_t i = 0; i < nDevices; ++i)
352  {
353  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
354  if (pThisDev->GetIfIndex () == dev)
355  {
356  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
357  break;
358  }
359  }
360  NS_ASSERT (pNode && pMpiRec);
361 
362  // Schedule the rx event
363  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
364  &MpiReceiver::Receive, pMpiRec, p);
365 
366  }
367 
368  // Update guarantee time for both packet receives and Null Messages.
369  Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (status.MPI_SOURCE);
370  NS_ASSERT (bundle);
371 
372  bundle->SetGuaranteeTime (Time (guaranteeUpdate));
373 
374  // Re-queue the next read
375  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
376  MPI_COMM_WORLD, &g_requests[index]);
377 
378  }
379  else
380  {
381  // if non-blocking and no message received in testany then stop message loop
382  stop = true;
383  }
384  }
385  while (!stop);
386 #endif
387 }
388 
389 void
391 {
393 
395 
396 #ifdef NS3_MPI
397  std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
398  while (iter != g_pendingTx.end ())
399  {
400  MPI_Status status;
401  int flag = 0;
402  MPI_Test (iter->GetRequest (), &flag, &status);
403  std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
404  ++iter; // Advance to next
405  if (flag)
406  { // This message is complete
407  g_pendingTx.erase (current);
408  }
409  }
410 #endif
411 }
412 
413 void
415 {
416  NS_LOG_FUNCTION (this);
417 
418 #ifdef NS3_MPI
419  int flag = 0;
420  MPI_Initialized (&flag);
421  if (flag)
422  {
423 
424  for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
425  iter != g_pendingTx.end ();
426  ++iter)
427  {
428  MPI_Cancel (iter->GetRequest ());
429  MPI_Request_free (iter->GetRequest ());
430  }
431 
432  for (uint32_t i = 0; i < g_numNeighbors; ++i)
433  {
434  MPI_Cancel (&g_requests[i]);
435  MPI_Request_free (&g_requests[i]);
436  }
437 
438  MPI_Finalize ();
439 
440  for (uint32_t i = 0; i < g_numNeighbors; ++i)
441  {
442  delete [] g_pRxBuffers[i];
443  }
444  delete [] g_pRxBuffers;
445  delete [] g_requests;
446 
447  g_pendingTx.clear ();
448 
449  g_enabled = false;
450  g_initialized = false;
451 
452  }
453  else
454  {
455  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
456  }
457 #endif
458 }
459 
460 } // namespace ns3
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
Keep track of time values and allow control of global simulation resolution.
Definition: nstime.h:81
smart pointer class similar to boost::intrusive_ptr
Definition: ptr.h:59
#define NS_LOG_FUNCTION(parameters)
Definition: log.h:345
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
Maximum MPI message size, used to size the fixed receive buffers.
static Ptr< SimulatorImpl > GetImplementation(void)
Definition: simulator.cc:346
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:194
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:608
#define NS_ASSERT(condition)
Definition: assert.h:64
static void ReceiveMessagesNonBlocking()
Non-blocking check for completed message receives.
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
Definition: log.h:309
uint32_t GetSystemId(void) const
Definition: node.cc:111
#define NS_FATAL_ERROR(msg)
fatal error handling
Definition: fatal-error.h:72
NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface")
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:43
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
Ptr< NetDevice > GetDevice(uint32_t index) const
Definition: node.cc:132
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
static void ReceiveMessages(bool blocking=false)
Check for completed message receives.
static void ScheduleWithContext(uint32_t context, Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
Definition: simulator.h:904
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy ()...
uint32_t GetNDevices(void) const
Definition: node.cc:140
static void ReceiveMessagesBlocking()
Blocking message receive.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
int64_t GetTimeStep(void) const
Definition: nstime.h:356
int64_t GetInteger(void) const
Definition: nstime.h:364
static Time Now(void)
Return the "current simulation time".
Definition: simulator.cc:180
uint8_t * m_buffer
Buffer for send.
Time current
uint32_t GetId(void) const
Definition: node.cc:104
uint32_t GetSerializedSize(void) const
Definition: packet.cc:565
Non-blocking send buffers for Null Message implementation.
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static std::list< NullMessageSentBuffer > g_pendingTx
MPI_Request m_request
MPI request posted for the send.