28 #include "ns3/mpi-receiver.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
53 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
104 NS_FATAL_ERROR (
"Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
154 MPI_Init (pargc, pargv);
155 MPI_Barrier (MPI_COMM_WORLD);
160 MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
161 MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
185 for (uint32_t rank = 0; rank <
g_size; ++rank)
190 g_pRxBuffers[index] =
new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
191 MPI_Irecv (
g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
214 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
217 uint32_t bufferSize = serializedSize + ( 2 *
sizeof (uint64_t) ) + ( 2 *
sizeof (uint32_t) );
218 uint8_t* buffer =
new uint8_t[bufferSize];
219 iter->SetBuffer (buffer);
222 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
228 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
232 p->
Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
234 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
235 0, MPI_COMM_WORLD, (iter->GetRequest ()));
253 std::list<NullMessageSentBuffer>::reverse_iterator iter =
g_pendingTx.rbegin ();
255 uint32_t bufferSize = 2 *
sizeof (uint64_t) + 2 *
sizeof (uint32_t);
256 uint8_t* buffer =
new uint8_t[bufferSize];
257 iter->SetBuffer (buffer);
259 uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
262 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
267 uint32_t nodeSysId = bundle->GetSystemId ();
269 MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
270 0, MPI_COMM_WORLD, (iter->GetRequest ()));
313 int messageReceived = 0;
331 MPI_Get_count (&status, MPI_CHAR, &count);
334 uint64_t* pTime =
reinterpret_cast<uint64_t *
> (
g_pRxBuffers[index]);
335 uint64_t time = *pTime++;
336 uint64_t guaranteeUpdate = *pTime++;
338 uint32_t* pData =
reinterpret_cast<uint32_t *
> (pTime);
339 uint32_t node = *pData++;
340 uint32_t dev = *pData++;
345 if (rxTime >
Time (0))
347 count -=
sizeof (time) +
sizeof (guaranteeUpdate) +
sizeof (node) +
sizeof (dev);
349 Ptr<Packet> p = Create<Packet> (
reinterpret_cast<uint8_t *
> (pData), count,
true);
355 for (uint32_t i = 0; i < nDevices; ++i)
358 if (pThisDev->GetIfIndex () == dev)
376 bundle->SetGuaranteeTime (
Time (guaranteeUpdate));
379 MPI_Irecv (
g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
401 std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
406 MPI_Test (iter->GetRequest (), &flag, &status);
407 std::list<NullMessageSentBuffer>::iterator current = iter;
424 MPI_Initialized (&flag);
428 for (std::list<NullMessageSentBuffer>::iterator iter =
g_pendingTx.begin ();
432 MPI_Cancel (iter->GetRequest ());
433 MPI_Request_free (iter->GetRequest ());
459 NS_FATAL_ERROR (
"Cannot disable MPI environment without Initializing it first");
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
static MPI_Request * g_requests
Simulation virtual time values and global simulation resolution.
Smart pointer class similar to boost::intrusive_ptr.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
MPI_Request * GetRequest()
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
static Ptr< Node > GetNode(uint32_t n)
~NullMessageMpiInterface()
virtual uint32_t GetSystemId()
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
#define NS_UNUSED(x)
Mark a local variable as unused.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
uint32_t GetSystemId(void) const
void SetBuffer(uint8_t *buffer)
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual uint32_t GetSize()
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message to across the specified bundle.
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Class to aggregate to a NetDevice if it supports MPI capability.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize This function must be called after Destroy ()...
uint32_t GetNDevices(void) const
Every class exported by the ns3 library is enclosed in the ns3 namespace.
static void ReceiveMessagesBlocking()
Blocking message receive.
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
NullMessageSentBuffer()
maximum MPI message size for easy buffer creation
static Time Now(void)
Return the current simulation virtual time.
uint8_t * m_buffer
Buffer for send.
static void ScheduleWithContext(uint32_t context, Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
static bool g_initialized
uint32_t GetId(void) const
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
static std::size_t Size(void)
Non-blocking send buffers for Null Message implementation.
static char ** g_pRxBuffers
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static uint32_t g_numNeighbors
static std::list< NullMessageSentBuffer > g_pendingTx
NullMessageMpiInterface()
MPI_Request m_request
MPI request posted for the send.