28 #include "ns3/mpi-receiver.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
103 NS_FATAL_ERROR (
"Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
150 MPI_Init (pargc, pargv);
151 MPI_Barrier (MPI_COMM_WORLD);
156 MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
157 MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
181 for (uint32_t rank = 0; rank <
g_size; ++rank)
210 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
213 uint32_t bufferSize = serializedSize + ( 2 *
sizeof (uint64_t) ) + ( 2 *
sizeof (uint32_t) );
214 uint8_t* buffer =
new uint8_t[bufferSize];
215 iter->SetBuffer (buffer);
218 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
224 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
228 p->
Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
230 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
231 0, MPI_COMM_WORLD, (iter->GetRequest ()));
249 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
251 uint32_t bufferSize = 2 *
sizeof (uint64_t) + 2 *
sizeof (uint32_t);
252 uint8_t* buffer =
new uint8_t[bufferSize];
253 iter->SetBuffer (buffer);
255 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
258 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
263 uint32_t nodeSysId = bundle->GetSystemId ();
265 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
266 0, MPI_COMM_WORLD, (iter->GetRequest ()));
309 int messageReceived = 0;
327 MPI_Get_count (&status, MPI_CHAR, &count);
330 uint64_t* pTime =
reinterpret_cast<uint64_t *
> (
g_pRxBuffers[index]);
331 uint64_t time = *pTime++;
332 uint64_t guaranteeUpdate = *pTime++;
334 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
335 uint32_t node = *pData++;
336 uint32_t dev = *pData++;
343 count -=
sizeof (time) +
sizeof (guaranteeUpdate) +
sizeof (node) +
sizeof (dev);
345 Ptr<Packet> p = Create<Packet> (
reinterpret_cast<uint8_t *
> (pData), count,
true);
351 for (uint32_t i = 0; i < nDevices; ++i)
354 if (pThisDev->GetIfIndex () == dev)
372 bundle->SetGuaranteeTime (
Time (guaranteeUpdate));
397 std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
402 MPI_Test (iter->GetRequest (), &flag, &status);
403 std::list<NullMessageSentBuffer>::iterator
current = iter;
420 MPI_Initialized (&flag);
424 for (std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
428 MPI_Cancel (iter->GetRequest ());
429 MPI_Request_free (iter->GetRequest ());
455 NS_FATAL_ERROR (
"Cannot disable MPI environment without Initializing it first");
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
static MPI_Request * g_requests
keep track of time values and allow control of global simulation resolution
smart pointer class similar to boost::intrusive_ptr
#define NS_LOG_FUNCTION(parameters)
MPI_Request * GetRequest()
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
static Ptr< SimulatorImpl > GetImplementation(void)
static Ptr< Node > GetNode(uint32_t n)
~NullMessageMpiInterface()
virtual uint32_t GetSystemId()
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
#define NS_ASSERT(condition)
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
uint32_t GetSystemId(void) const
void SetBuffer(uint8_t *buffer)
#define NS_FATAL_ERROR(msg)
fatal error handling
NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface")
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual uint32_t GetSize()
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message to across the specified bundle.
Ptr< NetDevice > GetDevice(uint32_t index) const
Class to aggregate to a NetDevice if it supports MPI capability.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
static void ScheduleWithContext(uint32_t context, Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize This function must be called after Destroy ()...
uint32_t GetNDevices(void) const
static void ReceiveMessagesBlocking()
Blocking message receive.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
int64_t GetTimeStep(void) const
int64_t GetInteger(void) const
static Time Now(void)
Return the "current simulation time".
uint8_t * m_buffer
Buffer for send.
static uint32_t Size(void)
static bool g_initialized
uint32_t GetId(void) const
uint32_t GetSerializedSize(void) const
Non-blocking send buffers for Null Message implementation.
static char ** g_pRxBuffers
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static uint32_t g_numNeighbors
static std::list< NullMessageSentBuffer > g_pendingTx
NullMessageMpiInterface()
MPI_Request m_request
MPI request posted for the send.