ns-3 Direct Code Execution
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
unix-stream-socket-fd.cc
Go to the documentation of this file.
#include "utils.h"
#include "process.h"
#include "dce-manager.h"
#include "ns3/log.h"
#include "ns3/socket.h"
#include "ns3/packet.h"
#include <errno.h>
#include <string.h>
#include <algorithm>
#include <vector>
#include "ns3/socket-factory.h"
#include <poll.h>
#include <fcntl.h>
#include "file-usage.h"
14 
15 NS_LOG_COMPONENT_DEFINE ("UnixStreamSocketFd");
16 
17 namespace ns3 {
18 
19 UnixStreamSocketFd::UnixStreamSocketFd (Ptr<Socket> sock, bool connected)
20  : UnixSocketFd (sock),
21  m_backlog (0),
22  m_peerAddress (0),
23  m_shutWrite (0)
24 {
25  NS_LOG_FUNCTION (this << sock);
26  m_socket->SetAcceptCallback (MakeCallback (&UnixStreamSocketFd::ConnectionRequest, this),
27  MakeCallback (&UnixStreamSocketFd::ConnectionCreated, this));
28  m_socket->SetConnectCallback (MakeCallback (&UnixStreamSocketFd::ConnectionSuccess, this),
29  MakeCallback (&UnixStreamSocketFd::ConnectionError, this));
30  m_socket->SetCloseCallbacks (MakeCallback (&UnixStreamSocketFd::CloseSuccess, this),
31  MakeCallback (&UnixStreamSocketFd::CloseError, this));
32  m_state = (connected) ? CONNECTED : CREATED;
33 }
34 
36 {
37  SetPeerAddress (0);
38  ClearSocket ();
39 }
40 
41 ssize_t
42 UnixStreamSocketFd::DoRecvmsg (struct msghdr *msg, int flags)
43 {
44  Thread *current = Current ();
45  NS_LOG_FUNCTION (this << current << msg << flags << m_state);
46  NS_ASSERT (current != 0);
47 
48  if (!isPeekedData ())
49  {
50  if (!WaitRecvDoSignal (flags & MSG_DONTWAIT))
51  {
52  // current->err set by callee
53  return -1;
54  }
55  switch (m_state)
56  {
57  default:
58  case CREATED:
59  {
60  current->err = EINVAL;
61  return -1;
62  }
63 
64  case CONNECTED:
65  break;
66 
67  case CLOSING:
68  return 0;
69 
70  case REMOTECLOSED:
71  case CLOSED:
72  {
73  if (m_socket->GetRxAvailable () <= 0)
74  {
75  return 0;
76  }
77  }
78  break;
79  }
80  }
81 
82  uint32_t totalAvailable = 0;
83  uint32_t count = msg->msg_iov[0].iov_len;
84  uint8_t *buf = (uint8_t *)msg->msg_iov[0].iov_base;
85  size_t toCopy = 0;
86  ssize_t ret = 0;
87  Ptr<Packet> packet = 0;
88 
89  for (uint32_t i = 0; i < msg->msg_iovlen; i++)
90  {
91  totalAvailable += msg->msg_iov[i].iov_len;
92  }
93 
94  if (isPeekedData ())
95  {
96  m_peekedData->CopyData (buf, count);
97  toCopy = m_peekedData->GetSize ();
98 
99  if (toCopy > totalAvailable)
100  {
101  toCopy = totalAvailable;
102  }
103  Ns3AddressToPosixAddress (GetPeekedFrom (), (struct sockaddr*)msg->msg_name, &msg->msg_namelen);
104  }
105  else
106  {
107  Address from;
108  packet = m_socket->RecvFrom (totalAvailable, flags & ~MSG_DONTWAIT & ~MSG_PEEK, from);
109  if (packet == 0)
110  {
111  current->err = ErrnoToSimuErrno ();
112  return -1;
113  }
114  NS_ASSERT (packet->GetSize () <= totalAvailable);
115  packet->CopyData (buf, count);
116  toCopy = packet->GetSize ();
117  Ns3AddressToPosixAddress (from, (struct sockaddr*)msg->msg_name, &msg->msg_namelen);
118  if (flags & MSG_PEEK)
119  {
120  AddPeekedData (buf, toCopy, from);
121  }
122  }
123  for (uint32_t i = 0; i < msg->msg_iovlen && toCopy > 0; i++)
124  {
125  uint32_t len = std::min (toCopy, msg->msg_iov[i].iov_len);
126  memcpy (msg->msg_iov[i].iov_base, buf, len);
127  toCopy -= len;
128  buf += len;
129  ret += len;
130  }
131 
132  if (!(flags & MSG_PEEK) && isPeekedData ())
133  {
134  m_peekedData->RemoveAtStart (ret);
135  if (m_peekedData->GetSize () <= 0)
136  {
137  m_peekedData = 0;
138  }
139  }
140  return ret;
141 }
145 void
146 UnixStreamSocketFd::MainSend (int *r, Ptr<Packet> p)
147 {
148  *r = m_socket->Send (p, 0);
149 }
150 ssize_t
151 UnixStreamSocketFd::DoSendmsg (const struct msghdr *msg, int flags)
152 {
153  Thread *current = Current ();
154  NS_LOG_FUNCTION (this << current << msg << flags);
155  NS_ASSERT (current != 0);
156 
157  if (msg->msg_name != 0 || msg->msg_namelen != 0)
158  {
159  current->err = EISCONN;
160  return -1;
161  }
162  WaitQueueEntryTimeout *wq = 0;
163 
164  ssize_t retval = 0;
165  for (uint32_t i = 0; i < msg->msg_iovlen; ++i)
166  {
167  uint8_t *buf = (uint8_t *)msg->msg_iov[i].iov_base;
168  ssize_t len = msg->msg_iov[i].iov_len;
169  while (len > 0)
170  {
171  while (m_socket->GetTxAvailable () == 0)
172  {
173  if (flags & MSG_DONTWAIT)
174  {
175  if (retval != 0)
176  {
177  // this is a short write
178  RETURNFREE (retval);
179  }
180  else
181  {
182  current->err = EAGAIN;
183  RETURNFREE (-1);
184  }
185  }
186 
187  if (CONNECTED != m_state)
188  {
189  if (retval != 0)
190  {
191  // this is a short write
192  RETURNFREE (retval);
193  }
194  else
195  {
196  current->err = ENOTCONN;
197  RETURNFREE (-1);
198  }
199  }
200 
201  if (m_shutWrite)
202  {
203  current->err = EPIPE;
204  RETURNFREE (-1);
205  }
206 
207  if (!wq)
208  {
209  wq = new WaitQueueEntryTimeout (POLLOUT | POLLHUP, GetSendTimeout ());
210  }
211  AddWaitQueue (wq, true);
212  PollTable::Result res = wq->Wait ();
213  RemoveWaitQueue (wq, true);
214 
215  switch (res)
216  {
217  case PollTable::OK:
218  break;
220  if (retval != 0)
221  {
222  // this is a short write
223  RETURNFREE (retval);
224  }
225  else
226  {
227  UtilsDoSignal ();
228  current->err = EINTR;
229  RETURNFREE (-1);
230  }
231  break;
232  case PollTable::TIMEOUT:
233  if (retval != 0)
234  {
235  // this is a short write
236  RETURNFREE (retval);
237  }
238  else
239  {
240  current->err = EAGAIN;
241  RETURNFREE (-1);
242  }
243  break;
244  }
245  }
246  ssize_t availLen = std::min (len, (ssize_t)m_socket->GetTxAvailable ());
247  Ptr<Packet> packet = Create<Packet> (buf, availLen);
248  TaskManager *manager = TaskManager::Current ();
249  int result = -1;
250 
251  manager->ExecOnMain (MakeEvent (&UnixStreamSocketFd::MainSend, this, &result, packet));
252 
253  if (result == -1)
254  {
255  if (retval != 0)
256  {
257  // this is a short write
258  RETURNFREE (retval);
259  }
260  else
261  {
262  current->err = ErrnoToSimuErrno ();
263  RETURNFREE (-1);
264  }
265  }
266  NS_ASSERT (result == availLen);
267  len -= result;
268  buf += result;
269  retval += result;
270  }
271  }
272  RETURNFREE (retval);
273 }
274 int
276 {
277  Thread *current = Current ();
278  NS_LOG_FUNCTION (this << current << backlog);
279  NS_ASSERT (current != 0);
280 
281  int retval = m_socket->Listen ();
282  if (retval == -1)
283  {
284  current->err = ErrnoToSimuErrno ();
285  return -1;
286  }
287  m_state = LISTENING;
288  m_backlog = backlog;
289  return 0;
290 }
291 int
292 UnixStreamSocketFd::Accept (struct sockaddr *my_addr, socklen_t *addrlen)
293 {
294  Thread *current = Current ();
295  NS_LOG_FUNCTION (this << current << my_addr << addrlen << GetRecvTimeout ());
296  NS_ASSERT (current != 0);
297 
298  WaitQueueEntryTimeout *wq = 0;
299 
300  while (m_connectionQueue.empty ())
301  {
302  if (m_statusFlags & O_NONBLOCK)
303  {
304  current->err = EWOULDBLOCK;
305  return -1;
306  }
307 
308  if (!wq)
309  {
310  wq = new WaitQueueEntryTimeout (POLLIN | POLLHUP, GetRecvTimeout ());
311  }
312  AddWaitQueue (wq, true);
313  NS_LOG_DEBUG ("Accept: waiting ...");
314  PollTable::Result res = wq->Wait ();
315  NS_LOG_DEBUG ("Accept: wait result:" << res);
316  RemoveWaitQueue (wq, true);
317 
318  switch (res)
319  {
320  case PollTable::OK:
321  break;
323  {
324  UtilsDoSignal ();
325  current->err = EINTR;
326  RETURNFREE (-1);
327  }
328  case PollTable::TIMEOUT:
329  {
330  current->err = EAGAIN;
331  RETURNFREE (-1);
332  }
333  }
334  }
335 
336  // create an fd for the socket.
337  int fd = UtilsAllocateFd ();
338  if (fd == -1)
339  {
340  current->err = EMFILE;
341  RETURNFREE (-1);
342  }
343 
344  Ptr<Socket> sock = m_connectionQueue.front ().first;
345  Address ad = m_connectionQueue.front ().second;
346  m_connectionQueue.pop_front ();
347  UnixStreamSocketFd *socket = new UnixStreamSocketFd (sock, true);
348  Ns3AddressToPosixAddress (ad, my_addr, addrlen);
349  socket->SetPeerAddress (new Address (ad));
350  socket->IncFdCount ();
351  current->process->openFiles[fd] = new FileUsage (fd, socket);
352 
353  RETURNFREE (fd);
354 }
355 bool
357 {
358  bool ret = 0;
359  uint32_t rx = 0;
360 
361  if (0 == m_socket)
362  {
363  ret = 0;
364  }
365  else
366  {
367  switch (m_state)
368  {
369  case CREATED:
370  ret = 1;
371  break;
372  case LISTENING:
373  ret = (0 == m_connectionQueue.empty ());
374  break;
375  case CONNECTING:
376  ret = 0;
377  break;
378  case CONNECTED:
379  ret = (m_socket->GetRxAvailable () > 0);
380  break;
381 
382  case CLOSING:
383  case REMOTECLOSED:
384  case CLOSED:
385  ret = 1;
386  break;
387 
388  default:
389  ret = 0;
390  break;
391  }
392  rx = m_socket->GetRxAvailable ();
393  }
394  NS_LOG_FUNCTION (m_socket << m_state << rx << m_connectionQueue.empty () << " ret " << ret);
395 
396  return ret;
397 }
398 bool
400 {
401  return m_socket != 0 && m_socket->GetTxAvailable () != 0;
402 }
403 bool
405 {
406  return false;
407 }
408 bool
409 UnixStreamSocketFd::ConnectionRequest (Ptr<Socket> sock, const Address & from)
410 {
411  NS_LOG_FUNCTION (sock << from);
412 
413  return ((int)m_connectionQueue.size ()) < m_backlog;
414 }
415 void
416 UnixStreamSocketFd::ConnectionCreated (Ptr<Socket> sock, const Address & from)
417 {
418  NS_LOG_FUNCTION (sock << from);
419  NS_ASSERT (((int)m_connectionQueue.size ()) < m_backlog);
420  m_connectionQueue.push_back (std::make_pair (sock, from));
421 
422  int pi = POLLIN;
423  WakeWaiters (&pi);
424 }
425 int
427 {
428  Thread *current = Current ();
429  NS_LOG_FUNCTION (this << current << how);
430  NS_ASSERT (current != 0);
431 
432  int retval;
433  if (how == SHUT_RD)
434  {
435  retval = m_socket->ShutdownRecv ();
436  }
437  else if (how == SHUT_WR)
438  {
439  m_shutWrite = 1;
440  return 0;
441  }
442  else if (how == SHUT_RDWR)
443  {
444  retval = m_socket->ShutdownRecv ();
445  m_shutWrite = 1;
446  }
447  else
448  {
449  current->err = EINVAL;
450  return -1;
451  }
452  if (retval == -1)
453  {
454  current->err = ErrnoToSimuErrno ();
455  return -1;
456  }
457  m_state = CLOSED;
458  return 0;
459 }
460 void
462 {
463  NS_LOG_FUNCTION (this);
464  if (CONNECTING == m_state)
465  {
466  m_state = CONNECTED;
467  Address ad;
468  if (0 == sock->GetSockName (ad))
469  {
470  SetPeerAddress (new Address (ad));
471  }
472  }
473  int pi = POLLIN;
474  WakeWaiters (&pi);
475 }
476 void
478 {
479  NS_LOG_FUNCTION (this);
480  if (CONNECTING == m_state)
481  {
482  m_state = CREATED;
483  }
484  int pi = POLLHUP;
485  WakeWaiters (&pi);
486 }
487 void
489 {
490  NS_LOG_FUNCTION (this << m_state);
491  if (CLOSING == m_state)
492  {
493  m_state = CLOSED;
494  }
495  else
496  {
498  int pi = POLLHUP;
499  WakeWaiters (&pi);
500  }
501  SetPeerAddress (0);
502 }
503 void
505 {
506  NS_LOG_FUNCTION (this << m_state);
507 }
508 
509 int
510 UnixStreamSocketFd::Connect (const struct sockaddr *my_addr, socklen_t addrlen)
511 {
512  Thread *current = Current ();
513  NS_LOG_FUNCTION (this << current);
514  NS_ASSERT (current != 0);
515 
516  if (CONNECTING == m_state)
517  {
518  current->err = EALREADY;
519  return -1;
520  }
521 
522  if (CLOSED == m_state)
523  {
524  TypeId tid = TypeId::LookupByName ("ns3::TcpSocketFactory");
525  Ptr<SocketFactory> factory = current->process->manager->GetObject<SocketFactory> (tid);
526 
527  Ptr<Socket> socket = factory->CreateSocket ();
528 
529  socket->SetAcceptCallback (MakeCallback (&UnixStreamSocketFd::ConnectionRequest, this),
530  MakeCallback (&UnixStreamSocketFd::ConnectionCreated, this));
531  socket->SetConnectCallback (MakeCallback (&UnixStreamSocketFd::ConnectionSuccess, this),
532  MakeCallback (&UnixStreamSocketFd::ConnectionError, this));
533  socket->SetCloseCallbacks (MakeCallback (&UnixStreamSocketFd::CloseSuccess, this),
534  MakeCallback (&UnixStreamSocketFd::CloseError, this));
535 
536  ChangeSocket (socket);
537 
538  m_state = CREATED;
539 
540  }
541 
543 
544  int sup = UnixSocketFd::Connect (my_addr, addrlen);
545 
546  if (0 == sup)
547  {
548  sup = -1;
549  WaitQueueEntryTimeout *wq = new WaitQueueEntryTimeout (POLLIN | POLLHUP, GetRecvTimeout ());
550 
551  while (CONNECTING == m_state)
552  {
553  AddWaitQueue (wq, true);
554  NS_LOG_DEBUG ("Connect: waiting ...");
555  PollTable::Result res = wq->Wait ();
556  NS_LOG_DEBUG ("Connect: wait result:" << res);
557  RemoveWaitQueue (wq, true);
558 
559  switch (res)
560  {
561  case PollTable::OK:
562  break;
564  {
565  UtilsDoSignal ();
566  current->err = EINTR;
567  RETURNFREE (-1);
568  }
569  case PollTable::TIMEOUT:
570  {
571  current->err = EAGAIN;
572  RETURNFREE (-1);
573  }
574  }
575  }
576  delete wq;
577  wq = 0;
578  }
579  if (CONNECTED == m_state)
580  {
581  sup = 0;
582  Address ad = PosixAddressToNs3Address (my_addr, addrlen);
583 
584  SetPeerAddress (new Address (ad));
585  }
586  else
587  {
588  sup = -1;
589 
590  if ((m_state == CLOSED)||(REMOTECLOSED == m_state))
591  {
592  Current ()->err = ECONNREFUSED;
593  }
594  }
595  return sup;
596 }
597 int
598 UnixStreamSocketFd::Getpeername (struct sockaddr *name, socklen_t *namelen)
599 {
600  Thread *current = Current ();
601  NS_LOG_FUNCTION (this << current << name << *namelen);
602  NS_ASSERT (current != 0);
603  if (0 != m_peerAddress)
604  {
605  if (Ns3AddressToPosixAddress (*m_peerAddress, name, namelen) == -1)
606  {
607  current->err = EINVAL;
608  return -1;
609  }
610  return 0;
611  }
612  current->err = ENOTCONN;
613  return -1;
614 }
615 int
617 {
618  int ret = 0;
619 
620  if (CanRecv ())
621  {
622  ret |= POLLIN;
623  }
624  if (CanSend ())
625  {
626  ret |= POLLOUT;
627  }
628  if (HangupReceived ())
629  {
630  ret |= POLLHUP;
631  }
632 
633  if (ptable)
634  {
635  ptable->PollWait (this);
636  }
637 
638  return ret;
639 }
640 void
642 {
643  if (0 != m_peerAddress)
644  {
645  delete (m_peerAddress);
646  m_peerAddress = 0;
647  }
648  m_peerAddress = a;
649 }
650 int
652 {
653  Thread *current = Current ();
654  NS_LOG_FUNCTION (this << current);
655  NS_ASSERT (current != 0);
656 
657  if (CLOSING != m_state)
658  {
659  m_state = CLOSING;
660 
662 
663  return 0;
664  }
665  Current ()->err = EBADF;
666  return -1;
667 }
668 } // namespace ns3