src/stream/ngx_stream_handler.c - nginx source code

Functions defined

Source code


  1. /*
  2. * Copyright (C) Roman Arutyunyan
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>
  7. #include <ngx_event.h>
  8. #include <ngx_stream.h>


  9. static void ngx_stream_log_session(ngx_stream_session_t *s);
  10. static void ngx_stream_close_connection(ngx_connection_t *c);
  11. static u_char *ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len);
  12. static void ngx_stream_proxy_protocol_handler(ngx_event_t *rev);


  13. void
  14. ngx_stream_init_connection(ngx_connection_t *c)
  15. {
  16.     u_char                        text[NGX_SOCKADDR_STRLEN];
  17.     size_t                        len;
  18.     ngx_uint_t                    i;
  19.     ngx_time_t                   *tp;
  20.     ngx_event_t                  *rev;
  21.     struct sockaddr              *sa;
  22.     ngx_stream_port_t            *port;
  23.     struct sockaddr_in           *sin;
  24.     ngx_stream_in_addr_t         *addr;
  25.     ngx_stream_session_t         *s;
  26.     ngx_stream_conf_ctx_t        *ctx;
  27.     ngx_stream_addr_conf_t       *addr_conf;
  28. #if (NGX_HAVE_INET6)
  29.     struct sockaddr_in6          *sin6;
  30.     ngx_stream_in6_addr_t        *addr6;
  31. #endif
  32.     ngx_stream_core_srv_conf_t   *cscf;
  33.     ngx_stream_core_main_conf_t  *cmcf;

  34.     /* find the server configuration for the address:port */

  35.     port = c->listening->servers;

  36.     if (port->naddrs > 1) {

  37.         /*
  38.          * There are several addresses on this port and one of them
  39.          * is the "*:port" wildcard so getsockname() is needed to determine
  40.          * the server address.
  41.          *
  42.          * AcceptEx() and recvmsg() already gave this address.
  43.          */

  44.         if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
  45.             ngx_stream_close_connection(c);
  46.             return;
  47.         }

  48.         sa = c->local_sockaddr;

  49.         switch (sa->sa_family) {

  50. #if (NGX_HAVE_INET6)
  51.         case AF_INET6:
  52.             sin6 = (struct sockaddr_in6 *) sa;

  53.             addr6 = port->addrs;

  54.             /* the last address is "*" */

  55.             for (i = 0; i < port->naddrs - 1; i++) {
  56.                 if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
  57.                     break;
  58.                 }
  59.             }

  60.             addr_conf = &addr6[i].conf;

  61.             break;
  62. #endif

  63.         default: /* AF_INET */
  64.             sin = (struct sockaddr_in *) sa;

  65.             addr = port->addrs;

  66.             /* the last address is "*" */

  67.             for (i = 0; i < port->naddrs - 1; i++) {
  68.                 if (addr[i].addr == sin->sin_addr.s_addr) {
  69.                     break;
  70.                 }
  71.             }

  72.             addr_conf = &addr[i].conf;

  73.             break;
  74.         }

  75.     } else {
  76.         switch (c->local_sockaddr->sa_family) {

  77. #if (NGX_HAVE_INET6)
  78.         case AF_INET6:
  79.             addr6 = port->addrs;
  80.             addr_conf = &addr6[0].conf;
  81.             break;
  82. #endif

  83.         default: /* AF_INET */
  84.             addr = port->addrs;
  85.             addr_conf = &addr[0].conf;
  86.             break;
  87.         }
  88.     }

  89.     s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
  90.     if (s == NULL) {
  91.         ngx_stream_close_connection(c);
  92.         return;
  93.     }

  94.     ctx = addr_conf->default_server->ctx;

  95.     s->signature = NGX_STREAM_MODULE;
  96.     s->main_conf = ctx->main_conf;
  97.     s->srv_conf = ctx->srv_conf;
  98.     s->virtual_names = addr_conf->virtual_names;

  99. #if (NGX_STREAM_SSL)
  100.     s->ssl = addr_conf->ssl;
  101. #endif

  102.     if (c->buffer) {
  103.         s->received += c->buffer->last - c->buffer->pos;
  104.     }

  105.     s->connection = c;
  106.     c->data = s;

  107.     cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);

  108.     ngx_set_connection_log(c, cscf->error_log);

  109.     len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1);

  110.     ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%uA %sclient %*s connected to %V",
  111.                   c->number, c->type == SOCK_DGRAM ? "udp " : "",
  112.                   len, text, &c->listening->addr_text);

  113.     c->log->connection = c->number;
  114.     c->log->handler = ngx_stream_log_error;
  115.     c->log->data = s;
  116.     c->log->action = "initializing session";
  117.     c->log_error = NGX_ERROR_INFO;

  118.     s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
  119.     if (s->ctx == NULL) {
  120.         ngx_stream_close_connection(c);
  121.         return;
  122.     }

  123.     cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);

  124.     s->variables = ngx_pcalloc(s->connection->pool,
  125.                                cmcf->variables.nelts
  126.                                * sizeof(ngx_stream_variable_value_t));

  127.     if (s->variables == NULL) {
  128.         ngx_stream_close_connection(c);
  129.         return;
  130.     }

  131.     tp = ngx_timeofday();
  132.     s->start_sec = tp->sec;
  133.     s->start_msec = tp->msec;

  134.     rev = c->read;
  135.     rev->handler = ngx_stream_session_handler;

  136.     if (addr_conf->proxy_protocol) {
  137.         c->log->action = "reading PROXY protocol";

  138.         rev->handler = ngx_stream_proxy_protocol_handler;

  139.         if (!rev->ready) {
  140.             ngx_add_timer(rev, cscf->proxy_protocol_timeout);

  141.             if (ngx_handle_read_event(rev, 0) != NGX_OK) {
  142.                 ngx_stream_finalize_session(s,
  143.                                             NGX_STREAM_INTERNAL_SERVER_ERROR);
  144.             }

  145.             return;
  146.         }
  147.     }

  148.     if (ngx_use_accept_mutex) {
  149.         ngx_post_event(rev, &ngx_posted_events);
  150.         return;
  151.     }

  152.     rev->handler(rev);
  153. }


  154. static void
  155. ngx_stream_proxy_protocol_handler(ngx_event_t *rev)
  156. {
  157.     u_char                      *p, buf[NGX_PROXY_PROTOCOL_MAX_HEADER];
  158.     size_t                       size;
  159.     ssize_t                      n;
  160.     ngx_err_t                    err;
  161.     ngx_connection_t            *c;
  162.     ngx_stream_session_t        *s;
  163.     ngx_stream_core_srv_conf_t  *cscf;

  164.     c = rev->data;
  165.     s = c->data;

  166.     ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
  167.                    "stream PROXY protocol handler");

  168.     if (rev->timedout) {
  169.         ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
  170.         ngx_stream_finalize_session(s, NGX_STREAM_OK);
  171.         return;
  172.     }

  173.     n = recv(c->fd, (char *) buf, sizeof(buf), MSG_PEEK);

  174.     err = ngx_socket_errno;

  175.     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "recv(): %z", n);

  176.     if (n == -1) {
  177.         if (err == NGX_EAGAIN) {
  178.             rev->ready = 0;

  179.             if (!rev->timer_set) {
  180.                 cscf = ngx_stream_get_module_srv_conf(s,
  181.                                                       ngx_stream_core_module);

  182.                 ngx_add_timer(rev, cscf->proxy_protocol_timeout);
  183.             }

  184.             if (ngx_handle_read_event(rev, 0) != NGX_OK) {
  185.                 ngx_stream_finalize_session(s,
  186.                                             NGX_STREAM_INTERNAL_SERVER_ERROR);
  187.             }

  188.             return;
  189.         }

  190.         ngx_connection_error(c, err, "recv() failed");

  191.         ngx_stream_finalize_session(s, NGX_STREAM_OK);
  192.         return;
  193.     }

  194.     if (rev->timer_set) {
  195.         ngx_del_timer(rev);
  196.     }

  197.     p = ngx_proxy_protocol_read(c, buf, buf + n);

  198.     if (p == NULL) {
  199.         ngx_stream_finalize_session(s, NGX_STREAM_BAD_REQUEST);
  200.         return;
  201.     }

  202.     size = p - buf;

  203.     if (c->recv(c, buf, size) != (ssize_t) size) {
  204.         ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
  205.         return;
  206.     }

  207.     c->log->action = "initializing session";

  208.     ngx_stream_session_handler(rev);
  209. }


  210. void
  211. ngx_stream_session_handler(ngx_event_t *rev)
  212. {
  213.     ngx_connection_t      *c;
  214.     ngx_stream_session_t  *s;

  215.     c = rev->data;
  216.     s = c->data;

  217.     ngx_stream_core_run_phases(s);
  218. }


  219. void
  220. ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc)
  221. {
  222.     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
  223.                    "finalize stream session: %i", rc);

  224.     s->status = rc;

  225.     ngx_stream_log_session(s);

  226.     ngx_stream_close_connection(s->connection);
  227. }


  228. static void
  229. ngx_stream_log_session(ngx_stream_session_t *s)
  230. {
  231.     ngx_uint_t                    i, n;
  232.     ngx_stream_handler_pt        *log_handler;
  233.     ngx_stream_core_main_conf_t  *cmcf;

  234.     cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);

  235.     log_handler = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.elts;
  236.     n = cmcf->phases[NGX_STREAM_LOG_PHASE].handlers.nelts;

  237.     for (i = 0; i < n; i++) {
  238.         log_handler[i](s);
  239.     }
  240. }


  241. static void
  242. ngx_stream_close_connection(ngx_connection_t *c)
  243. {
  244.     ngx_pool_t  *pool;

  245.     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
  246.                    "close stream connection: %d", c->fd);

  247. #if (NGX_STREAM_SSL)

  248.     if (c->ssl) {
  249.         if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
  250.             c->ssl->handler = ngx_stream_close_connection;
  251.             return;
  252.         }
  253.     }

  254. #endif

  255. #if (NGX_STAT_STUB)
  256.     (void) ngx_atomic_fetch_add(ngx_stat_active, -1);
  257. #endif

  258.     pool = c->pool;

  259.     ngx_close_connection(c);

  260.     ngx_destroy_pool(pool);
  261. }


  262. static u_char *
  263. ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len)
  264. {
  265.     u_char                *p;
  266.     ngx_stream_session_t  *s;

  267.     if (log->action) {
  268.         p = ngx_snprintf(buf, len, " while %s", log->action);
  269.         len -= p - buf;
  270.         buf = p;
  271.     }

  272.     s = log->data;

  273.     p = ngx_snprintf(buf, len, ", %sclient: %V, server: %V",
  274.                      s->connection->type == SOCK_DGRAM ? "udp " : "",
  275.                      &s->connection->addr_text,
  276.                      &s->connection->listening->addr_text);
  277.     len -= p - buf;
  278.     buf = p;

  279.     if (s->log_handler) {
  280.         p = s->log_handler(log, buf, len);
  281.     }

  282.     return p;
  283. }