28 #include "ns3/mpi-receiver.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
53 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
104 NS_FATAL_ERROR (
"Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
151 MPI_Init (pargc, pargv);
152 MPI_Barrier (MPI_COMM_WORLD);
157 MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
158 MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
182 for (uint32_t rank = 0; rank <
g_size; ++rank)
187 g_pRxBuffers[index] =
new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
188 MPI_Irecv (
g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
211 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
214 uint32_t bufferSize = serializedSize + ( 2 *
sizeof (uint64_t) ) + ( 2 *
sizeof (uint32_t) );
215 uint8_t* buffer =
new uint8_t[bufferSize];
216 iter->SetBuffer (buffer);
219 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
225 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
229 p->
Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
231 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
232 0, MPI_COMM_WORLD, (iter->GetRequest ()));
250 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
252 uint32_t bufferSize = 2 *
sizeof (uint64_t) + 2 *
sizeof (uint32_t);
253 uint8_t* buffer =
new uint8_t[bufferSize];
254 iter->SetBuffer (buffer);
256 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
259 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
264 uint32_t nodeSysId = bundle->GetSystemId ();
266 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
267 0, MPI_COMM_WORLD, (iter->GetRequest ()));
310 int messageReceived = 0;
328 MPI_Get_count (&status, MPI_CHAR, &count);
331 uint64_t* pTime =
reinterpret_cast<uint64_t *
> (
g_pRxBuffers[index]);
332 uint64_t time = *pTime++;
333 uint64_t guaranteeUpdate = *pTime++;
335 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
336 uint32_t node = *pData++;
337 uint32_t dev = *pData++;
342 if (rxTime >
Time (0))
344 count -=
sizeof (time) +
sizeof (guaranteeUpdate) +
sizeof (node) +
sizeof (dev);
346 Ptr<Packet> p = Create<Packet> (
reinterpret_cast<uint8_t *
> (pData), count,
true);
352 for (uint32_t i = 0; i < nDevices; ++i)
355 if (pThisDev->GetIfIndex () == dev)
373 bundle->SetGuaranteeTime (
Time (guaranteeUpdate));
376 MPI_Irecv (
g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
398 std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
403 MPI_Test (iter->GetRequest (), &flag, &status);
404 std::list<NullMessageSentBuffer>::iterator current = iter;
421 MPI_Initialized (&flag);
425 for (std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
429 MPI_Cancel (iter->GetRequest ());
430 MPI_Request_free (iter->GetRequest ());
456 NS_FATAL_ERROR (
"Cannot disable MPI environment without Initializing it first");
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
static MPI_Request * g_requests
Simulation virtual time values and global simulation resolution.
Smart pointer class similar to boost::intrusive_ptr.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
MPI_Request * GetRequest()
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
static Ptr< Node > GetNode(uint32_t n)
~NullMessageMpiInterface()
virtual uint32_t GetSystemId()
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
uint32_t GetSystemId(void) const
void SetBuffer(uint8_t *buffer)
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual uint32_t GetSize()
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message to across the specified bundle.
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Class to aggregate to a NetDevice if it supports MPI capability.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize This function must be called after Destroy ()...
uint32_t GetNDevices(void) const
Every class exported by the ns3 library is enclosed in the ns3 namespace.
static void ReceiveMessagesBlocking()
Blocking message receive.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
NullMessageSentBuffer()
maximum MPI message size for easy buffer creation
static Time Now(void)
Return the current simulation virtual time.
uint8_t * m_buffer
Buffer for send.
static void ScheduleWithContext(uint32_t context, Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
static uint32_t Size(void)
static bool g_initialized
uint32_t GetId(void) const
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Non-blocking send buffers for Null Message implementation.
static char ** g_pRxBuffers
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static uint32_t g_numNeighbors
static std::list< NullMessageSentBuffer > g_pendingTx
NullMessageMpiInterface()
MPI_Request m_request
MPI request posted for the send.