A Discrete-Event Network Simulator
API
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License version 2 as
4  * published by the Free Software Foundation;
5  *
6  * This program is distributed in the hope that it will be useful,
7  * but WITHOUT ANY WARRANTY; without even the implied warranty of
8  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9  * GNU General Public License for more details.
10  *
11  * You should have received a copy of the GNU General Public License
12  * along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * Author: George Riley <riley@ece.gatech.edu>
16  */
17 
24 // This object contains static methods that provide an easy interface
25 // to the necessary MPI information.
26 
28 
29 #include "mpi-interface.h"
30 #include "mpi-receiver.h"
31 
32 #include "ns3/log.h"
33 #include "ns3/net-device.h"
34 #include "ns3/node-list.h"
35 #include "ns3/node.h"
36 #include "ns3/nstime.h"
37 #include "ns3/simulator-impl.h"
38 #include "ns3/simulator.h"
39 
40 #include <iomanip>
41 #include <iostream>
42 #include <list>
43 #include <mpi.h>
44 
45 namespace ns3
46 {
47 
48 NS_LOG_COMPONENT_DEFINE("GrantedTimeWindowMpiInterface");
49 
50 NS_OBJECT_ENSURE_REGISTERED(GrantedTimeWindowMpiInterface);
51 
53 {
54  m_buffer = nullptr;
55  m_request = MPI_REQUEST_NULL;
56 }
57 
59 {
60  delete[] m_buffer;
61 }
62 
63 uint8_t*
65 {
66  return m_buffer;
67 }
68 
69 void
70 SentBuffer::SetBuffer(uint8_t* buffer)
71 {
72  m_buffer = buffer;
73 }
74 
75 MPI_Request*
77 {
78  return &m_request;
79 }
80 
// Non-blocking sends that have not yet been observed complete by
// TestSendComplete(). std::list keeps element addresses stable, so the
// MPI_Request stored in each element stays valid while MPI uses it.
87 std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx;
88 
// Starts as MPI_COMM_WORLD; Enable(MPI_Comm) replaces it with a duplicate
// obtained from MPI_Comm_dup.
91 MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
93 
94 TypeId
96 {
97  static TypeId tid =
98  TypeId("ns3::GrantedTimeWindowMpiInterface").SetParent<Object>().SetGroupName("Mpi");
99  return tid;
100 }
101 
// Release the storage allocated by Enable(MPI_Comm):
// - the per-rank receive buffers;
// - the buffer-pointer array;
// - the MPI_Request array.
// Clearing g_pendingTx destroys every queued SentBuffer. The `delete[] m_buffer`
// body earlier in this file (presumably SentBuffer's destructor) frees each
// send buffer.
// NOTE(review): the non-blocking receives posted on these buffers (by Enable
// and re-posted by ReceiveMessages) are neither cancelled nor freed here, and
// queued sends are dropped without waiting for completion. If MPI can still
// access these buffers, this is a use-after-free — confirm the required call
// order relative to Disable()/MPI_Finalize.
// NOTE(review): the pointers are not reset to nullptr, so a second call would
// delete[] them again.
102 void
104 {
105  NS_LOG_FUNCTION(this);
106 
107  for (uint32_t i = 0; i < GetSize(); ++i)
108  {
109  delete[] g_pRxBuffers[i];
110  }
111  delete[] g_pRxBuffers;
112  delete[] g_requests;
113 
114  g_pendingTx.clear();
115 }
116 
117 uint32_t
119 {
121  return g_rxCount;
122 }
123 
124 uint32_t
126 {
128  return g_txCount;
129 }
130 
131 uint32_t
133 {
135  return g_sid;
136 }
137 
138 uint32_t
140 {
142  return g_size;
143 }
144 
145 bool
147 {
148  return g_enabled;
149 }
150 
151 MPI_Comm
153 {
155  return g_communicator;
156 }
157 
158 void
159 GrantedTimeWindowMpiInterface::Enable(int* pargc, char*** pargv)
160 {
161  NS_LOG_FUNCTION(this << pargc << pargv);
162 
163  NS_ASSERT(g_enabled == false);
164 
165  // Initialize the MPI interface
166  MPI_Init(pargc, pargv);
167  Enable(MPI_COMM_WORLD);
168  g_mpiInitCalled = true;
169  g_enabled = true;
170 }
171 
// Enable ns-3's MPI communication on top of `communicator`. Steps:
// 1. Duplicate it into g_communicator; Disable() frees the duplicate because
//    g_freeCommunicator is set.
// 2. Synchronize all ranks with MPI_Barrier.
// 3. Record this rank (g_sid) and the number of ranks (g_size), and mark the
//    interface enabled.
// 4. Post g_size non-blocking receives, each into a new
//    MAX_MPI_MSG_SIZE-byte buffer.
// NOTE(review): MPI return codes are not checked.
172 void
174 {
175  NS_LOG_FUNCTION(this);
176 
177  NS_ASSERT(g_enabled == false);
178 
179  // Standard MPI practice is for a library to duplicate the caller's
180  // communicator, so that the library's messages travel in an isolated
181  // communication context and never match the application's own traffic.
182  MPI_Comm_dup(communicator, &g_communicator);
183  g_freeCommunicator = true;
184 
185  MPI_Barrier(g_communicator);
186 
187  int mpiSystemId;
188  int mpiSize;
189  MPI_Comm_rank(g_communicator, &mpiSystemId);
190  MPI_Comm_size(g_communicator, &mpiSize);
191  g_sid = mpiSystemId;
192  g_size = mpiSize;
193 
194  g_enabled = true;
195  // Post g_size non-blocking receives, one per rank. Each accepts a message from
// MPI_ANY_SOURCE with tag 0, so a receive is not tied to a particular peer.
196  g_pRxBuffers = new char*[g_size];
197  g_requests = new MPI_Request[g_size];
198  for (uint32_t i = 0; i < GetSize(); ++i)
199  {
200  g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
201  MPI_Irecv(g_pRxBuffers[i],
203  MPI_CHAR,
204  MPI_ANY_SOURCE,
205  0,
207  &g_requests[i]);
208  }
209 }
210 
// Send packet p to the rank owning destination node `node`; the rank is taken
// from Node::GetSystemId().
// Wire format built below (the layout ReceiveMessages decodes):
//   8 bytes  receive time (Time::GetInteger)
//   4 bytes  node id
//   4 bytes  device index
//   then the serialized packet.
// The send is non-blocking. The buffer and MPI_Request live in a SentBuffer
// appended to g_pendingTx until TestSendComplete() sees the send complete.
// NOTE(review): nothing checks that serializedSize + 16 fits in
// MAX_MPI_MSG_SIZE, which is the size of each receive buffer allocated in
// Enable(MPI_Comm). The return value of Packet::Serialize is also ignored.
211 void
213  const Time& rxTime,
214  uint32_t node,
215  uint32_t dev)
216 {
217  NS_LOG_FUNCTION(this << p << rxTime.GetTimeStep() << node << dev);
218 
// Append first, then attach the buffer to the list element rather than to the
// local `sendBuf`. That way only the element kept in g_pendingTx holds the
// pointer.
219  SentBuffer sendBuf;
220  g_pendingTx.push_back(sendBuf);
221  auto i = g_pendingTx.rbegin(); // Points to the last element
222 
223  uint32_t serializedSize = p->GetSerializedSize();
// 16 = 8-byte time + 4-byte node id + 4-byte device index header.
224  auto buffer = new uint8_t[serializedSize + 16];
225  i->SetBuffer(buffer);
226  // Add the time, dest node and dest device
227  uint64_t t = rxTime.GetInteger();
228  auto pTime = reinterpret_cast<uint64_t*>(buffer);
229  *pTime++ = t;
230  auto pData = reinterpret_cast<uint32_t*>(pTime);
231  *pData++ = node;
232  *pData++ = dev;
233  // Serialize the packet
234  p->Serialize(reinterpret_cast<uint8_t*>(pData), serializedSize);
235 
236  // Find the system id for the destination node
237  Ptr<Node> destNode = NodeList::GetNode(node);
238  uint32_t nodeSysId = destNode->GetSystemId();
239 
240  MPI_Isend(reinterpret_cast<void*>(i->GetBuffer()),
241  serializedSize + 16,
242  MPI_CHAR,
243  nodeSysId,
244  0,
246  (i->GetRequest()));
247  g_txCount++;
248 }
249 
// Drain every completed non-blocking receive. For each one:
// - decode the header written by SendPacket (8-byte receive time, 4-byte node
//   id, 4-byte device index);
// - rebuild the packet from the remaining bytes;
// - schedule an event at the encoded receive time for the MpiReceiver
//   aggregated to the matching device of that node;
// - re-post the receive on the same buffer.
// The loop ends when MPI_Testany reports that no request has completed.
250 void
252 {
254 
255  // Poll the non-blocking receives to see whether data has arrived
256  while (true)
257  {
258  int flag = 0;
259  int index = 0;
260  MPI_Status status;
261 
262  MPI_Testany(MpiInterface::GetSize(), g_requests, &index, &flag, &status);
263  if (!flag)
264  {
265  break; // No more messages
266  }
267  int count;
268  MPI_Get_count(&status, MPI_CHAR, &count);
269  g_rxCount++; // Count this receive
270 
271  // Decode the header first: receive time, destination node, destination device
272  auto pTime = reinterpret_cast<uint64_t*>(g_pRxBuffers[index]);
273  uint64_t time = *pTime++;
274  auto pData = reinterpret_cast<uint32_t*>(pTime);
275  uint32_t node = *pData++;
276  uint32_t dev = *pData++;
277 
278  Time rxTime(time);
279 
// After this subtraction `count` is the size of the serialized packet that
// follows the 16-byte header.
280  count -= sizeof(time) + sizeof(node) + sizeof(dev);
281 
282  Ptr<Packet> p = Create<Packet>(reinterpret_cast<uint8_t*>(pData), count, true);
283 
284  // Find the correct node/device to schedule receive event
285  Ptr<Node> pNode = NodeList::GetNode(node);
286  Ptr<MpiReceiver> pMpiRec = nullptr;
287  uint32_t nDevices = pNode->GetNDevices();
288  for (uint32_t i = 0; i < nDevices; ++i)
289  {
290  Ptr<NetDevice> pThisDev = pNode->GetDevice(i);
291  if (pThisDev->GetIfIndex() == dev)
292  {
293  pMpiRec = pThisDev->GetObject<MpiReceiver>();
294  break;
295  }
296  }
297 
// NOTE(review): pNode has already been dereferenced above, so the pNode half of
// this check comes too late. NS_ASSERT is active only in debug builds, so in
// other builds a device without an aggregated MpiReceiver leaves pMpiRec null
// when the event is scheduled.
298  NS_ASSERT(pNode && pMpiRec);
299 
300  // Schedule the rx event
302  rxTime - Simulator::Now(),
304  pMpiRec,
305  p);
306 
307  // Re-post the receive on the same buffer
308  MPI_Irecv(g_pRxBuffers[index],
310  MPI_CHAR,
311  MPI_ANY_SOURCE,
312  0,
314  &g_requests[index]);
315  }
316 }
317 
// Poll every pending non-blocking send with MPI_Test and erase the entries
// whose send has completed. Erasing destroys the SentBuffer, which frees its
// send buffer (presumably via SentBuffer's destructor, the `delete[] m_buffer`
// body earlier in this file). The iterator is advanced before erasing because
// std::list::erase invalidates only the erased element's iterator.
318 void
320 {
322 
323  auto i = g_pendingTx.begin();
324  while (i != g_pendingTx.end())
325  {
326  MPI_Status status;
327  int flag = 0;
328  MPI_Test(i->GetRequest(), &flag, &status);
329  auto current = i; // Save current for erasing
330  i++; // Advance to next
331  if (flag)
332  { // This message is complete
333  g_pendingTx.erase(current);
334  }
335  }
336 }
337 
// Tear down ns-3's use of MPI:
// - free the duplicated communicator if ns-3 created it;
// - call MPI_Finalize only if ns-3 itself called MPI_Init (it is a fatal error
//   if MPI is then found uninitialized);
// - mark the interface disabled.
// NOTE(review): the receives re-posted by ReceiveMessages are not cancelled
// before MPI_Finalize, neither here nor in Destroy(). The MPI standard expects
// pending communication to be completed or cancelled before finalizing —
// confirm this is handled elsewhere.
338 void
340 {
342 
343  if (g_freeCommunicator)
344  {
345  MPI_Comm_free(&g_communicator);
346  g_freeCommunicator = false;
347  }
348 
349  // Call MPI_Finalize only if ns-3 itself called MPI_Init
350  if (g_mpiInitCalled)
351  {
352  int flag = 0;
353  MPI_Initialized(&flag);
354  if (flag)
355  {
356  MPI_Finalize();
357  }
358  else
359  {
360  NS_FATAL_ERROR("Cannot disable MPI environment without Initializing it first");
361  }
362  g_mpiInitCalled = false;
363  }
364 
365  g_enabled = false;
366 }
367 
368 } // namespace ns3
static void ReceiveMessages()
Check for completed receives and schedule delivery of the received packets.
MPI_Comm GetCommunicator() override
Return the communicator used to run ns-3.
static bool g_freeCommunicator
Whether ns-3 created (duplicated) the communicator and must therefore free it.
uint32_t GetSystemId() override
Get the id number of this rank.
void Disable() override
Clean up the ns-3 parallel communications interface.
static void TestSendComplete()
Check for completed sends.
static bool g_mpiInitCalled
Whether MPI_Init was called by this interface, in which case Disable() calls MPI_Finalize.
void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev) override
Send a packet to a remote node.
static uint32_t g_size
Number of ranks in the MPI communicator used by ns-3.
static bool g_enabled
Has this interface been enabled.
static std::list< SentBuffer > g_pendingTx
List of pending non-blocking sends.
void Enable(int *pargc, char ***pargv) override
Set up the parallel communication interface.
bool IsEnabled() override
Returns enabled state of parallel environment.
static MPI_Request * g_requests
Pending non-blocking receives.
static uint32_t g_rxCount
Total packets received.
uint32_t GetSize() override
Get the number of ranks used by ns-3.
static char ** g_pRxBuffers
Data buffers for non-blocking reads.
void Destroy() override
Deletes storage used by the parallel environment.
static uint32_t g_txCount
Total packets sent.
static MPI_Comm g_communicator
MPI communicator being used for ns-3 tasks.
static TypeId GetTypeId()
Register this type.
static uint32_t g_sid
System ID (rank) for this task.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:48
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:51
uint32_t GetSystemId() const
Definition: node.cc:131
uint32_t GetNDevices() const
Definition: node.cc:162
uint32_t GetId() const
Definition: node.cc:117
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:152
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:251
A base class which provides memory management and object aggregation.
Definition: object.h:89
uint32_t GetSerializedSize() const
Returns number of bytes required for packet serialization.
Definition: packet.cc:610
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:663
Tracks non-blocking sends.
MPI_Request m_request
The MPI request handle.
static void ScheduleWithContext(uint32_t context, const Time &delay, FUNC f, Ts &&... args)
Schedule an event with the given context.
Definition: simulator.h:588
static Time Now()
Return the current simulation virtual time.
Definition: simulator.cc:208
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:931
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:179
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Declaration of class ns3::MpiInterface.
ns3::MpiReceiver declaration, provides an interface to aggregate to MPI-compatible NetDevices.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
const uint32_t MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation