A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /*
2  * Copyright 2013. Lawrence Livermore National Security, LLC.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 2 as
6  * published by the Free Software Foundation;
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16  *
17  * Author: Steven Smith <smith84@llnl.gov>
18  *
19  */
20 
28 
29 #include "mpi-receiver.h"
32 #include "remote-channel-bundle.h"
33 
34 #include "ns3/log.h"
35 #include "ns3/net-device.h"
36 #include "ns3/node-list.h"
37 #include "ns3/node.h"
38 #include "ns3/nstime.h"
39 #include "ns3/simulator.h"
40 
41 #include <iomanip>
42 #include <iostream>
43 #include <list>
44 #include <mpi.h>
45 
46 namespace ns3
47 {
48 
// Defines the "NullMessageMpiInterface" log component used by the NS_LOG_*
// macros in this file.
49 NS_LOG_COMPONENT_DEFINE("NullMessageMpiInterface");
50 
// Registers NullMessageMpiInterface (an Object subclass) with the TypeId system.
51 NS_OBJECT_ENSURE_REGISTERED(NullMessageMpiInterface);
52 
// NOTE(review): the class header line (presumably `class NullMessageSentBuffer`,
// "Non-blocking send buffers for Null Message implementation") and the
// constructor/destructor declarations are not present in this listing.
61 {
62  public:
65 
// Returns the stored send buffer pointer (no copy is made).
69  uint8_t* GetBuffer();
// Stores a new[]-allocated send buffer; the destructor body in this file
// releases m_buffer with delete[].
73  void SetBuffer(uint8_t* buffer);
// Returns the address of the embedded MPI request so MPI calls can update it
// in place.
77  MPI_Request* GetRequest();
78 
79  private:
// Buffer for send (released with delete[] by the destructor body).
83  uint8_t* m_buffer;
84 
// MPI request posted for the send (initialized to MPI_REQUEST_NULL).
88  MPI_Request m_request;
// NOTE(review): the destructor deletes m_buffer, but no user-defined copy
// constructor/assignment is visible, so copying an instance after SetBuffer()
// would double-delete. The senders in this file only copy default-constructed
// (nullptr-buffer) instances into g_pendingTx and call SetBuffer afterwards —
// keep it that way or delete the copy operations.
89 };
90 
// Size in bytes of each per-neighbor receive buffer
// (new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE]), i.e. the largest message a
// receive can accept, including the 24-byte header.
// NOTE(review): no visible sender checks its message size against this limit.
95 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
96 
// NOTE(review): the signature lines of the NullMessageSentBuffer member
// definitions below are missing from this listing; the bodies presumably
// belong to the constructor, destructor, GetBuffer, SetBuffer and GetRequest
// (in that order) — confirm against the original source.
// Constructor (presumed): no buffer yet and no request posted.
98 {
99  m_buffer = nullptr;
100  m_request = MPI_REQUEST_NULL;
101 }
102 
// Destructor (presumed): releases the send buffer; delete[] on nullptr is a
// no-op, so unused instances are safe to destroy.
104 {
105  delete[] m_buffer;
106 }
107 
// GetBuffer: returns the stored pointer.
108 uint8_t*
110 {
111  return m_buffer;
112 }
113 
// SetBuffer: overwrites the stored pointer without freeing a previous one.
// NOTE(review): calling it twice on the same instance would leak the first
// buffer.
114 void
116 {
117  m_buffer = buffer;
118 }
119 
// GetRequest: pointer to the embedded request, handed to MPI_Isend, MPI_Test,
// MPI_Cancel and MPI_Request_free elsewhere in this file.
120 MPI_Request*
122 {
123  return &m_request;
124 }
125 
// Rank of this task within g_communicator; assigned in Enable(MPI_Comm).
// NOTE(review): listing lines 127-130 and 135-137 (presumably the remaining
// static definitions such as g_size and g_enabled) are not visible here.
126 uint32_t NullMessageMpiInterface::g_sid = 0;
131 
// Pending non-blocking sends. std::list keeps element addresses stable across
// push_back/erase, which matters because MPI holds a pointer to each
// element's MPI_Request (iter->GetRequest()) until the send completes.
132 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
133 
// Communicator used for ns-3 traffic; starts as MPI_COMM_WORLD and is
// replaced by a duplicate in Enable(MPI_Comm).
134 MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
138 
// GetTypeId (signature line missing from this listing): returns the
// "ns3::NullMessageMpiInterface" TypeId, parented to Object, in group "Mpi".
// The function-local static builds the TypeId once, on first call.
139 TypeId
141 {
142  static TypeId tid =
143  TypeId("ns3::NullMessageMpiInterface").SetParent<Object>().SetGroupName("Mpi");
144  return tid;
145 }
146 
// NOTE(review): signature lines 147, 152 and 158 are missing; these bodies are
// presumably the constructor, the destructor and Destroy() (documented as
// "Deletes storage used by the parallel environment"). All three only emit a
// function-log entry.
148 {
149  NS_LOG_FUNCTION(this);
150 }
151 
153 {
154  NS_LOG_FUNCTION(this);
155 }
156 
// NOTE(review): despite its documentation, this body frees nothing; the MPI
// request/buffer cleanup in this file happens in the function that checks
// g_enabled and calls MPI_Finalize (presumably Disable()).
157 void
159 {
160  NS_LOG_FUNCTION(this);
161 }
162 
// NOTE(review): the signature lines and lines 166/173/180 are missing from
// this listing; judging by return types and returned members these are
// GetSystemId(), GetSize(), GetCommunicator() and IsEnabled() — confirm.
// Rank of this task in the ns-3 communicator.
163 uint32_t
165 {
167  return g_sid;
168 }
169 
// Number of ranks in the ns-3 communicator.
170 uint32_t
172 {
174  return g_size;
175 }
176 
// Communicator used for ns-3 traffic (a private duplicate once Enable ran).
177 MPI_Comm
179 {
181  return g_communicator;
182 }
183 
// True between Enable() and the teardown that resets g_enabled.
184 bool
186 {
187  return g_enabled;
188 }
189 
190 void
191 NullMessageMpiInterface::Enable(int* pargc, char*** pargv)
192 {
193  NS_LOG_FUNCTION(this << *pargc);
194 
195  NS_ASSERT(g_enabled == false);
196 
197  // Initialize the MPI interface
198  MPI_Init(pargc, pargv);
199  Enable(MPI_COMM_WORLD);
200  g_mpiInitCalled = true;
201 }
202 
203 void
204 NullMessageMpiInterface::Enable(MPI_Comm communicator)
205 {
206  NS_LOG_FUNCTION(this);
207 
208  NS_ASSERT(g_enabled == false);
209 
210  // Standard MPI practice is to duplicate the communicator for
211  // library to use. Library communicates in isolated communication
212  // context.
213  MPI_Comm_dup(communicator, &g_communicator);
214  g_freeCommunicator = true;
215 
216  // SystemId and Size are unit32_t in interface but MPI uses int so convert.
217  int mpiSystemId;
218  int mpiSize;
219  MPI_Comm_rank(g_communicator, &mpiSystemId);
220  MPI_Comm_size(g_communicator, &mpiSize);
221 
222  g_sid = mpiSystemId;
223  g_size = mpiSize;
224 
225  g_enabled = true;
226 
227  MPI_Barrier(g_communicator);
228 }
229 
// NOTE(review): the signature line (231) and lines 233-237, 244, 249 and 253
// are missing from this listing; this is presumably
// InitializeSendReceiveBuffers(), "Initialize send and receive buffers".
// Posts one non-blocking MPI_CHAR receive (tag 0) for every rank that has a
// remote channel bundle, each into its own NULL_MESSAGE_MAX_MPI_MSG_SIZE-byte
// buffer. g_requests[i] and g_pRxBuffers[i] are paired by slot index i.
230 void
232 {
235 
237 
// NOTE(review): both arrays hold g_numNeighbors slots, but the loop writes one
// slot per rank whose bundle lookup (missing line 244) is non-null; this is
// only in bounds if g_numNeighbors equals that count — presumably established
// in the missing lines above; confirm.
238  // Post a non-blocking receive for all peers
239  g_requests = new MPI_Request[g_numNeighbors];
240  g_pRxBuffers = new char*[g_numNeighbors];
241  int index = 0;
242  for (uint32_t rank = 0; rank < g_size; ++rank)
243  {
245  if (bundle)
246  {
247  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
248  MPI_Irecv(g_pRxBuffers[index],
250  MPI_CHAR,
251  rank,
252  0,
254  &g_requests[index]);
255  ++index;
256  }
257  }
258 }
259 
// Sends packet p to the rank that owns node `node`, to be delivered at rxTime
// on the device whose IfIndex is `dev`. Message layout (24-byte header):
//   uint64 rxTime | uint64 guarantee time | uint32 node | uint32 dev | packet
// The buffer belongs to a NullMessageSentBuffer appended to g_pendingTx, which
// keeps it alive until the non-blocking send is observed complete (MPI_Test
// polling of g_pendingTx) or the interface is torn down.
// NOTE(review): listing lines 265, 285, 299 and 302 are missing (285 is the
// right-hand side of the guarantee_update initialization; 299 presumably the
// communicator argument of MPI_Isend).
// NOTE(review): no visible check keeps bufferSize within
// NULL_MESSAGE_MAX_MPI_MSG_SIZE (the receivers' buffer size); a packet whose
// serialized size exceeds 2000 - 24 bytes would presumably be truncated/error
// on receive — confirm and consider a guard.
260 void
261 NullMessageMpiInterface::SendPacket(Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
262 {
263  NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
264 
266 
267  // Find the system id for the destination node
268  Ptr<Node> destNode = NodeList::GetNode(node);
269  uint32_t nodeSysId = destNode->GetSystemId();
270 
// Append an empty holder first, then attach the buffer to the list element
// (copying a holder that already owns a buffer would double-free it).
271  NullMessageSentBuffer sendBuf;
272  g_pendingTx.push_back(sendBuf);
273  auto iter = g_pendingTx.rbegin(); // Points to the last element
274 
275  uint32_t serializedSize = p->GetSerializedSize();
276  uint32_t bufferSize = serializedSize + (2 * sizeof(uint64_t)) + (2 * sizeof(uint32_t));
277  auto buffer = new uint8_t[bufferSize];
278  iter->SetBuffer(buffer);
279  // Add the time, dest node and dest device
280  uint64_t t = rxTime.GetInteger();
281  auto pTime = reinterpret_cast<uint64_t*>(buffer);
282  *pTime++ = t;
283 
284  Time guarantee_update =
286  *pTime++ = guarantee_update.GetTimeStep();
287 
288  auto pData = reinterpret_cast<uint32_t*>(pTime);
289  *pData++ = node;
290  *pData++ = dev;
291  // Serialize the packet
292  p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
293 
294  MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
295  bufferSize,
296  MPI_CHAR,
297  nodeSysId,
298  0,
300  (iter->GetRequest()));
301 
303 }
304 
// NOTE(review): signature lines 306-307 and lines 311, 336 are missing; this is
// presumably the static SendNullMessage(guarantee_update, bundle), "Send a
// Null Message across the specified bundle".
// Sends a header-only 24-byte message to the rank behind `bundle`, using the
// same layout as SendPacket. The rxTime field is 0, which the receive path
// treats as a Null Message (no packet); only the guarantee time is meaningful.
305 void
308 {
309  NS_LOG_FUNCTION(guarantee_update.GetTimeStep() << bundle);
310 
312 
313  NullMessageSentBuffer sendBuf;
314  g_pendingTx.push_back(sendBuf);
315  auto iter = g_pendingTx.rbegin(); // Points to the last element
316 
317  uint32_t bufferSize = 2 * sizeof(uint64_t) + 2 * sizeof(uint32_t);
318  auto buffer = new uint8_t[bufferSize];
319  iter->SetBuffer(buffer);
// For a Null Message the time is 0 and node/device are placeholders (the
// receiver reads them but only uses them when the time is non-zero).
320  // Add the time, dest node and dest device
321  auto pTime = reinterpret_cast<uint64_t*>(buffer);
322  *pTime++ = 0;
323  *pTime++ = guarantee_update.GetInteger();
324  auto pData = reinterpret_cast<uint32_t*>(pTime);
325  *pData++ = 0;
326  *pData++ = 0;
327 
328  // Find the system id for the destination MPI rank
329  uint32_t nodeSysId = bundle->GetSystemId();
330 
331  MPI_Isend(reinterpret_cast<void*>(iter->GetBuffer()),
332  bufferSize,
333  MPI_CHAR,
334  nodeSysId,
335  0,
337  (iter->GetRequest()));
338 }
339 
// NOTE(review): signature lines 341/349 and lines 343/351 are missing; these
// are presumably the static wrappers ReceiveMessagesBlocking() and
// ReceiveMessagesNonBlocking().
// Blocking variant: waits for, and handles, one incoming message.
340 void
342 {
344 
345  ReceiveMessages(true);
346 }
347 
// Non-blocking variant: handles every message that has already arrived.
348 void
350 {
352 
353  ReceiveMessages(false);
354 }
355 
// NOTE(review): the signature line (357) and lines 361, 429, 431, 437, 444 and
// 448 are missing; this is presumably ReceiveMessages(bool blocking).
// Services completed receives posted on g_requests / g_pRxBuffers.
//  - blocking == true : MPI_Waitany for exactly one message, handle it, return.
//  - blocking == false: handle each message MPI_Testany reports complete, and
//    return as soon as none is ready.
// For every message: if rxTime is non-zero, rebuild the Packet and schedule a
// receive on the destination device's MpiReceiver; in all cases update the
// sending bundle's guarantee time and re-post the receive in the same slot.
356 void
358 {
359  NS_LOG_FUNCTION(blocking);
360 
362 
363  // stop flag set to true when no more messages are found to
364  // process.
365  bool stop = false;
366 
367  if (!g_numNeighbors)
368  {
369  // Not communicating with anyone.
370  return;
371  }
372 
373  do
374  {
375  int messageReceived = 0;
376  int index = 0;
377  MPI_Status status;
378 
379  if (blocking)
380  {
381  MPI_Waitany(g_numNeighbors, g_requests, &index, &status);
382  messageReceived = 1; /* Wait always implies message was received */
383  stop = true;
384  }
385  else
386  {
387  MPI_Testany(g_numNeighbors, g_requests, &index, &messageReceived, &status);
388  }
389 
390  if (messageReceived)
391  {
392  int count;
393  MPI_Get_count(&status, MPI_CHAR, &count);
394 
// Header layout written by the senders in this file:
//   uint64 rxTime | uint64 guarantee time | uint32 node | uint32 dev
// followed by the serialized packet (absent for Null Messages).
395  // Get the meta data first
396  auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
397  uint64_t time = *pTime++;
398  uint64_t guaranteeUpdate = *pTime++;
399 
400  auto pData = reinterpret_cast<uint32_t*>(pTime);
401  uint32_t node = *pData++;
402  uint32_t dev = *pData++;
403 
404  Time rxTime(time);
405 
406  // rxtime == 0 means this is a Null Message
407  if (rxTime > Time(0))
408  {
// What remains after the 24-byte header is the serialized packet.
409  count -= sizeof(time) + sizeof(guaranteeUpdate) + sizeof(node) + sizeof(dev);
410 
411  Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
412 
413  // Find the correct node/device to schedule receive event
414  Ptr<Node> pNode = NodeList::GetNode(node);
415  Ptr<MpiReceiver> pMpiRec = nullptr;
416  uint32_t nDevices = pNode->GetNDevices();
417  for (uint32_t i = 0; i < nDevices; ++i)
418  {
419  Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
420  if (pThisDev->GetIfIndex() == dev)
421  {
422  pMpiRec = pThisDev->GetObject<MpiReceiver>();
423  break;
424  }
425  }
// NOTE(review): pNode is already dereferenced above, so the pNode half of
// this assert cannot catch a null node. pMpiRec is null when no device has
// IfIndex == dev or that device has no MpiReceiver aggregated.
426  NS_ASSERT(pNode && pMpiRec);
427 
428  // Schedule the rx event
430  rxTime - Simulator::Now(),
432  pMpiRec,
433  p);
434  }
435 
436  // Update guarantee time for both packet receives and Null Messages.
438  NS_ASSERT(bundle);
439 
440  bundle->SetGuaranteeTime(Time(guaranteeUpdate));
441 
// The slot's buffer has been fully consumed, so it can be re-posted for the
// rank that just sent (status.MPI_SOURCE).
442  // Re-queue the next read
443  MPI_Irecv(g_pRxBuffers[index],
445  MPI_CHAR,
446  status.MPI_SOURCE,
447  0,
449  &g_requests[index]);
450  }
451  else
452  {
453  // if non-blocking and no message received in testany then stop message loop
454  stop = true;
455  }
456  } while (!stop);
457 }
458 
// NOTE(review): the signature line (460) and lines 462/464 are missing; this
// is presumably TestSendComplete(), "Check for completed sends".
// Polls every pending send with MPI_Test and erases the completed ones.
// Erasing a list element destroys its NullMessageSentBuffer, whose destructor
// body in this file frees the send buffer with delete[].
459 void
461 {
463 
465 
466  auto iter = g_pendingTx.begin();
467  while (iter != g_pendingTx.end())
468  {
469  MPI_Status status;
470  int flag = 0;
471  MPI_Test(iter->GetRequest(), &flag, &status);
// Advance before erase() so the loop iterator never refers to a destroyed
// element.
472  auto current = iter; // Save current for erasing
473  ++iter; // Advance to next
474  if (flag)
475  { // This message is complete
476  g_pendingTx.erase(current);
477  }
478  }
479 }
480 
// NOTE(review): the signature line (482) is missing; this is presumably
// Disable(), "Clean up the ns-3 parallel communications interface".
// Tears down the interface: cancels and frees all pending send and receive
// requests, deletes the receive buffers and request array, drops the pending
// send list, frees the duplicated communicator, and calls MPI_Finalize if
// MPI_Init was issued by this interface. Calling it while not enabled is fatal.
// NOTE(review): g_pRxBuffers / g_requests are deleted but not reset to nullptr
// (and g_numNeighbors is not reset in the visible code), so a second teardown
// without re-initializing the buffers would double-delete — confirm.
// NOTE(review): send buffers are released (g_pendingTx.clear()) right after
// MPI_Cancel + MPI_Request_free, without confirming cancellation/completion;
// acceptable only because MPI is being shut down.
481 void
483 {
484  NS_LOG_FUNCTION(this);
485 
486  if (g_enabled)
487  {
488  for (auto iter = g_pendingTx.begin(); iter != g_pendingTx.end(); ++iter)
489  {
490  MPI_Cancel(iter->GetRequest());
491  MPI_Request_free(iter->GetRequest());
492  }
493 
494  for (uint32_t i = 0; i < g_numNeighbors; ++i)
495  {
496  MPI_Cancel(&g_requests[i]);
497  MPI_Request_free(&g_requests[i]);
498  }
499 
500  for (uint32_t i = 0; i < g_numNeighbors; ++i)
501  {
502  delete[] g_pRxBuffers[i];
503  }
504  delete[] g_pRxBuffers;
505  delete[] g_requests;
506 
507  g_pendingTx.clear();
508 
// Only free the communicator if Enable(MPI_Comm) duplicated it.
509  if (g_freeCommunicator)
510  {
511  MPI_Comm_free(&g_communicator);
512  g_freeCommunicator = false;
513  }
514 
// Finalize only when this interface called MPI_Init itself.
515  if (g_mpiInitCalled)
516  {
517  int flag = 0;
518  MPI_Initialized(&flag);
519  if (flag)
520  {
521  MPI_Finalize();
522  }
523  else
524  {
525  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
526  }
527  }
528 
529  g_enabled = false;
530  g_mpiInitCalled = false;
531  }
532  else
533  {
534  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
535  }
536 }
537 
538 } // namespace ns3
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
uint32_t GetSystemId() const
Definition: node.cc:131
uint32_t GetNDevices() const
Definition: node.cc:162
uint32_t GetId() const
Definition: node.cc:117
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:152
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
static bool g_mpiInitCalled
Has MPI Init been called by this interface.
void Destroy() override
Deletes storage used by the parallel environment.
static void ReceiveMessagesBlocking()
Blocking message receive.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
bool IsEnabled() override
Returns enabled state of parallel environment.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static MPI_Request * g_requests
Pending non-blocking receives.
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
static void TestSendComplete()
Check for completed sends.
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
void Enable(int *pargc, char ***pargv) override
Setup the parallel communication interface.
static bool g_enabled
Has this interface been enabled.
static char ** g_pRxBuffers
Data buffers for non-blocking receives.
static void InitializeSendReceiveBuffers()
Initialize send and receive buffers.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t g_size
Size of the MPI communicator group used by ns-3 (MPI_COMM_WORLD unless another communicator was passed to Enable).
void Disable() override
Clean up the ns-3 parallel communications interface.
static std::list< NullMessageSentBuffer > g_pendingTx
List of pending non-blocking sends.
static bool g_freeCommunicator
Whether this interface created (duplicated) the communicator and therefore must free it.
uint32_t GetSystemId() override
Get the id number of this rank.
static uint32_t g_numNeighbors
Number of neighbor tasks, tasks that this task shares a link with.
Non-blocking send buffers for Null Message implementation.
MPI_Request m_request
MPI request posted for the send.
uint8_t * m_buffer
Buffer for send.
static NullMessageSimulatorImpl * GetInstance()
Time CalculateGuaranteeTime(uint32_t systemId)
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
A base class which provides memory management and object aggregation.
Definition: object.h:89
uint32_t GetSerializedSize() const
Returns number of bytes required for packet serialization.
Definition: packet.cc:610
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:663
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:77
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
Get the bundle corresponding to a remote rank.
static std::size_t Size()
Get the number of ns-3 channels in this bundle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:588
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:208
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:931
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:839
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE
Maximum MPI message size in bytes, used to allocate the fixed-size receive buffers.
Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
Declaration of class ns3::NullMessageSimulatorImpl.
Declaration of class ns3::RemoteChannelBundleManager.
Declaration of class ns3::RemoteChannelBundle.