ns-3 Direct Code Execution
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pthread-fiber-manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Sam Jansen, INRIA
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 2 as
6  * published by the Free Software Foundation;
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16  *
17  * Authors: Sam Jansen <sam.jansen@gmail.com>
18  * Mathieu Lacage <mathieu.lacage@inria.fr>
19  */
20 #include "pthread-fiber-manager.h"
21 #include "ns3/assert.h"
22 #include "ns3/log.h"
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <limits.h>
27 #include <stdio.h>
28 #include <setjmp.h>
29 #include <algorithm>
30 #include <unistd.h>
31 #include <signal.h>
32 
33 #ifdef HAVE_VALGRIND_H
34 # include "valgrind/valgrind.h"
35 # include "valgrind/memcheck.h"
36 #else
37 # define VALGRIND_STACK_REGISTER(start,end) (0)
38 # define VALGRIND_STACK_DEREGISTER(id)
39 # define VALGRIND_MAKE_MEM_DEFINED(start,sz)
40 #endif
41 
42 
43 namespace ns3 {
44 
// Defines the ns-3 log component "PthreadFiberManager"; the NS_LOG_FUNCTION
// and NS_LOG_DEBUG calls in this file log under this component name.
NS_LOG_COMPONENT_DEFINE ("PthreadFiberManager");
46 
48 {
52 };
53 
55 {
56 public:
58  : m_min (~0),
59  m_max (0)
60  {
61  }
62  void AddBound (void *address)
63  {
64  unsigned long v = (unsigned long) address;
65  m_min = std::min (v, m_min);
66  m_max = std::max (v, m_max);
67  }
68  void * GetStart (void) const
69  {
70  int size = getpagesize ();
71  long start = m_min - (m_min % size);
72  return (void*)start;
73  }
74  size_t GetSize (void) const
75  {
76  int size = getpagesize ();
77  unsigned long start = m_min - (m_min % size);
78  unsigned long end = ((m_max % size) == 0) ? m_max : (m_max + (size - (m_max % size)));
79  return end - start;
80  }
81 private:
82  unsigned long m_min;
83  unsigned long m_max;
84 };
85 
86 
87 struct PthreadFiberThread;
88 class StackTrampoline;
89 
// Per-fiber state. A fiber and its clones share one PthreadFiberThread
// (the cloning code copies the thread pointer and bumps its refcount),
// but each fiber keeps its own saved stack image and resume point.
struct PthreadFiber : public Fiber
{
 // Heap buffer (malloc'd, released with free when the fiber is deleted)
 // holding a copy of this fiber's stack span; 0 until a save first
 // allocates it (clones allocate it immediately).
 void *stack_copy;
 // Resume point recorded by setjmp when the fiber suspends or is cloned;
 // the trampoline's DoWork longjmps here to continue this fiber.
 jmp_buf yield_env;
};
99 
101 {
102  pthread_t thread;
103  pthread_mutex_t mutex;
104  pthread_cond_t condvar;
105  uint32_t refcount;
107  jmp_buf initial_env;
108  void (*func)(void *);
109  void *context;
110  size_t stack_size;
115 };
116 
118 {
119 public:
121  {
122  int status = 0;
123  status = pthread_mutex_lock (&g_mutex);
124  g_current = this;
125  NS_ASSERT_MSG (status == 0, "lock failed");
126  m_stack = malloc (SIGSTKSZ);
127  stack_t newstack;
128  stack_t oldstack;
129  newstack.ss_sp = m_stack;
130  newstack.ss_size = SIGSTKSZ;
131  newstack.ss_flags = 0;
132  m_vgId = VALGRIND_STACK_REGISTER (m_stack,((unsigned long)m_stack) + SIGSTKSZ);
133  status = sigaltstack (&newstack, &oldstack);
134  NS_ASSERT_MSG (status == 0, "first sigaltstack failed stack=" << m_stack <<
135  " stacksize=" << SIGSTKSZ);
136  struct sigaction newact;
137  struct sigaction oldact;
138  newact.sa_handler = &StackTrampoline::SignalHandler;
139  newact.sa_flags = SA_ONSTACK | SA_RESETHAND;
140  sigemptyset (&newact.sa_mask);
141  status = sigaction (SIGUSR1, &newact, &oldact);
142  NS_ASSERT_MSG (status == 0, "first sigaction failed");
143  status = raise (SIGUSR1);
144  NS_ASSERT_MSG (status == 0, "raise failed");
145  status = sigaltstack (&oldstack, 0);
146  NS_ASSERT_MSG (status == 0, "second sigaltstack failed");
147  g_current = 0;
148  status = pthread_mutex_unlock (&g_mutex);
149  NS_ASSERT_MSG (status == 0, "unlock failed");
150  }
152  {
154  free (m_stack);
155  }
156 
 // Resume fiber thread->next on the calling pthread.
 //
 // Always records thread in m_jumpTarget. When thread->previous and
 // thread->next differ, it longjmps to m_buf, i.e. back into
 // SignalHandler on the trampoline's alternate stack, and never returns;
 // DoWork then swaps the stack contents and longjmps to next->yield_env.
 // When they are the same fiber, nothing needs to be swapped and Jump
 // simply returns to its caller, which is already on the right stack.
 //
 // NOTE(review): the manager hands the same trampoline to every fiber
 // thread, so m_jumpTarget, m_buf and the trampoline stack are shared;
 // this appears to rely on only one fiber thread being in Jump at a
 // time -- confirm.
 void Jump (PthreadFiberThread *thread)
 {
 NS_LOG_FUNCTION (this << thread);
 m_jumpTarget = thread;
 struct PthreadFiber *next = thread->next;
 struct PthreadFiber *previous = thread->previous;
 // this is a CPU optimization: we make sure that
 // there is indeed data to save and restore before
 // switching to a temporary stack to do save/restore.
 if (previous != next)
 {
 longjmp (m_buf, 1);
 }
 }
172 
173 private:
 // SIGUSR1 handler, installed with SA_ONSTACK so that it runs on the
 // trampoline's private alternate stack (m_stack).
 //
 // It is delivered exactly once, by the raise (SIGUSR1) in the
 // constructor (SA_RESETHAND restores the previous disposition after
 // that). On that first entry setjmp records m_buf and returns 0, so the
 // handler returns immediately, leaving its frame behind in m_stack.
 //
 // Every later Jump longjmps back into that frame: setjmp then returns
 // non-zero and DoWork saves the stack of the previous fiber, restores
 // the stack of the next one, and longjmps to the next fiber's
 // yield_env. That second path therefore never returns.
 //
 // NOTE(review): longjmp into the frame of a function that has already
 // returned is undefined by the C/C++ standards; this relies on nothing
 // else using m_stack once the constructor restores the old alternate
 // stack. signo is unused.
 static void SignalHandler (int signo)
 {
 // g_current is reset to 0 when the constructor finishes, so the
 // trampoline pointer is captured here, before setjmp, and that value
 // is what the later longjmp paths use.
 StackTrampoline *self = g_current;
 if (setjmp (self->m_buf) == 0)
 {
 // Initial installation: just leave the signal handler.
 return;
 }
 // Re-entered from Jump. Presumably marks the reused, dead handler
 // frame as defined so valgrind does not report it -- confirm.
 VALGRIND_MAKE_MEM_DEFINED (self->m_stack, SIGSTKSZ);
 DoWork (self->m_jumpTarget);
 }
 // Swap fiber stacks for thread and resume thread->next; never returns.
 //
 // Runs on the trampoline's alternate stack because the memcpy calls below
 // overwrite the pthread's own stack region, which the copying code must
 // not be executing on. If previous differs from next, the stack span of
 // previous (when non-null) is copied into its stack_copy buffer. The
 // saved image of next is then copied back over the same region. Finally
 // it longjmps to next->yield_env.
 static void DoWork (PthreadFiberThread *thread)
 {
 NS_LOG_DEBUG ("alternate trampoline stack");
 struct PthreadFiber *next = thread->next;
 struct PthreadFiber *previous = thread->previous;
 if (previous != next)
 {
 if (previous != 0)
 {
 // first, we save the stack of previous.
 // NOTE(review): the buffer is sized by previous->thread->stack_size,
 // but the copy below is stack_bounds.GetSize () bytes, a page-rounded
 // span; nothing here checks that it fits (the matching assert in the
 // cloning code is commented out). Confirm GetSize () <= stack_size,
 // especially when stack_size < PTHREAD_STACK_MIN or is 0.
 if (previous->stack_copy == 0)
 {
 previous->stack_copy = malloc (previous->thread->stack_size);
 }
 previous->stack_bounds.GetSize ());
 NS_LOG_DEBUG ("save start=" << previous->stack_bounds.GetStart () <<
 " size=" << previous->stack_bounds.GetSize ());
 memcpy (previous->stack_copy,
 previous->stack_bounds.GetStart (),
 previous->stack_bounds.GetSize ());
 }
 // then, we restore the stack of next
 next->stack_bounds.GetSize ());
 NS_LOG_DEBUG ("restore start=" << next->stack_bounds.GetStart () <<
 " size=" << next->stack_bounds.GetSize ());
 memcpy (next->stack_bounds.GetStart (),
 next->stack_copy,
 next->stack_bounds.GetSize ());
 }
 // Finally, we can go back to the thread's last suspend point
 // which was either in Yield or in Clone.
 longjmp (next->yield_env, 1);
 }
226  void *m_stack;
227  jmp_buf m_buf;
229  unsigned int m_vgId;
230  static pthread_mutex_t g_mutex;
232 };
233 
// Serializes StackTrampoline construction: the constructor holds this lock
// from the moment it publishes itself through g_current until SignalHandler
// has run and g_current is reset to 0.
pthread_mutex_t StackTrampoline::g_mutex = PTHREAD_MUTEX_INITIALIZER;
236 
237 
238 
239 
241  : m_notifySwitch (0)
242 {
243  NS_LOG_FUNCTION (this);
244  m_trampoline = new StackTrampoline ();
245 }
247 {
248  NS_LOG_FUNCTION (this);
249  delete m_trampoline;
250 }
251 
252 struct Fiber *
254 {
255  NS_LOG_FUNCTION (this << fib);
256  struct PthreadFiber *fiber = (struct PthreadFiber *)fib;
257  // check that this is not the main thread that
258  // we are cloning.
259  NS_ASSERT (fiber->thread->stack_size != 0);
260  struct PthreadFiber *clone = new PthreadFiber ();
261  clone->thread = fiber->thread;
262  clone->thread->refcount++;
263  clone->state = SLEEP;
264  clone->stack_bounds = clone->thread->stack_bounds;
265  clone->stack_bounds.AddBound (__builtin_frame_address (0));
267 
268  clone->stack_copy = malloc (fiber->thread->stack_size);
269  // save current stack so that the next call to SwitchTo
270  // on the clone restores the stack before jumping back
271  // to the below.
272  {
273  void *src = clone->stack_bounds.GetStart ();
274  void *dst = clone->stack_copy;
275  size_t sz = clone->stack_bounds.GetSize ();
276  VALGRIND_MAKE_MEM_DEFINED (src, sz);
277  NS_LOG_DEBUG ("save start=" << clone->stack_bounds.GetStart () <<
278  " size=" << clone->stack_bounds.GetSize ());
279 
280 // dst = clone->stack_copy = malloc (sz);
281 
282  memcpy (dst, src, sz);
283 
284 // NS_ASSERT (sz >= fiber->thread->stack_size);
285 
286  }
287  // save the current state in jmp_buf so that the next call to
288  // SwitchTo on the clone comes back here.
289  if (setjmp (clone->yield_env) == 0)
290  {
291  // returning directly. parent
292  NS_LOG_DEBUG ("created clone " << clone);
293  return clone;
294  }
295  else
296  {
297  // child
298  NS_LOG_DEBUG ("returning from clone " << clone);
299  return 0;
300  }
301 }
302 
303 void *
305 {
306  return __builtin_frame_address (0);
307 }
308 
309 void
311 {
312  int error;
313  pthread_attr_t attr;
314  error = pthread_attr_init (&attr);
315  NS_ASSERT_MSG (error == 0, "error=" << strerror (error));
316  error = pthread_attr_setstacksize (&attr, std::max (fiber->thread->stack_size,
317  (size_t)PTHREAD_STACK_MIN));
318  NS_ASSERT_MSG (error == 0, "error=" << strerror (error));
319  error = pthread_create (&fiber->thread->thread, &attr, &PthreadFiberManager::Run,
320  (void*) fiber);
321  NS_ASSERT_MSG (error == 0, "error=" << strerror (error));
322  error = pthread_attr_destroy (&attr);
323  NS_ASSERT_MSG (error == 0, "error=" << strerror (error));
324  fiber->thread->thread_started = true;
325 }
326 
327 void
329 {
330  NS_LOG_FUNCTION (this << fiber);
331  pthread_mutex_lock (&fiber->thread->mutex);
332  fiber->state = RUNNING;
333  fiber->thread->next = fiber;
334  if (fiber->thread->thread_started)
335  {
336  // and now we can wakup the target thread. yay !
337  pthread_cond_signal (&fiber->thread->condvar);
338  }
339  else
340  {
341  Start (fiber);
342  }
343 
344  while (fiber->state == RUNNING)
345  {
346  // and wait until the target thread completes.
347  pthread_cond_wait (&fiber->thread->condvar, &fiber->thread->mutex);
348  }
349 
350  pthread_mutex_unlock (&fiber->thread->mutex);
351 }
352 
353 void
355 {
356  NS_LOG_FUNCTION (this << fiber);
357  fiber->state = SLEEP;
358  pthread_cond_signal (&fiber->thread->condvar);
359  while (fiber->state != RUNNING)
360  {
361  fiber->thread->previous = fiber;
362  if (fiber->state == DESTROY)
363  {
364  pthread_mutex_unlock (&fiber->thread->mutex);
365  // now, we jump back to the creator of the thread
366  // i.e., we unwind the stack without invoking the
367  // destructors of its local variables
368  longjmp (fiber->thread->initial_env, 1);
369  }
370  else
371  {
372  fiber->stack_bounds = fiber->thread->stack_bounds;
373  fiber->stack_bounds.AddBound (__builtin_frame_address (0));
375  if (setjmp (fiber->yield_env) == 0)
376  {
377  // force the thread variable to be stored on the stack.
378  volatile PthreadFiberThread *thread = fiber->thread;
379  NS_LOG_DEBUG ("Yield after setjmp before wait");
380  // wait for the master thread to re-schedule us.
381  pthread_cond_wait (&((PthreadFiberThread *)thread)->condvar,
382  &((PthreadFiberThread *)thread)->mutex);
383  NS_LOG_DEBUG ("Yield after wait");
384  // finally, jump back where we want to go within this thread
385  ((PthreadFiberThread *)thread)->trampoline->Jump ((PthreadFiberThread *)thread);
386  }
387  NS_LOG_DEBUG ("Yield after setjmp");
388  }
389  }
390 }
391 
392 void *
394 {
395  struct PthreadFiber *fiber = (struct PthreadFiber *) arg;
396  struct PthreadFiberThread *thread = (struct PthreadFiberThread *) fiber->thread;
397  thread->stack_bounds.AddBound (__builtin_frame_address (0));
399  pthread_mutex_lock (&thread->mutex);
400  if (setjmp (thread->initial_env) == 0)
401  {
402  thread->func (thread->context);
403  fiber->state = DESTROY;
404  pthread_cond_signal (&thread->condvar);
405  pthread_mutex_unlock (&thread->mutex);
406  pthread_detach (thread->thread);
407  }
408  else
409  {
410  // oops, we are returning from a Delete
411  // we can easily return and we are done !
412  }
413  return 0;
414 }
415 
416 struct Fiber *
417 PthreadFiberManager::Create (void (*callback)(void *),
418  void *context,
419  uint32_t stackSize)
420 {
421  struct PthreadFiber *fiber = (struct PthreadFiber *)CreateFromCaller ();
422  fiber->thread->func = callback;
423  fiber->thread->context = context;
424  fiber->thread->thread_started = false;
425  fiber->thread->stack_size = stackSize;
426  fiber->state = SLEEP;
427  return fiber;
428 }
429 struct Fiber *
431 {
432  struct PthreadFiberThread *thread = new PthreadFiberThread ();
433  thread->trampoline = m_trampoline;
434  thread->refcount = 1;
435  thread->thread_started = true;
436  thread->func = NULL;
437  thread->stack_size = 0;
438  thread->previous = 0;
439  pthread_mutex_init (&thread->mutex, NULL);
440  pthread_cond_init (&thread->condvar, NULL);
441  struct PthreadFiber *fiber = new PthreadFiber ();
442  fiber->thread = thread;
443  fiber->state = RUNNING;
444  fiber->stack_copy = 0;
445  return fiber;
446 }
447 void
449 {
450  struct PthreadFiber *fiber = (struct PthreadFiber *)fib;
451  fiber->thread->refcount--;
452  if (fiber->thread->refcount == 0)
453  {
454  if (fiber->thread->func != 0)
455  {
456  pthread_mutex_lock (&fiber->thread->mutex);
457  if (fiber->state != DESTROY && fiber->thread->thread_started)
458  {
459  fiber->state = DESTROY;
460  pthread_cond_signal (&fiber->thread->condvar);
461  pthread_mutex_unlock (&fiber->thread->mutex);
462  pthread_join (fiber->thread->thread, 0);
463  }
464  else
465  {
466  pthread_mutex_unlock (&fiber->thread->mutex);
467  }
468  }
469  int status = pthread_mutex_destroy (&fiber->thread->mutex);
470  NS_ASSERT (status == 0);
471  status = pthread_cond_destroy (&fiber->thread->condvar);
472  NS_ASSERT (status == 0);
473  delete fiber->thread;
474  }
475  else if (fiber->thread->previous == fiber)
476  {
477  fiber->thread->previous = 0;
478  }
479  if (fiber->stack_copy != 0)
480  {
481  free (fiber->stack_copy);
482  }
483  delete fiber;
484 }
485 void
487  const struct Fiber *toFiber)
488 {
489  struct PthreadFiber *from = (struct PthreadFiber *)fromFiber;
490  struct PthreadFiber *to = (struct PthreadFiber *)toFiber;
491  if (from->thread->func != NULL)
492  {
493  // We're in an application thread, and we know our mutexes are locked
494  Yield (from);
495  }
496  else
497  {
498  // We're the controller (main) thread
499  Wakeup (to);
500  }
501  if (m_notifySwitch != 0)
502  {
503  m_notifySwitch ();
504  }
505 }
506 uint32_t
508 {
509  struct PthreadFiber *fiber = (struct PthreadFiber *)fib;
510  return fiber->thread->stack_size;
511 }
512 void
514 {
515  m_notifySwitch = fn;
516 }
517 
518 
519 } // namespace ns3