A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright 2013. Lawrence Livermore National Security, LLC.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 as
7  * published by the Free Software Foundation;
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Author: Steven Smith <smith84@llnl.gov>
19  *
20  */
21 
23 
26 #include "remote-channel-bundle.h"
27 
28 #include "ns3/mpi-receiver.h"
29 #include "ns3/node.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
34 #include "ns3/log.h"
35 
36 #include <mpi.h>
37 
38 #include <iostream>
39 #include <iomanip>
40 #include <list>
41 
42 namespace ns3 {
43 
44 NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface");
45 
// Size, in MPI_CHAR elements, of every receive buffer allocated and posted with
// MPI_Irecv in this file (the listing's index calls it the maximum MPI message size).
50 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
51 
// NOTE(review): the signature line (52) is missing from this listing. The members
// set here suggest this is the NullMessageSentBuffer constructor — confirm.
// Starts with no buffer attached and the request handle set to 0.
53 {
54  m_buffer = 0;
55  m_request = 0;
56 }
57 
// NOTE(review): the signature line (58) is missing from this listing; presumably
// the NullMessageSentBuffer destructor — confirm.
// Releases m_buffer with delete[]; a null m_buffer makes this a no-op.
59 {
60  delete [] m_buffer;
61 }
62 
// NOTE(review): the signature line (64) is missing from this listing; callers in
// this file invoke it as GetBuffer ().
// Returns the stored buffer pointer unchanged (may be 0 if none was set).
63 uint8_t*
65 {
66  return m_buffer;
67 }
68 
// NOTE(review): the signature line (70) is missing from this listing; callers in
// this file invoke it as SetBuffer (buffer).
// Stores the pointer as-is. Any previously stored m_buffer is overwritten here
// without being freed.
69 void
71 {
72  m_buffer = buffer;
73 }
74 
// NOTE(review): the signature line (76) is missing from this listing; callers in
// this file invoke it as GetRequest ().
// Returns the address of the embedded m_request so MPI calls can write to or
// test it in place.
75 MPI_Request*
77 {
78  return &m_request;
79 }
80 
// Send buffers for non-blocking sends that have been posted. The send routines
// in this file append an entry per message, and the MPI_Test polling loop erases
// entries whose request has completed.
86 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
87 
90 
// NOTE(review): the signature line (91) is missing from this listing; presumably
// the NullMessageMpiInterface constructor — confirm.
// Only logs the call; no state is set here.
92 {
93  NS_LOG_FUNCTION (this);
94 }
95 
// NOTE(review): the signature line (96) is missing from this listing; presumably
// the NullMessageMpiInterface destructor — confirm.
// Only logs the call; nothing is released here.
97 {
98  NS_LOG_FUNCTION (this);
99 }
100 
// NOTE(review): the signature line (102) is missing from this listing; presumably
// NullMessageMpiInterface::Destroy () — confirm.
// The body shown only logs the call. The listing's index describes Destroy as
// "Delete all buffers", but no buffer is released here; in this file the receive
// buffers are freed in the MPI shutdown routine instead.
101 void
103 {
104  NS_LOG_FUNCTION (this);
105 }
106 
// NOTE(review): the signature line (108) and line 110 are missing from this
// listing; presumably NullMessageMpiInterface::GetSystemId () — confirm.
// Returns g_sid, which Enable () sets from MPI_Comm_rank.
107 uint32_t
109 {
111  return g_sid;
112 }
113 
// NOTE(review): the signature line (115) and line 117 are missing from this
// listing; presumably NullMessageMpiInterface::GetSize () — confirm.
// Returns g_size, which Enable () sets from MPI_Comm_size.
114 uint32_t
116 {
118  return g_size;
119 }
120 
// NOTE(review): the signature line (122) and line 126 are missing from this
// listing; presumably NullMessageMpiInterface::IsEnabled () — confirm.
// Returns g_enabled. The first call while g_initialized is false also runs the
// statement on the missing line 126 (not visible here) and sets g_initialized.
121 bool
123 {
124  if (!g_initialized)
125  {
127  g_initialized = true;
128  }
129  return g_enabled;
130 }
131 
// Initialize MPI and record this process's rank and the world size.
//   pargc, pargv: forwarded unchanged to MPI_Init.
// Sets g_sid, g_size, g_enabled and g_initialized. Return codes of the MPI
// calls are not inspected.
132 void
133 NullMessageMpiInterface::Enable (int* pargc, char*** pargv)
134 {
135  NS_LOG_FUNCTION (this << *pargc);
136 
137  // Initialize the MPI interface
138  MPI_Init (pargc, pargv);
 // Barrier over MPI_COMM_WORLD immediately after initialization.
139  MPI_Barrier (MPI_COMM_WORLD);
140 
141  // SystemId and Size are uint32_t in interface but MPI uses int so convert.
142  int mpiSystemId;
143  int mpiSize;
144  MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
145  MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
146 
 // Implicit int -> uint32_t conversion of the rank and communicator size.
147  g_sid = mpiSystemId;
148  g_size = mpiSize;
149 
150  g_enabled = true;
151  g_initialized = true;
152 }
153 
// NOTE(review): the signature line (155) and body lines 157-158, 160 and 168 are
// missing from this listing; presumably
// NullMessageMpiInterface::InitializeSendReceiveBuffers () — confirm.
// Allocates g_numNeighbors request slots and receive-buffer pointers. For each
// rank whose `bundle` is non-null, it allocates a NULL_MESSAGE_MAX_MPI_MSG_SIZE
// buffer and posts a non-blocking receive from that rank with tag 0.
// `bundle` is defined on the missing line 168.
// NOTE(review): `index` is not checked against g_numNeighbors; this assumes the
// number of ranks with a bundle equals g_numNeighbors — confirm where it is set.
154 void
156 {
159 
161 
162  // Post a non-blocking receive for all peers
163  g_requests = new MPI_Request[g_numNeighbors];
164  g_pRxBuffers = new char*[g_numNeighbors];
165  int index = 0;
166  for (uint32_t rank = 0; rank < g_size; ++rank)
167  {
169  if (bundle)
170  {
171  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
172  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
173  MPI_COMM_WORLD, &g_requests[index]);
174  ++index;
175  }
176  }
177 }
178 
// Serialize a packet and post a non-blocking send to the rank owning `node`.
//   p      : packet to serialize into the message payload.
//   rxTime : receive time, written as the first header field.
//   node   : destination node id; also used to look up the destination rank.
//   dev    : destination device index, written into the header.
// Message layout as written below:
//   [uint64 rxTime][uint64 guarantee time][uint32 node][uint32 dev][packet bytes]
// The buffer is attached to the new g_pendingTx entry before MPI_Isend is posted.
// NOTE(review): lines 184 and 215 are missing from this listing.
// NOTE(review): bufferSize is not checked against NULL_MESSAGE_MAX_MPI_MSG_SIZE,
// the count used for every MPI_Irecv in this file — confirm that larger packets
// cannot reach this path.
179 void
180 NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
181 {
182  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
183 
185 
186  // Find the system id for the destination node
187  Ptr<Node> destNode = NodeList::GetNode (node);
188  uint32_t nodeSysId = destNode->GetSystemId ();
189 
 // Append an empty entry first, then fill in the element stored in the list.
190  NullMessageSentBuffer sendBuf;
191  g_pendingTx.push_back (sendBuf);
192  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
193 
194  uint32_t serializedSize = p->GetSerializedSize ();
 // Payload plus the fixed header: two uint64_t and two uint32_t fields.
195  uint32_t bufferSize = serializedSize + ( 2 * sizeof (uint64_t) ) + ( 2 * sizeof (uint32_t) );
196  uint8_t* buffer = new uint8_t[bufferSize];
197  iter->SetBuffer (buffer);
198  // Add the time, dest node and dest device
199  uint64_t t = rxTime.GetInteger ();
200  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
201  *pTime++ = t;
202 
 // Second header field: guarantee time computed for the destination rank.
203  Time guarantee_update = NullMessageSimulatorImpl::GetInstance ()->CalculateGuaranteeTime (nodeSysId);
204  *pTime++ = guarantee_update.GetTimeStep ();
205 
206  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
207  *pData++ = node;
208  *pData++ = dev;
209  // Serialize the packet
210  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
211 
212  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
213  0, MPI_COMM_WORLD, (iter->GetRequest ()));
214 
216 }
217 
// NOTE(review): the signature line (219) and line 223 are missing from this
// listing. The body uses `guarantee_update` and `bundle`; presumably
// NullMessageMpiInterface::SendNullMessage — confirm.
// Posts a non-blocking send of a header-only message to the rank returned by
// bundle->GetSystemId (). The receive time field is 0, which the receive loop in
// this file treats as a Null Message, and the node/device fields are zeroed.
// The buffer is attached to the new g_pendingTx entry.
218 void
220 {
221  NS_LOG_FUNCTION (guarantee_update.GetTimeStep () << bundle);
222 
224 
225  NullMessageSentBuffer sendBuf;
226  g_pendingTx.push_back (sendBuf);
227  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
228 
229  uint32_t bufferSize = 2 * sizeof (uint64_t) + 2 * sizeof (uint32_t);
230  uint8_t* buffer = new uint8_t[bufferSize];
231  iter->SetBuffer (buffer);
232  // Header only: rx time 0, then the guarantee time, then zeroed node and device
233  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
234  *pTime++ = 0;
235  *pTime++ = guarantee_update.GetInteger ();
236  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
237  *pData++ = 0;
238  *pData++ = 0;
239 
240  // Find the system id for the destination MPI rank
241  uint32_t nodeSysId = bundle->GetSystemId ();
242 
243  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
244  0, MPI_COMM_WORLD, (iter->GetRequest ()));
245 }
246 
// NOTE(review): the signature line (248) and line 250 are missing from this
// listing; presumably NullMessageMpiInterface::ReceiveMessagesBlocking () — confirm.
// Thin wrapper: runs the shared receive loop in blocking mode.
247 void
249 {
251 
252  ReceiveMessages(true);
253 }
254 
255 
// NOTE(review): the signature line (257) and line 259 are missing from this
// listing; presumably NullMessageMpiInterface::ReceiveMessagesNonBlocking () — confirm.
// Thin wrapper: runs the shared receive loop in non-blocking mode.
256 void
258 {
260 
261  ReceiveMessages(false);
262 }
263 
264 
// NOTE(review): the signature line (266) and line 270 are missing from this
// listing. The body uses a `blocking` flag; presumably
// NullMessageMpiInterface::ReceiveMessages (bool blocking) — confirm.
// Processes completed receives posted on g_requests.
//   blocking == true : waits in MPI_Waitany, handles exactly one message, returns.
//   blocking == false: polls with MPI_Testany and keeps handling messages until a
//                      poll reports none.
// For each message: decode the header, schedule the packet's receive event if the
// receive time is non-zero, update the sender bundle's guarantee time, and
// re-post the receive on the same buffer. Returns at once if g_numNeighbors is 0.
265 void
267 {
268  NS_LOG_FUNCTION (blocking);
269 
271 
272  // stop flag set to true when no more messages are found to
273  // process.
274  bool stop = false;
275 
276 
277  if (!g_numNeighbors) {
278  // Not communicating with anyone.
279  return;
280  }
281 
282  do
283  {
284  int messageReceived = 0;
285  int index = 0;
286  MPI_Status status;
287 
288  if (blocking)
289  {
290  MPI_Waitany (g_numNeighbors, g_requests, &index, &status);
291  messageReceived = 1; /* Wait always implies message was received */
292  stop = true;
293  }
294  else
295  {
296  MPI_Testany (g_numNeighbors, g_requests, &index, &messageReceived, &status);
297  }
298 
299  if (messageReceived)
300  {
 // count = number of MPI_CHAR elements actually received for this message.
301  int count;
302  MPI_Get_count (&status, MPI_CHAR, &count);
303 
304  // Get the meta data first
 // Header order matches the send routines: uint64 time, uint64 guarantee, uint32 node, uint32 dev.
305  uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
306  uint64_t time = *pTime++;
307  uint64_t guaranteeUpdate = *pTime++;
308 
309  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
310  uint32_t node = *pData++;
311  uint32_t dev = *pData++;
312 
313  Time rxTime (time);
314 
315  // rxtime == 0 means this is a Null Message
316  if (rxTime > Time (0))
317  {
 // Strip the fixed header so count is the serialized packet length.
318  count -= sizeof (time) + sizeof (guaranteeUpdate) + sizeof (node) + sizeof (dev);
319 
320  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
321 
322  // Find the correct node/device to schedule receive event
323  Ptr<Node> pNode = NodeList::GetNode (node);
324  Ptr<MpiReceiver> pMpiRec = 0;
 // NOTE(review): pNode is dereferenced here, before the NS_ASSERT below checks it.
325  uint32_t nDevices = pNode->GetNDevices ();
326  for (uint32_t i = 0; i < nDevices; ++i)
327  {
328  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
329  if (pThisDev->GetIfIndex () == dev)
330  {
331  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
332  break;
333  }
334  }
335  NS_ASSERT (pNode && pMpiRec);
336 
337  // Schedule the rx event
 // The delay passed is the receive time relative to the current simulation time.
338  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
339  &MpiReceiver::Receive, pMpiRec, p);
340 
341  }
342 
343  // Update guarantee time for both packet receives and Null Messages.
344  Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (status.MPI_SOURCE);
345  NS_ASSERT (bundle);
346 
347  bundle->SetGuaranteeTime (Time (guaranteeUpdate));
348 
349  // Re-queue the next read
 // Reuses the same buffer and request slot for the same source rank.
350  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
351  MPI_COMM_WORLD, &g_requests[index]);
352 
353  }
354  else
355  {
356  // if non-blocking and no message received in testany then stop message loop
357  stop = true;
358  }
359  }
360  while (!stop);
361 }
362 
// NOTE(review): the signature line (364) and lines 366 and 368 are missing from
// this listing; presumably NullMessageMpiInterface::TestSendComplete () — confirm.
// Polls every pending send with MPI_Test and erases entries whose request has
// completed. Erasing destroys the list element; presumably its destructor frees
// the send buffer — confirm.
363 void
365 {
367 
369 
370  std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
371  while (iter != g_pendingTx.end ())
372  {
373  MPI_Status status;
374  int flag = 0;
375  MPI_Test (iter->GetRequest (), &flag, &status);
 // Advance before a possible erase so `iter` never refers to a removed element.
376  std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
377  ++iter; // Advance to next
378  if (flag)
379  { // This message is complete
380  g_pendingTx.erase (current);
381  }
382  }
383 }
384 
// NOTE(review): the signature line (386) is missing from this listing; presumably
// NullMessageMpiInterface::Disable () — confirm.
// Shuts MPI down. If MPI_Initialized reports true: cancels and frees every
// pending send request and posted receive request, calls MPI_Finalize, frees the
// receive buffers and request array, clears g_pendingTx, and resets g_enabled and
// g_initialized. Otherwise raises NS_FATAL_ERROR.
// NOTE(review): g_pRxBuffers and g_requests are deleted but not reset here.
385 void
387 {
388  NS_LOG_FUNCTION (this);
389 
390  int flag = 0;
391  MPI_Initialized (&flag);
392  if (flag)
393  {
394 
 // Outstanding sends: cancel, then release the request handle.
395  for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
396  iter != g_pendingTx.end ();
397  ++iter)
398  {
399  MPI_Cancel (iter->GetRequest ());
400  MPI_Request_free (iter->GetRequest ());
401  }
402 
 // Posted receives, one per neighbor: same cancel-and-free sequence.
403  for (uint32_t i = 0; i < g_numNeighbors; ++i)
404  {
405  MPI_Cancel (&g_requests[i]);
406  MPI_Request_free (&g_requests[i]);
407  }
408 
409  MPI_Finalize ();
410 
 // Memory is released only after MPI_Finalize has returned.
411  for (uint32_t i = 0; i < g_numNeighbors; ++i)
412  {
413  delete [] g_pRxBuffers[i];
414  }
415  delete [] g_pRxBuffers;
416  delete [] g_requests;
417 
418  g_pendingTx.clear ();
419 
420  g_enabled = false;
421  g_initialized = false;
422 
423  }
424  else
425  {
426  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
427  }
428 }
429 
430 } // namespace ns3
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
uint32_t GetId(void) const
Definition: node.cc:109
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:405
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
Definition: simulator.cc:353
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:241
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:144
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:585
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:205
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:44
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy ()...
static void ScheduleWithContext(uint32_t context, Time const &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:572
Every class exported by the ns3 library is enclosed in the ns3 namespace.
static void ReceiveMessagesBlocking()
Blocking message receive.
uint32_t GetSystemId(void) const
Definition: node.cc:123
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:195
uint8_t * m_buffer
Buffer for send.
Non-blocking send buffers for Null Message implementation.
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static std::list< NullMessageSentBuffer > g_pendingTx
uint32_t GetNDevices(void) const
Definition: node.cc:152
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:628
MPI_Request m_request
MPI request posted for the send.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:397