A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License version 2 as
4  * published by the Free Software Foundation;
5  *
6  * This program is distributed in the hope that it will be useful,
7  * but WITHOUT ANY WARRANTY; without even the implied warranty of
8  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9  * GNU General Public License for more details.
10  *
11  * You should have received a copy of the GNU General Public License
12  * along with this program; if not, write to the Free Software
13  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14  *
15  * Author: George Riley <riley@ece.gatech.edu>
16  *
17  */
18 
26 
28 #include "mpi-interface.h"
29 
30 #include "ns3/assert.h"
31 #include "ns3/channel.h"
32 #include "ns3/event-impl.h"
33 #include "ns3/log.h"
34 #include "ns3/node-container.h"
35 #include "ns3/pointer.h"
36 #include "ns3/ptr.h"
37 #include "ns3/scheduler.h"
38 #include "ns3/simulator.h"
39 
40 #include <cmath>
41 #include <mpi.h>
42 
43 namespace ns3
44 {
45 
// Define the log component used by the NS_LOG_* macros in this file.
46 NS_LOG_COMPONENT_DEFINE("DistributedSimulatorImpl");
47
// Register DistributedSimulatorImpl with the TypeId system.
48 NS_OBJECT_ENSURE_REGISTERED(DistributedSimulatorImpl);
49 
// Empty function body.
// NOTE(review): the signature (line 50) did not survive extraction;
// presumably this is the LbtsMessage destructor — TODO confirm.
51 {
52 }
53 
// Returns m_smallestTime, the earliest next-event timestamp carried by
// this LBTS message.
// NOTE(review): signature line 55 is missing; Run() calls GetSmallestTime()
// on the LBTS entries, so presumably this is that accessor — confirm.
54 Time
56 {
57  return m_smallestTime;
58 }
59 
// Returns m_txCount, the count of transmitted messages reported by a rank.
// NOTE(review): signature line 61 is missing; presumably GetTxCount(),
// which Run() sums across ranks — confirm.
60 uint32_t
62 {
63  return m_txCount;
64 }
65 
// Returns m_rxCount, the count of received messages reported by a rank.
// NOTE(review): signature line 67 is missing; presumably GetRxCount(),
// which Run() sums across ranks — confirm.
66 uint32_t
68 {
69  return m_rxCount;
70 }
71 
// Returns m_myId, the system id of the rank that sent this LBTS message.
// NOTE(review): signature line 73 is missing from this extraction.
72 uint32_t
74 {
75  return m_myId;
76 }
77 
// Returns m_isFinished: true when the sending rank has no more events.
// NOTE(review): signature line 79 is missing from this extraction.
78 bool
80 {
81  return m_isFinished;
82 }
83 
90 
// Registers and returns the TypeId "ns3::DistributedSimulatorImpl" in the
// "Mpi" group, with a default constructor available to the object factory.
// The function-local static means the TypeId is built only once.
// NOTE(review): signature line 92 is missing from this extraction.
91 TypeId
93 {
94  static TypeId tid = TypeId("ns3::DistributedSimulatorImpl")
// NOTE(review): line 95 is missing here (possibly a SetParent<...>() call) — confirm.
96  .SetGroupName("Mpi")
97  .AddConstructor<DistributedSimulatorImpl>();
98  return tid;
99 }
100 
// Constructor body: starts with a zero granted-time window, clears the stop
// and global-finished flags, and resets the current timestamp and the
// executed-event counter. m_events is left null, so a scheduler must be
// installed through SetScheduler() before events can be queued.
// NOTE(review): signature line 101 and body lines 105-106, 109, 114-115
// and 117-118 are absent from this extraction. DoDispose() calls
// delete[] m_pLBTS, so line 109 presumably allocates that buffer with new[].
102 {
103  NS_LOG_FUNCTION(this);
104
107
108  // Allocate the LBTS message buffer
110  m_grantedTime = Seconds(0);
111
112  m_stop = false;
113  m_globalFinished = false;
116  m_currentTs = 0;
119  m_eventCount = 0;
120  m_events = nullptr;
121 }
122 
// Destructor body: only logs. Resource release happens in DoDispose().
// NOTE(review): signature line 123 is missing from this extraction.
124 {
125  NS_LOG_FUNCTION(this);
126 }
127 
// Releases simulator resources. It drains every pending event and drops
// the reference each queued event holds. It then releases the scheduler
// and frees the per-rank LBTS message array.
// NOTE(review): m_events is dereferenced without a null check, but the
// constructor initializes it to nullptr. If DoDispose can run before
// SetScheduler(), this dereferences null — consider guarding with
// `if (m_events)`.
// NOTE(review): signature line 129 and line 140 (possibly a base-class
// DoDispose() call) are missing from this extraction.
128 void
130 {
131  NS_LOG_FUNCTION(this);
132
133  while (!m_events->IsEmpty())
134  {
135  Scheduler::Event next = m_events->RemoveNext();
     // Events removed from the list must be unref'd (see Remove()).
136  next.impl->Unref();
137  }
138  m_events = nullptr;
139  delete[] m_pLBTS;
141 }
142 
// Executes the events registered via ScheduleDestroy(), in FIFO order,
// skipping any that were cancelled. Each event is popped before it is
// invoked, and the loop re-checks emptiness on every pass. As a result,
// destroy events added while running are also executed. The local Ptr
// keeps the EventImpl alive after it leaves the container.
// NOTE(review): signature line 144 and line 159 are missing from this
// extraction; line 159 may release the MPI environment — confirm.
143 void
145 {
146  NS_LOG_FUNCTION(this);
147
148  while (!m_destroyEvents.empty())
149  {
150  Ptr<EventImpl> ev = m_destroyEvents.front().PeekEventImpl();
151  m_destroyEvents.pop_front();
152  NS_LOG_LOGIC("handle destroy " << ev);
153  if (!ev->IsCancelled())
154  {
155  ev->Invoke();
156  }
157  }
158
160 }
161 
// Computes the lookahead (conservative synchronization window) for this rank.
// - With one rank, the lookahead is 0 (sequential run).
// - Otherwise it is the minimum "Delay" attribute over point-to-point
//   channels that connect a local node to a node on another rank.
// - Ranks with no such links adopt the maximum lookahead found on any rank
//   (via MPI_Allreduce/MPI_MAX). If no rank has one, the lookahead stays at
//   GetMaximumSimulationTime().
// NOTE(review): signature line 163 and lines 174 (the container `c`),
// 228, 249 (the `if` condition before line 250) and 269 are missing from
// this extraction.
// NOTE(review): sendbuf/recvbuf are `long` and reduced as MPI_LONG, but
// Time::GetInteger() returns int64_t. Where `long` is 32 bits (e.g. LLP64
// Windows) this narrows the value. Consider int64_t with MPI_INT64_T.
162 void
164 {
165  NS_LOG_FUNCTION(this);
166
167  /* If running sequential simulation can ignore lookahead */
168  if (MpiInterface::GetSize() <= 1)
169  {
170  m_lookAhead = Seconds(0);
171  }
172  else
173  {
175  for (auto iter = c.Begin(); iter != c.End(); ++iter)
176  {
     // Only nodes owned by this rank contribute.
177  if ((*iter)->GetSystemId() != MpiInterface::GetSystemId())
178  {
179  continue;
180  }
181
182  for (uint32_t i = 0; i < (*iter)->GetNDevices(); ++i)
183  {
184  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice(i);
185  // only works for p2p links currently
186  if (!localNetDevice->IsPointToPoint())
187  {
188  continue;
189  }
190  Ptr<Channel> channel = localNetDevice->GetChannel();
191  if (!channel)
192  {
193  continue;
194  }
195
196  // grab the adjacent node
     // (relies on the p2p channel having exactly devices 0 and 1)
197  Ptr<Node> remoteNode;
198  if (channel->GetDevice(0) == localNetDevice)
199  {
200  remoteNode = (channel->GetDevice(1))->GetNode();
201  }
202  else
203  {
204  remoteNode = (channel->GetDevice(0))->GetNode();
205  }
206
207  // if it's not remote, don't consider it
208  if (remoteNode->GetSystemId() == MpiInterface::GetSystemId())
209  {
210  continue;
211  }
212
213  // compare delay on the channel with current value of
214  // m_lookAhead. if delay on channel is smaller, make
215  // it the new lookAhead.
216  TimeValue delay;
217  channel->GetAttribute("Delay", delay);
218
219  if (delay.Get() < m_lookAhead)
220  {
221  m_lookAhead = delay.Get();
222  }
223  }
224  }
225  }
226
227  // m_lookAhead is now set
229
230  /*
231  * Compute the maximum inter-task latency and use that value
232  * for tasks with no inter-task links.
233  *
234  * Special processing for edge cases. For tasks that have no
235  * nodes need to determine a reasonable lookAhead value. Infinity
236  * would work correctly but introduces a performance issue; tasks
237  * with an infinite lookAhead would execute all their events
238  * before doing an AllGather resulting in very bad load balance
239  * during the first time window. Since all tasks participate in
240  * the AllGather it is desirable to have all the tasks advance in
241  * simulation time at a similar rate assuming roughly equal events
242  * per unit of simulation time in order to equalize the amount of
243  * work per time window.
244  */
245  long sendbuf;
246  long recvbuf;
247
248  /* Tasks with no inter-task links do not contribute to max */
250  {
251  sendbuf = 0;
252  }
253  else
254  {
255  sendbuf = m_lookAhead.GetInteger();
256  }
257
     // Every rank must reach this collective call.
258  MPI_Allreduce(&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator());
259
260  /* For nodes that did not compute a lookahead use max from ranks
261  * that did compute a value. An edge case occurs if all nodes have
262  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
263  * will proceed without synchronization until a single AllGather
264  * occurs when all tasks have finished.
265  */
266  if (m_lookAhead == GetMaximumSimulationTime() && recvbuf != 0)
267  {
268  m_lookAhead = Time(recvbuf);
270  }
271 }
272 
// Tightens the lookahead with a caller-supplied bound. The result is
// m_lookAhead = Min(m_lookAhead, lookAhead), so a bound can only shrink it.
// Non-positive bounds are ignored with a warning. m_lookAhead is declared
// static, so the bound is shared across instances.
// NOTE(review): a bound of exactly zero also takes the else branch, so the
// "negative time" wording of the warning is slightly inaccurate
// ("non-positive" would be precise).
// NOTE(review): signature line 274 is missing from this extraction.
273 void
275 {
276  if (lookAhead > Time(0))
277  {
278  NS_LOG_FUNCTION(this << lookAhead);
279  m_lookAhead = Min(m_lookAhead, lookAhead);
280  }
281  else
282  {
283  NS_LOG_WARN("attempted to set lookahead to a negative time: " << lookAhead);
284  }
285 }
286 
// Installs a new Scheduler built from schedulerFactory. Any events already
// queued in the old scheduler are moved into the new one in RemoveNext()
// order. The moved events are neither Ref'd nor Unref'd: the queue's
// reference simply moves with them.
// NOTE(review): signature line 288 is missing from this extraction.
287 void
289 {
290  NS_LOG_FUNCTION(this << schedulerFactory);
291
292  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler>();
293
294  if (m_events)
295  {
296  while (!m_events->IsEmpty())
297  {
298  Scheduler::Event next = m_events->RemoveNext();
299  scheduler->Insert(next);
300  }
301  }
302  m_events = scheduler;
303 }
304 
// Pops the earliest event and runs it. It calls PreEventHook, advances the
// current timestamp/uid, increments the executed-event count, invokes the
// event, and finally drops the reference the queue held. The caller must
// ensure the queue is non-empty; Run() checks !IsLocalFinished() first.
// NOTE(review): signature line 306 and lines 315 and 320 are missing from
// this extraction. They presumably update m_unscheduledEvents and
// m_currentContext — confirm.
305 void
307 {
308  NS_LOG_FUNCTION(this);
309
310  Scheduler::Event next = m_events->RemoveNext();
311
312  PreEventHook(EventId(next.impl, next.key.m_ts, next.key.m_context, next.key.m_uid));
313
     // Time must never move backwards.
314  NS_ASSERT(next.key.m_ts >= m_currentTs);
316  m_eventCount++;
317
318  NS_LOG_LOGIC("handle " << next.key.m_ts);
319  m_currentTs = next.key.m_ts;
321  m_currentUid = next.key.m_uid;
322  next.impl->Invoke();
323  next.impl->Unref();
324 }
325 
// Returns true once all ranks have finished (the global condition computed
// in Run()), not merely this rank.
// NOTE(review): signature line 327 is missing from this extraction.
326 bool
328 {
329  return m_globalFinished;
330 }
331 
// Returns true when this rank has no queued events or Stop() has been called.
// NOTE(review): signature line 333 is missing from this extraction.
332 bool
334 {
335  return m_events->IsEmpty() || m_stop;
336 }
337 
// Returns the timestamp (in time steps) of the next local event, without
// removing it from the queue.
// NOTE(review): signature line 339 and the "finished" return on line 345
// are missing from this extraction.
338 uint64_t
340 {
341  // If the local MPI task has no more events, or Stop() was called, the
342  // next event time is treated as infinity.
343  if (IsLocalFinished())
344  {
346  }
347  else
348  {
349  Scheduler::Event ev = m_events->PeekNext();
350  return ev.key.m_ts;
351  }
352 }
353 
// Returns the next local event time as a Time, by wrapping NextTs().
// NOTE(review): signature line 355 is missing from this extraction.
354 Time
356 {
357  return TimeStep(NextTs());
358 }
359 
// Main conservative (granted-time-window) loop. Local events are executed
// while they fall inside [.., m_grantedTime]. When the next event lies
// beyond the window, or the rank is locally finished, the rank:
//   1. drains pending messages;
//   2. exchanges an LBTS message (rx/tx counts, finished flag, next time)
//      with every rank via MPI_Allgather;
//   3. advances the window to min(next time) + lookahead, but only when the
//      global rx and tx totals match (no messages in flight).
// The loop exits when m_globalFinished becomes true.
// NOTE(review): signature line 361 and lines 365, 381, 385, 387-388, 399,
// 406, 416, 428 and 430 are missing from this extraction. Line 421 uses
// `&=`, so the missing lines 406/416 presumably seed m_globalFinished from
// the per-rank finished flags. Without them, the loop could never terminate.
// NOTE(review): LbtsMessage is exchanged as raw MPI_BYTE, which assumes a
// bitwise-copyable layout identical on all ranks (homogeneous build).
360 void
362 {
363  NS_LOG_FUNCTION(this);
364
366  m_stop = false;
367  m_globalFinished = false;
368  while (!m_globalFinished)
369  {
370  Time nextTime = Next();
371
372  // If local event is beyond grantedTime then need to synchronize
373  // with other tasks to determine new time window. If local task
374  // is finished then continue to participate in allgather
375  // synchronizations with other tasks until all tasks have
376  // completed.
377  if (nextTime > m_grantedTime || IsLocalFinished())
378  {
379  // Can't process next event, calculate a new LBTS
380  // First receive any pending messages
382  // reset next time
383  nextTime = Next();
384  // And check for send completes
386  // Finally calculate the lbts
389  m_myId,
390  IsLocalFinished(),
391  nextTime);
     // NOTE(review): MPI_Allgather below also fills this rank's own slot,
     // so this assignment appears redundant.
392  m_pLBTS[m_myId] = lMsg;
393  MPI_Allgather(&lMsg,
394  sizeof(LbtsMessage),
395  MPI_BYTE,
396  m_pLBTS,
397  sizeof(LbtsMessage),
398  MPI_BYTE,
400  Time smallestTime = m_pLBTS[0].GetSmallestTime();
401  // The totRx and totTx counts ensure there are no transient
402  // messages; If totRx != totTx, there are transients,
403  // so we don't update the granted time.
404  uint32_t totRx = m_pLBTS[0].GetRxCount();
405  uint32_t totTx = m_pLBTS[0].GetTxCount();
407
408  for (uint32_t i = 1; i < m_systemCount; ++i)
409  {
410  if (m_pLBTS[i].GetSmallestTime() < smallestTime)
411  {
412  smallestTime = m_pLBTS[i].GetSmallestTime();
413  }
414  totRx += m_pLBTS[i].GetRxCount();
415  totTx += m_pLBTS[i].GetTxCount();
417  }
418
419  // Global halting condition is all nodes have empty queues and
420  // no messages are in-flight.
421  m_globalFinished &= totRx == totTx;
422
423  if (totRx == totTx)
424  {
425  // If lookahead is infinite then granted time should be as well.
426  // Covers the edge case if all the tasks have no inter tasks
427  // links, prevents overflow of granted time.
429  {
431  }
432  else
433  {
434  // Overflow is possible here if near end of representable time.
435  m_grantedTime = smallestTime + m_lookAhead;
436  }
437  }
438  }
439
440  // Execute next event if it is within the current time window.
441  // Local task may be completed.
442  if ((nextTime <= m_grantedTime) && (!IsLocalFinished()))
443  { // Safe to process
444  ProcessOneEvent();
445  }
446  }
447
448  // If the simulator stopped naturally by lack of events, make a
449  // consistency test to check that we didn't lose any events along the way.
450  NS_ASSERT(!m_events->IsEmpty() || m_unscheduledEvents == 0);
451 }
452 
// Returns this simulator's system id (m_myId), i.e. its MPI rank id.
// NOTE(review): signature line 454 is missing from this extraction.
453 uint32_t
455 {
456  return m_myId;
457 }
458 
// Requests a local stop. Once m_stop is set, IsLocalFinished() reports true,
// and Run() keeps joining the collective LBTS exchanges until every rank is
// done.
// NOTE(review): signature line 460 is missing from this extraction.
459 void
461 {
462  NS_LOG_FUNCTION(this);
463
464  m_stop = true;
465 }
466 
// Schedules Simulator::Stop to run after `delay` and returns the EventId of
// that stop event, so it can be cancelled.
// NOTE(review): signature line 468 is missing from this extraction.
467 EventId
469 {
470  NS_LOG_FUNCTION(this << delay.GetTimeStep());
471
472  return Simulator::Schedule(delay, &Simulator::Stop);
473 }
474 
475 //
476 // Schedule an event for a _relative_ time in the future, in the current
// context (GetContext()). Returns the EventId used to cancel or remove it.
// The NS_ASSERT checks reject a negative absolute time or a time before
// "now"; they are active only in debugging builds.
// NOTE(review): signature line 479 and line 493 (presumably the unscheduled
// event counter) are missing from this extraction.
477 //
478 EventId
480 {
481  NS_LOG_FUNCTION(this << delay.GetTimeStep() << event);
482
483  Time tAbsolute = delay + TimeStep(m_currentTs);
484
485  NS_ASSERT(tAbsolute.IsPositive());
486  NS_ASSERT(tAbsolute >= TimeStep(m_currentTs));
487  Scheduler::Event ev;
488  ev.impl = event;
489  ev.key.m_ts = static_cast<uint64_t>(tAbsolute.GetTimeStep());
490  ev.key.m_context = GetContext();
491  ev.key.m_uid = m_uid;
492  m_uid++;
494  m_events->Insert(ev);
495  return EventId(event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
496 }
497 
// Schedules `event` after `delay` under an explicit `context` (for example,
// another node). No EventId is returned.
// NOTE(review): unlike Schedule(), this does not assert that the delay is
// non-negative. m_currentTs is unsigned, so a negative delay larger than the
// current time would wrap to a huge timestamp. Consider mirroring
// Schedule()'s NS_ASSERTs.
// NOTE(review): line 509 (presumably the unscheduled event counter) is
// missing from this extraction.
498 void
499 DistributedSimulatorImpl::ScheduleWithContext(uint32_t context, const Time& delay, EventImpl* event)
500 {
501  NS_LOG_FUNCTION(this << context << delay.GetTimeStep() << m_currentTs << event);
502
503  Scheduler::Event ev;
504  ev.impl = event;
505  ev.key.m_ts = m_currentTs + delay.GetTimeStep();
506  ev.key.m_context = context;
507  ev.key.m_uid = m_uid;
508  m_uid++;
510  m_events->Insert(ev);
511 }
512 
// Schedules `event` at the current time; equivalent to Schedule(Time(0), event).
// NOTE(review): signature line 514 is missing from this extraction.
513 EventId
515 {
516  NS_LOG_FUNCTION(this << event);
517  return Schedule(Time(0), event);
518 }
519 
// Queues `event` to run from Destroy(). The id uses context 0xffffffff and
// the fixed uid 2, which marks it as a destroy event.
// NOTE(review): IsExpired()/Remove() test for EventId::UID::DESTROY, while
// this uses the literal 2. Presumably they are equal; prefer the named
// constant. m_uid is also advanced even though this id does not use it.
// NOTE(review): Ptr<EventImpl>(event, false) presumably adopts the caller's
// reference without incrementing it — confirm against ptr.h.
// NOTE(review): signature line 521 is missing from this extraction.
520 EventId
522 {
523  NS_LOG_FUNCTION(this << event);
524
525  EventId id(Ptr<EventImpl>(event, false), m_currentTs, 0xffffffff, 2);
526  m_destroyEvents.push_back(id);
527  m_uid++;
528  return id;
529 }
530 
// Returns the current simulation time (timestamp of the executing event).
// NOTE(review): signature line 532 is missing from this extraction.
531 Time
533 {
534  return TimeStep(m_currentTs);
535 }
536 
// Returns the time remaining until event `id` fires, or zero if it has
// already run or been cancelled (per IsExpired()).
// NOTE(review): signature line 538 is missing from this extraction.
537 Time
539 {
540  if (IsExpired(id))
541  {
542  return TimeStep(0);
543  }
544  else
545  {
546  return TimeStep(id.GetTs() - m_currentTs);
547  }
548 }
549 
// Removes event `id` from whichever list holds it.
// - Destroy events are erased from m_destroyEvents (first match only).
// - Other events that have not expired are removed from the scheduler,
//   cancelled, and their queue reference is dropped.
// NOTE(review): signature line 551 and line 580 (presumably the unscheduled
// event counter) are missing from this extraction.
550 void
552 {
553  if (id.GetUid() == EventId::UID::DESTROY)
554  {
555  // destroy events.
556  for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
557  {
558  if (*i == id)
559  {
560  m_destroyEvents.erase(i);
561  break;
562  }
563  }
564  return;
565  }
566  if (IsExpired(id))
567  {
568  return;
569  }
     // Rebuild the scheduler key from the EventId so the scheduler can locate it.
570  Scheduler::Event event;
571  event.impl = id.PeekEventImpl();
572  event.key.m_ts = id.GetTs();
573  event.key.m_context = id.GetContext();
574  event.key.m_uid = id.GetUid();
575  m_events->Remove(event);
576  event.impl->Cancel();
577  // whenever we remove an event from the event list, we have to unref it.
578  event.impl->Unref();
579
581 }
582 
// Sets the cancel bit on a pending event. The event stays queued but will
// not be invoked. Expired events are left untouched.
// NOTE(review): signature line 584 is missing from this extraction.
583 void
585 {
586  if (!IsExpired(id))
587  {
588  id.PeekEventImpl()->Cancel();
589  }
590 }
591 
// Reports whether event `id` has already run or been cancelled.
// - Destroy events are expired if null or cancelled; otherwise they are
//   live exactly while they remain in m_destroyEvents.
// - Other events are expired if null, cancelled, or ordered at/before the
//   currently executing event by (timestamp, uid).
// NOTE(review): signature line 593 is missing from this extraction.
592 bool
594 {
595  if (id.GetUid() == EventId::UID::DESTROY)
596  {
597  if (id.PeekEventImpl() == nullptr || id.PeekEventImpl()->IsCancelled())
598  {
599  return true;
600  }
601  // destroy events.
602  for (auto i = m_destroyEvents.begin(); i != m_destroyEvents.end(); i++)
603  {
604  if (*i == id)
605  {
606  return false;
607  }
608  }
609  return true;
610  }
611  return id.PeekEventImpl() == nullptr || id.GetTs() < m_currentTs ||
612  (id.GetTs() == m_currentTs && id.GetUid() <= m_currentUid) ||
613  id.PeekEventImpl()->IsCancelled();
614 }
615 
// Returns the largest representable simulation time: INT64_MAX time steps.
// This value also serves as "infinity" for the lookahead and granted time.
// NOTE(review): Time::Max() ("Maximum representable Time") may be an
// equivalent, self-documenting spelling — confirm before switching.
// NOTE(review): signature line 617 and lines 619-620 are missing from this
// extraction.
616 Time
618 {
621  return TimeStep(0x7fffffffffffffffLL);
622 }
623 
// Returns the execution context of the currently running event.
// NOTE(review): signature line 625 is missing from this extraction.
624 uint32_t
626 {
627  return m_currentContext;
628 }
629 
// Returns the number of events executed so far (incremented in ProcessOneEvent()).
// NOTE(review): signature line 631 is missing from this extraction.
630 uint64_t
632 {
633  return m_eventCount;
634 }
635 
636 } // namespace ns3
Distributed simulator implementation using lookahead.
EventId Schedule(const Time &delay, EventImpl *event) override
Schedule a future event execution (in the same context).
uint64_t GetEventCount() const override
Get the number of events executed.
void Remove(const EventId &id) override
Remove an event from the event list.
static TypeId GetTypeId()
Register this type.
DestroyEvents m_destroyEvents
The container of events to run at Destroy()
void ScheduleWithContext(uint32_t context, const Time &delay, EventImpl *event) override
Schedule a future event execution (in a different context).
EventId ScheduleNow(EventImpl *event) override
Schedule an event to run at the current virtual time.
uint64_t NextTs() const
Get the timestep of the next event.
EventId ScheduleDestroy(EventImpl *event) override
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been reached.
Time m_grantedTime
End of current window.
uint32_t m_currentContext
Execution context of the current event.
Time GetMaximumSimulationTime() const override
Get the maximum representable simulation time.
LbtsMessage * m_pLBTS
Container for Lbts messages, one per rank.
uint64_t m_currentTs
Timestamp of the current event.
uint32_t GetSystemId() const override
Get the system id of this simulator.
void SetScheduler(ObjectFactory schedulerFactory) override
Set the Scheduler to be used to manage the event list.
bool IsFinished() const override
Check if the simulation should finish.
Ptr< Scheduler > m_events
The event priority queue.
Time Next() const
Get the time of the next event, as returned by NextTs().
void CalculateLookAhead()
Calculate lookahead constraint based on network latency.
uint32_t GetContext() const override
Get the current simulation context.
bool m_globalFinished
Are all parallel instances completed.
uint32_t m_uid
Next event unique id.
void Run() override
Run the simulation.
void ProcessOneEvent()
Process the next event.
void Cancel(const EventId &id) override
Set the cancel bit on this event: the event's associated function will not be invoked when it expires.
void Stop() override
Tell the Simulator the calling event should be the last one executed.
int m_unscheduledEvents
Number of events that have been inserted but not yet scheduled, not counting the "destroy" events; this is used for validation.
void Destroy() override
Execute the events scheduled with ScheduleDestroy().
Time GetDelayLeft(const EventId &id) const override
Get the remaining time until this event will execute.
uint32_t m_systemCount
MPI communicator size.
virtual void BoundLookAhead(const Time lookAhead)
Add additional bound to lookahead constraints.
bool IsExpired(const EventId &id) const override
Check if an event has already run or been cancelled.
Time Now() const override
Return the current simulation virtual time.
bool IsLocalFinished() const
Check if this rank is finished.
bool m_stop
Flag calling for the end of the simulation.
static Time m_lookAhead
Current window size.
uint32_t m_currentUid
Unique id of the current event.
void DoDispose() override
Destructor implementation.
An identifier for simulation events.
Definition: event-id.h:55
A simulation event.
Definition: event-impl.h:46
void Invoke()
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:47
static void ReceiveMessages()
Check for received messages complete.
static void TestSendComplete()
Check for completed sends.
Structure used for all-reduce LBTS computation.
uint32_t m_txCount
Count of transmitted messages.
uint32_t m_rxCount
Count of received messages.
uint32_t m_myId
System Id of the rank sending this LBTS.
Time m_smallestTime
Earliest next event timestamp.
bool m_isFinished
true when this rank has no more events.
static MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
static void Destroy()
Deletes storage used by the parallel environment.
static uint32_t GetSystemId()
Get the id number of this rank.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
keep track of a set of node pointers.
Iterator End() const
Get an iterator which indicates past-the-last Node in the container.
static NodeContainer GetGlobal()
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
Iterator Begin() const
Get an iterator which refers to the first Node in the container.
uint32_t GetSystemId() const
Definition: node.cc:131
Instantiate subclasses of ns3::Object.
Ptr< Object > Create() const
Create an Object instance of the configured TypeId.
virtual void DoDispose()
Destructor implementation.
Definition: object.cc:352
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:77
Maintain the event list.
Definition: scheduler.h:157
void Unref() const
Decrement the reference count.
static EventId Schedule(const Time &delay, FUNC f, Ts &&... args)
Schedule an event to expire after delay.
Definition: simulator.h:571
@ NO_CONTEXT
Flag for events not associated with any particular context.
Definition: simulator.h:210
static void Stop()
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:186
The SimulatorImpl base class.
virtual void PreEventHook(const EventId &id)
Hook called before processing each event.
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:105
bool IsPositive() const
Exactly equivalent to t >= 0.
Definition: nstime.h:333
int64_t GetInteger() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:455
static Time Max()
Maximum representable Time Not to be confused with Max(Time,Time).
Definition: nstime.h:297
int64_t GetTimeStep() const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:445
Time Get() const
Definition: time.cc:530
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:931
Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
@ INVALID
INVALID.
Definition: aodv-rtable.h:53
@ VALID
VALID.
Definition: aodv-rtable.h:52
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:66
int64x64_t Min(const int64x64_t &a, const int64x64_t &b)
Minimum.
Definition: int64x64.h:229
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:282
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:261
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:46
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1326
Declaration of class ns3::MpiInterface.
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:839
Every class exported by the ns3 library is enclosed in the ns3 namespace.
channel
Definition: third.py:88
Scheduler event.
Definition: scheduler.h:184
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:186
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:185
uint32_t m_context
Event context.
Definition: scheduler.h:173
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:171
uint32_t m_uid
Event unique id.
Definition: scheduler.h:172