src/event/ngx_event_udp.c - nginx source code

Functions defined

Source code


  1. /*
  2. * Copyright (C) Roman Arutyunyan
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>
  7. #include <ngx_event.h>


  8. #if !(NGX_WIN32)

  9. static void ngx_close_accepted_udp_connection(ngx_connection_t *c);
  10. static ssize_t ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf,
  11.     size_t size);
  12. static ngx_int_t ngx_insert_udp_connection(ngx_connection_t *c);
  13. static ngx_connection_t *ngx_lookup_udp_connection(ngx_listening_t *ls,
  14.     struct sockaddr *sockaddr, socklen_t socklen,
  15.     struct sockaddr *local_sockaddr, socklen_t local_socklen);


  16. void
  17. ngx_event_recvmsg(ngx_event_t *ev)
  18. {
  19.     ssize_t            n;
  20.     ngx_buf_t          buf;
  21.     ngx_log_t         *log;
  22.     ngx_err_t          err;
  23.     socklen_t          socklen, local_socklen;
  24.     ngx_event_t       *rev, *wev;
  25.     struct iovec       iov[1];
  26.     struct msghdr      msg;
  27.     ngx_sockaddr_t     sa, lsa;
  28.     struct sockaddr   *sockaddr, *local_sockaddr;
  29.     ngx_listening_t   *ls;
  30.     ngx_event_conf_t  *ecf;
  31.     ngx_connection_t  *c, *lc;
  32.     static u_char      buffer[65535];

  33. #if (NGX_HAVE_ADDRINFO_CMSG)
  34.     u_char             msg_control[CMSG_SPACE(sizeof(ngx_addrinfo_t))];
  35. #endif

  36.     if (ev->timedout) {
  37.         if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
  38.             return;
  39.         }

  40.         ev->timedout = 0;
  41.     }

  42.     ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

  43.     if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
  44.         ev->available = ecf->multi_accept;
  45.     }

  46.     lc = ev->data;
  47.     ls = lc->listening;
  48.     ev->ready = 0;

  49.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
  50.                    "recvmsg on %V, ready: %d", &ls->addr_text, ev->available);

  51.     do {
  52.         ngx_memzero(&msg, sizeof(struct msghdr));

  53.         iov[0].iov_base = (void *) buffer;
  54.         iov[0].iov_len = sizeof(buffer);

  55.         msg.msg_name = &sa;
  56.         msg.msg_namelen = sizeof(ngx_sockaddr_t);
  57.         msg.msg_iov = iov;
  58.         msg.msg_iovlen = 1;

  59. #if (NGX_HAVE_ADDRINFO_CMSG)
  60.         if (ls->wildcard) {
  61.             msg.msg_control = &msg_control;
  62.             msg.msg_controllen = sizeof(msg_control);

  63.             ngx_memzero(&msg_control, sizeof(msg_control));
  64.         }
  65. #endif

  66.         n = recvmsg(lc->fd, &msg, 0);

  67.         if (n == -1) {
  68.             err = ngx_socket_errno;

  69.             if (err == NGX_EAGAIN) {
  70.                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
  71.                                "recvmsg() not ready");
  72.                 return;
  73.             }

  74.             ngx_log_error(NGX_LOG_ALERT, ev->log, err, "recvmsg() failed");

  75.             return;
  76.         }

  77. #if (NGX_HAVE_ADDRINFO_CMSG)
  78.         if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
  79.             ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
  80.                           "recvmsg() truncated data");
  81.             continue;
  82.         }
  83. #endif

  84.         sockaddr = msg.msg_name;
  85.         socklen = msg.msg_namelen;

  86.         if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
  87.             socklen = sizeof(ngx_sockaddr_t);
  88.         }

  89.         if (socklen == 0) {

  90.             /*
  91.              * on Linux recvmsg() returns zero msg_namelen
  92.              * when receiving packets from unbound AF_UNIX sockets
  93.              */

  94.             socklen = sizeof(struct sockaddr);
  95.             ngx_memzero(&sa, sizeof(struct sockaddr));
  96.             sa.sockaddr.sa_family = ls->sockaddr->sa_family;
  97.         }

  98.         local_sockaddr = ls->sockaddr;
  99.         local_socklen = ls->socklen;

  100. #if (NGX_HAVE_ADDRINFO_CMSG)

  101.         if (ls->wildcard) {
  102.             struct cmsghdr  *cmsg;

  103.             ngx_memcpy(&lsa, local_sockaddr, local_socklen);
  104.             local_sockaddr = &lsa.sockaddr;

  105.             for (cmsg = CMSG_FIRSTHDR(&msg);
  106.                  cmsg != NULL;
  107.                  cmsg = CMSG_NXTHDR(&msg, cmsg))
  108.             {
  109.                 if (ngx_get_srcaddr_cmsg(cmsg, local_sockaddr) == NGX_OK) {
  110.                     break;
  111.                 }
  112.             }
  113.         }

  114. #endif

  115.         c = ngx_lookup_udp_connection(ls, sockaddr, socklen, local_sockaddr,
  116.                                       local_socklen);

  117.         if (c) {

  118. #if (NGX_DEBUG)
  119.             if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
  120.                 ngx_log_handler_pt  handler;

  121.                 handler = c->log->handler;
  122.                 c->log->handler = NULL;

  123.                 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  124.                                "recvmsg: fd:%d n:%z", c->fd, n);

  125.                 c->log->handler = handler;
  126.             }
  127. #endif

  128.             ngx_memzero(&buf, sizeof(ngx_buf_t));

  129.             buf.pos = buffer;
  130.             buf.last = buffer + n;

  131.             rev = c->read;

  132.             c->udp->buffer = &buf;

  133.             rev->ready = 1;
  134.             rev->active = 0;

  135.             rev->handler(rev);

  136.             if (c->udp) {
  137.                 c->udp->buffer = NULL;
  138.             }

  139.             rev->ready = 0;
  140.             rev->active = 1;

  141.             goto next;
  142.         }

  143. #if (NGX_STAT_STUB)
  144.         (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
  145. #endif

  146.         ngx_accept_disabled = ngx_cycle->connection_n / 8
  147.                               - ngx_cycle->free_connection_n;

  148.         c = ngx_get_connection(lc->fd, ev->log);
  149.         if (c == NULL) {
  150.             return;
  151.         }

  152.         c->shared = 1;
  153.         c->type = SOCK_DGRAM;
  154.         c->socklen = socklen;

  155. #if (NGX_STAT_STUB)
  156.         (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
  157. #endif

  158.         c->pool = ngx_create_pool(ls->pool_size, ev->log);
  159.         if (c->pool == NULL) {
  160.             ngx_close_accepted_udp_connection(c);
  161.             return;
  162.         }

  163.         c->sockaddr = ngx_palloc(c->pool, socklen);
  164.         if (c->sockaddr == NULL) {
  165.             ngx_close_accepted_udp_connection(c);
  166.             return;
  167.         }

  168.         ngx_memcpy(c->sockaddr, sockaddr, socklen);

  169.         log = ngx_palloc(c->pool, sizeof(ngx_log_t));
  170.         if (log == NULL) {
  171.             ngx_close_accepted_udp_connection(c);
  172.             return;
  173.         }

  174.         *log = ls->log;

  175.         c->recv = ngx_udp_shared_recv;
  176.         c->send = ngx_udp_send;
  177.         c->send_chain = ngx_udp_send_chain;

  178.         c->need_flush_buf = 1;

  179.         c->log = log;
  180.         c->pool->log = log;
  181.         c->listening = ls;

  182.         if (local_sockaddr == &lsa.sockaddr) {
  183.             local_sockaddr = ngx_palloc(c->pool, local_socklen);
  184.             if (local_sockaddr == NULL) {
  185.                 ngx_close_accepted_udp_connection(c);
  186.                 return;
  187.             }

  188.             ngx_memcpy(local_sockaddr, &lsa, local_socklen);
  189.         }

  190.         c->local_sockaddr = local_sockaddr;
  191.         c->local_socklen = local_socklen;

  192.         c->buffer = ngx_create_temp_buf(c->pool, n);
  193.         if (c->buffer == NULL) {
  194.             ngx_close_accepted_udp_connection(c);
  195.             return;
  196.         }

  197.         c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);

  198.         rev = c->read;
  199.         wev = c->write;

  200.         rev->active = 1;
  201.         wev->ready = 1;

  202.         rev->log = log;
  203.         wev->log = log;

  204.         /*
  205.          * TODO: MT: - ngx_atomic_fetch_add()
  206.          *             or protection by critical section or light mutex
  207.          *
  208.          * TODO: MP: - allocated in a shared memory
  209.          *           - ngx_atomic_fetch_add()
  210.          *             or protection by critical section or light mutex
  211.          */

  212.         c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

  213.         c->start_time = ngx_current_msec;

  214. #if (NGX_STAT_STUB)
  215.         (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
  216. #endif

  217.         if (ls->addr_ntop) {
  218.             c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
  219.             if (c->addr_text.data == NULL) {
  220.                 ngx_close_accepted_udp_connection(c);
  221.                 return;
  222.             }

  223.             c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
  224.                                              c->addr_text.data,
  225.                                              ls->addr_text_max_len, 0);
  226.             if (c->addr_text.len == 0) {
  227.                 ngx_close_accepted_udp_connection(c);
  228.                 return;
  229.             }
  230.         }

  231. #if (NGX_DEBUG)
  232.         {
  233.         ngx_str_t  addr;
  234.         u_char     text[NGX_SOCKADDR_STRLEN];

  235.         ngx_debug_accepted_connection(ecf, c);

  236.         if (log->log_level & NGX_LOG_DEBUG_EVENT) {
  237.             addr.data = text;
  238.             addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
  239.                                      NGX_SOCKADDR_STRLEN, 1);

  240.             ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0,
  241.                            "*%uA recvmsg: %V fd:%d n:%z",
  242.                            c->number, &addr, c->fd, n);
  243.         }

  244.         }
  245. #endif

  246.         if (ngx_insert_udp_connection(c) != NGX_OK) {
  247.             ngx_close_accepted_udp_connection(c);
  248.             return;
  249.         }

  250.         log->data = NULL;
  251.         log->handler = NULL;

  252.         ls->handler(c);

  253.     next:

  254.         if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
  255.             ev->available -= n;
  256.         }

  257.     } while (ev->available);
  258. }


  259. static void
  260. ngx_close_accepted_udp_connection(ngx_connection_t *c)
  261. {
  262.     ngx_free_connection(c);

  263.     c->fd = (ngx_socket_t) -1;

  264.     if (c->pool) {
  265.         ngx_destroy_pool(c->pool);
  266.     }

  267. #if (NGX_STAT_STUB)
  268.     (void) ngx_atomic_fetch_add(ngx_stat_active, -1);
  269. #endif
  270. }


  271. static ssize_t
  272. ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf, size_t size)
  273. {
  274.     ssize_t     n;
  275.     ngx_buf_t  *b;

  276.     if (c->udp == NULL || c->udp->buffer == NULL) {
  277.         return NGX_AGAIN;
  278.     }

  279.     b = c->udp->buffer;

  280.     n = ngx_min(b->last - b->pos, (ssize_t) size);

  281.     ngx_memcpy(buf, b->pos, n);

  282.     c->udp->buffer = NULL;

  283.     c->read->ready = 0;
  284.     c->read->active = 1;

  285.     return n;
  286. }


  287. void
  288. ngx_udp_rbtree_insert_value(ngx_rbtree_node_t *temp,
  289.     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
  290. {
  291.     ngx_int_t               rc;
  292.     ngx_connection_t       *c, *ct;
  293.     ngx_rbtree_node_t     **p;
  294.     ngx_udp_connection_t   *udp, *udpt;

  295.     for ( ;; ) {

  296.         if (node->key < temp->key) {

  297.             p = &temp->left;

  298.         } else if (node->key > temp->key) {

  299.             p = &temp->right;

  300.         } else { /* node->key == temp->key */

  301.             udp = (ngx_udp_connection_t *) node;
  302.             c = udp->connection;

  303.             udpt = (ngx_udp_connection_t *) temp;
  304.             ct = udpt->connection;

  305.             rc = ngx_memn2cmp(udp->key.data, udpt->key.data,
  306.                               udp->key.len, udpt->key.len);

  307.             if (rc == 0 && c->listening->wildcard) {
  308.                 rc = ngx_cmp_sockaddr(c->local_sockaddr, c->local_socklen,
  309.                                       ct->local_sockaddr, ct->local_socklen, 1);
  310.             }

  311.             p = (rc < 0) ? &temp->left : &temp->right;
  312.         }

  313.         if (*p == sentinel) {
  314.             break;
  315.         }

  316.         temp = *p;
  317.     }

  318.     *p = node;
  319.     node->parent = temp;
  320.     node->left = sentinel;
  321.     node->right = sentinel;
  322.     ngx_rbt_red(node);
  323. }


  324. static ngx_int_t
  325. ngx_insert_udp_connection(ngx_connection_t *c)
  326. {
  327.     uint32_t               hash;
  328.     ngx_pool_cleanup_t    *cln;
  329.     ngx_udp_connection_t  *udp;

  330.     if (c->udp) {
  331.         return NGX_OK;
  332.     }

  333.     udp = ngx_pcalloc(c->pool, sizeof(ngx_udp_connection_t));
  334.     if (udp == NULL) {
  335.         return NGX_ERROR;
  336.     }

  337.     udp->connection = c;

  338.     ngx_crc32_init(hash);
  339.     ngx_crc32_update(&hash, (u_char *) c->sockaddr, c->socklen);

  340.     if (c->listening->wildcard) {
  341.         ngx_crc32_update(&hash, (u_char *) c->local_sockaddr, c->local_socklen);
  342.     }

  343.     ngx_crc32_final(hash);

  344.     udp->node.key = hash;
  345.     udp->key.data = (u_char *) c->sockaddr;
  346.     udp->key.len = c->socklen;

  347.     cln = ngx_pool_cleanup_add(c->pool, 0);
  348.     if (cln == NULL) {
  349.         return NGX_ERROR;
  350.     }

  351.     cln->data = c;
  352.     cln->handler = ngx_delete_udp_connection;

  353.     ngx_rbtree_insert(&c->listening->rbtree, &udp->node);

  354.     c->udp = udp;

  355.     return NGX_OK;
  356. }


  357. void
  358. ngx_delete_udp_connection(void *data)
  359. {
  360.     ngx_connection_t  *c = data;

  361.     if (c->udp == NULL) {
  362.         return;
  363.     }

  364.     ngx_rbtree_delete(&c->listening->rbtree, &c->udp->node);

  365.     c->udp = NULL;
  366. }


  367. static ngx_connection_t *
  368. ngx_lookup_udp_connection(ngx_listening_t *ls, struct sockaddr *sockaddr,
  369.     socklen_t socklen, struct sockaddr *local_sockaddr, socklen_t local_socklen)
  370. {
  371.     uint32_t               hash;
  372.     ngx_int_t              rc;
  373.     ngx_connection_t      *c;
  374.     ngx_rbtree_node_t     *node, *sentinel;
  375.     ngx_udp_connection_t  *udp;

  376. #if (NGX_HAVE_UNIX_DOMAIN)

  377.     if (sockaddr->sa_family == AF_UNIX) {
  378.         struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;

  379.         if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
  380.             || saun->sun_path[0] == '\0')
  381.         {
  382.             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
  383.                            "unbound unix socket");
  384.             return NULL;
  385.         }
  386.     }

  387. #endif

  388.     node = ls->rbtree.root;
  389.     sentinel = ls->rbtree.sentinel;

  390.     ngx_crc32_init(hash);
  391.     ngx_crc32_update(&hash, (u_char *) sockaddr, socklen);

  392.     if (ls->wildcard) {
  393.         ngx_crc32_update(&hash, (u_char *) local_sockaddr, local_socklen);
  394.     }

  395.     ngx_crc32_final(hash);

  396.     while (node != sentinel) {

  397.         if (hash < node->key) {
  398.             node = node->left;
  399.             continue;
  400.         }

  401.         if (hash > node->key) {
  402.             node = node->right;
  403.             continue;
  404.         }

  405.         /* hash == node->key */

  406.         udp = (ngx_udp_connection_t *) node;

  407.         c = udp->connection;

  408.         rc = ngx_cmp_sockaddr(sockaddr, socklen,
  409.                               c->sockaddr, c->socklen, 1);

  410.         if (rc == 0 && ls->wildcard) {
  411.             rc = ngx_cmp_sockaddr(local_sockaddr, local_socklen,
  412.                                   c->local_sockaddr, c->local_socklen, 1);
  413.         }

  414.         if (rc == 0) {
  415.             return c;
  416.         }

  417.         node = (rc < 0) ? node->left : node->right;
  418.     }

  419.     return NULL;
  420. }

  421. #else

  422. void
  423. ngx_delete_udp_connection(void *data)
  424. {
  425.     return;
  426. }

  427. #endif