src/http/modules/ngx_http_upstream_keepalive_module.c - nginx source code

Global variables defined

Data types defined

Functions defined

Source code


  1. /*
  2. * Copyright (C) Maxim Dounin
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>
  7. #include <ngx_http.h>


  8. typedef struct {
  9.     ngx_uint_t                         max_cached;
  10.     ngx_uint_t                         requests;
  11.     ngx_msec_t                         time;
  12.     ngx_msec_t                         timeout;

  13.     ngx_queue_t                        cache;
  14.     ngx_queue_t                        free;

  15.     ngx_http_upstream_init_pt          original_init_upstream;
  16.     ngx_http_upstream_init_peer_pt     original_init_peer;

  17. } ngx_http_upstream_keepalive_srv_conf_t;


  18. typedef struct {
  19.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  20.     ngx_queue_t                        queue;
  21.     ngx_connection_t                  *connection;

  22.     socklen_t                          socklen;
  23.     ngx_sockaddr_t                     sockaddr;

  24. } ngx_http_upstream_keepalive_cache_t;


  25. typedef struct {
  26.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  27.     ngx_http_upstream_t               *upstream;

  28.     void                              *data;

  29.     ngx_event_get_peer_pt              original_get_peer;
  30.     ngx_event_free_peer_pt             original_free_peer;

  31. #if (NGX_HTTP_SSL)
  32.     ngx_event_set_peer_session_pt      original_set_session;
  33.     ngx_event_save_peer_session_pt     original_save_session;
  34. #endif

  35. } ngx_http_upstream_keepalive_peer_data_t;


  36. static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
  37.     ngx_http_upstream_srv_conf_t *us);
  38. static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc,
  39.     void *data);
  40. static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
  41.     void *data, ngx_uint_t state);

  42. static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
  43. static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
  44. static void ngx_http_upstream_keepalive_close(ngx_connection_t *c);

  45. #if (NGX_HTTP_SSL)
  46. static ngx_int_t ngx_http_upstream_keepalive_set_session(
  47.     ngx_peer_connection_t *pc, void *data);
  48. static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc,
  49.     void *data);
  50. #endif

  51. static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
  52. static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
  53.     void *conf);


  54. static ngx_command_t  ngx_http_upstream_keepalive_commands[] = {

  55.     { ngx_string("keepalive"),
  56.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  57.       ngx_http_upstream_keepalive,
  58.       NGX_HTTP_SRV_CONF_OFFSET,
  59.       0,
  60.       NULL },

  61.     { ngx_string("keepalive_time"),
  62.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  63.       ngx_conf_set_msec_slot,
  64.       NGX_HTTP_SRV_CONF_OFFSET,
  65.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, time),
  66.       NULL },

  67.     { ngx_string("keepalive_timeout"),
  68.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  69.       ngx_conf_set_msec_slot,
  70.       NGX_HTTP_SRV_CONF_OFFSET,
  71.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, timeout),
  72.       NULL },

  73.     { ngx_string("keepalive_requests"),
  74.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  75.       ngx_conf_set_num_slot,
  76.       NGX_HTTP_SRV_CONF_OFFSET,
  77.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, requests),
  78.       NULL },

  79.       ngx_null_command
  80. };


  81. static ngx_http_module_t  ngx_http_upstream_keepalive_module_ctx = {
  82.     NULL,                                  /* preconfiguration */
  83.     NULL,                                  /* postconfiguration */

  84.     NULL,                                  /* create main configuration */
  85.     NULL,                                  /* init main configuration */

  86.     ngx_http_upstream_keepalive_create_conf, /* create server configuration */
  87.     NULL,                                  /* merge server configuration */

  88.     NULL,                                  /* create location configuration */
  89.     NULL                                   /* merge location configuration */
  90. };


  91. ngx_module_t  ngx_http_upstream_keepalive_module = {
  92.     NGX_MODULE_V1,
  93.     &ngx_http_upstream_keepalive_module_ctx, /* module context */
  94.     ngx_http_upstream_keepalive_commands,    /* module directives */
  95.     NGX_HTTP_MODULE,                       /* module type */
  96.     NULL,                                  /* init master */
  97.     NULL,                                  /* init module */
  98.     NULL,                                  /* init process */
  99.     NULL,                                  /* init thread */
  100.     NULL,                                  /* exit thread */
  101.     NULL,                                  /* exit process */
  102.     NULL,                                  /* exit master */
  103.     NGX_MODULE_V1_PADDING
  104. };


  105. static ngx_int_t
  106. ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
  107.     ngx_http_upstream_srv_conf_t *us)
  108. {
  109.     ngx_uint_t                               i;
  110.     ngx_http_upstream_keepalive_srv_conf_t  *kcf;
  111.     ngx_http_upstream_keepalive_cache_t     *cached;

  112.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0,
  113.                    "init keepalive");

  114.     kcf = ngx_http_conf_upstream_srv_conf(us,
  115.                                           ngx_http_upstream_keepalive_module);

  116.     ngx_conf_init_msec_value(kcf->time, 3600000);
  117.     ngx_conf_init_msec_value(kcf->timeout, 60000);
  118.     ngx_conf_init_uint_value(kcf->requests, 1000);

  119.     if (kcf->original_init_upstream(cf, us) != NGX_OK) {
  120.         return NGX_ERROR;
  121.     }

  122.     kcf->original_init_peer = us->peer.init;

  123.     us->peer.init = ngx_http_upstream_init_keepalive_peer;

  124.     /* allocate cache items and add to free queue */

  125.     cached = ngx_pcalloc(cf->pool,
  126.                 sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
  127.     if (cached == NULL) {
  128.         return NGX_ERROR;
  129.     }

  130.     ngx_queue_init(&kcf->cache);
  131.     ngx_queue_init(&kcf->free);

  132.     for (i = 0; i < kcf->max_cached; i++) {
  133.         ngx_queue_insert_head(&kcf->free, &cached[i].queue);
  134.         cached[i].conf = kcf;
  135.     }

  136.     return NGX_OK;
  137. }


  138. static ngx_int_t
  139. ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
  140.     ngx_http_upstream_srv_conf_t *us)
  141. {
  142.     ngx_http_upstream_keepalive_peer_data_t  *kp;
  143.     ngx_http_upstream_keepalive_srv_conf_t   *kcf;

  144.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
  145.                    "init keepalive peer");

  146.     kcf = ngx_http_conf_upstream_srv_conf(us,
  147.                                           ngx_http_upstream_keepalive_module);

  148.     kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t));
  149.     if (kp == NULL) {
  150.         return NGX_ERROR;
  151.     }

  152.     if (kcf->original_init_peer(r, us) != NGX_OK) {
  153.         return NGX_ERROR;
  154.     }

  155.     kp->conf = kcf;
  156.     kp->upstream = r->upstream;
  157.     kp->data = r->upstream->peer.data;
  158.     kp->original_get_peer = r->upstream->peer.get;
  159.     kp->original_free_peer = r->upstream->peer.free;

  160.     r->upstream->peer.data = kp;
  161.     r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;
  162.     r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;

  163. #if (NGX_HTTP_SSL)
  164.     kp->original_set_session = r->upstream->peer.set_session;
  165.     kp->original_save_session = r->upstream->peer.save_session;
  166.     r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;
  167.     r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
  168. #endif

  169.     return NGX_OK;
  170. }


  171. static ngx_int_t
  172. ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
  173. {
  174.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
  175.     ngx_http_upstream_keepalive_cache_t      *item;

  176.     ngx_int_t          rc;
  177.     ngx_queue_t       *q, *cache;
  178.     ngx_connection_t  *c;

  179.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  180.                    "get keepalive peer");

  181.     /* ask balancer */

  182.     rc = kp->original_get_peer(pc, kp->data);

  183.     if (rc != NGX_OK) {
  184.         return rc;
  185.     }

  186.     /* search cache for suitable connection */

  187.     cache = &kp->conf->cache;

  188.     for (q = ngx_queue_head(cache);
  189.          q != ngx_queue_sentinel(cache);
  190.          q = ngx_queue_next(q))
  191.     {
  192.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
  193.         c = item->connection;

  194.         if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
  195.                          item->socklen, pc->socklen)
  196.             == 0)
  197.         {
  198.             ngx_queue_remove(q);
  199.             ngx_queue_insert_head(&kp->conf->free, q);

  200.             goto found;
  201.         }
  202.     }

  203.     return NGX_OK;

  204. found:

  205.     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  206.                    "get keepalive peer: using connection %p", c);

  207.     c->idle = 0;
  208.     c->sent = 0;
  209.     c->data = NULL;
  210.     c->log = pc->log;
  211.     c->read->log = pc->log;
  212.     c->write->log = pc->log;
  213.     c->pool->log = pc->log;

  214.     if (c->read->timer_set) {
  215.         ngx_del_timer(c->read);
  216.     }

  217.     pc->connection = c;
  218.     pc->cached = 1;

  219.     return NGX_DONE;
  220. }


  221. static void
  222. ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
  223.     ngx_uint_t state)
  224. {
  225.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
  226.     ngx_http_upstream_keepalive_cache_t      *item;

  227.     ngx_queue_t          *q;
  228.     ngx_connection_t     *c;
  229.     ngx_http_upstream_t  *u;

  230.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  231.                    "free keepalive peer");

  232.     /* cache valid connections */

  233.     u = kp->upstream;
  234.     c = pc->connection;

  235.     if (state & NGX_PEER_FAILED
  236.         || c == NULL
  237.         || c->read->eof
  238.         || c->read->error
  239.         || c->read->timedout
  240.         || c->write->error
  241.         || c->write->timedout)
  242.     {
  243.         goto invalid;
  244.     }

  245.     if (c->requests >= kp->conf->requests) {
  246.         goto invalid;
  247.     }

  248.     if (ngx_current_msec - c->start_time > kp->conf->time) {
  249.         goto invalid;
  250.     }

  251.     if (!u->keepalive) {
  252.         goto invalid;
  253.     }

  254.     if (!u->request_body_sent) {
  255.         goto invalid;
  256.     }

  257.     if (ngx_terminate || ngx_exiting) {
  258.         goto invalid;
  259.     }

  260.     if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
  261.         goto invalid;
  262.     }

  263.     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  264.                    "free keepalive peer: saving connection %p", c);

  265.     if (ngx_queue_empty(&kp->conf->free)) {

  266.         q = ngx_queue_last(&kp->conf->cache);
  267.         ngx_queue_remove(q);

  268.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);

  269.         ngx_http_upstream_keepalive_close(item->connection);

  270.     } else {
  271.         q = ngx_queue_head(&kp->conf->free);
  272.         ngx_queue_remove(q);

  273.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
  274.     }

  275.     ngx_queue_insert_head(&kp->conf->cache, q);

  276.     item->connection = c;

  277.     pc->connection = NULL;

  278.     c->read->delayed = 0;
  279.     ngx_add_timer(c->read, kp->conf->timeout);

  280.     if (c->write->timer_set) {
  281.         ngx_del_timer(c->write);
  282.     }

  283.     c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
  284.     c->read->handler = ngx_http_upstream_keepalive_close_handler;

  285.     c->data = item;
  286.     c->idle = 1;
  287.     c->log = ngx_cycle->log;
  288.     c->read->log = ngx_cycle->log;
  289.     c->write->log = ngx_cycle->log;
  290.     c->pool->log = ngx_cycle->log;

  291.     item->socklen = pc->socklen;
  292.     ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);

  293.     if (c->read->ready) {
  294.         ngx_http_upstream_keepalive_close_handler(c->read);
  295.     }

  296. invalid:

  297.     kp->original_free_peer(pc, kp->data, state);
  298. }


  299. static void
  300. ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
  301. {
  302.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
  303.                    "keepalive dummy handler");
  304. }


  305. static void
  306. ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
  307. {
  308.     ngx_http_upstream_keepalive_srv_conf_t  *conf;
  309.     ngx_http_upstream_keepalive_cache_t     *item;

  310.     int                n;
  311.     char               buf[1];
  312.     ngx_connection_t  *c;

  313.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
  314.                    "keepalive close handler");

  315.     c = ev->data;

  316.     if (c->close || c->read->timedout) {
  317.         goto close;
  318.     }

  319.     n = recv(c->fd, buf, 1, MSG_PEEK);

  320.     if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
  321.         ev->ready = 0;

  322.         if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
  323.             goto close;
  324.         }

  325.         return;
  326.     }

  327. close:

  328.     item = c->data;
  329.     conf = item->conf;

  330.     ngx_http_upstream_keepalive_close(c);

  331.     ngx_queue_remove(&item->queue);
  332.     ngx_queue_insert_head(&conf->free, &item->queue);
  333. }


  334. static void
  335. ngx_http_upstream_keepalive_close(ngx_connection_t *c)
  336. {

  337. #if (NGX_HTTP_SSL)

  338.     if (c->ssl) {
  339.         c->ssl->no_wait_shutdown = 1;
  340.         c->ssl->no_send_shutdown = 1;

  341.         if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
  342.             c->ssl->handler = ngx_http_upstream_keepalive_close;
  343.             return;
  344.         }
  345.     }

  346. #endif

  347.     ngx_destroy_pool(c->pool);
  348.     ngx_close_connection(c);
  349. }


  350. #if (NGX_HTTP_SSL)

  351. static ngx_int_t
  352. ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data)
  353. {
  354.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;

  355.     return kp->original_set_session(pc, kp->data);
  356. }


  357. static void
  358. ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data)
  359. {
  360.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;

  361.     kp->original_save_session(pc, kp->data);
  362.     return;
  363. }

  364. #endif


  365. static void *
  366. ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
  367. {
  368.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  369.     conf = ngx_pcalloc(cf->pool,
  370.                        sizeof(ngx_http_upstream_keepalive_srv_conf_t));
  371.     if (conf == NULL) {
  372.         return NULL;
  373.     }

  374.     /*
  375.      * set by ngx_pcalloc():
  376.      *
  377.      *     conf->original_init_upstream = NULL;
  378.      *     conf->original_init_peer = NULL;
  379.      *     conf->max_cached = 0;
  380.      */

  381.     conf->time = NGX_CONF_UNSET_MSEC;
  382.     conf->timeout = NGX_CONF_UNSET_MSEC;
  383.     conf->requests = NGX_CONF_UNSET_UINT;

  384.     return conf;
  385. }


  386. static char *
  387. ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  388. {
  389.     ngx_http_upstream_srv_conf_t            *uscf;
  390.     ngx_http_upstream_keepalive_srv_conf_t  *kcf = conf;

  391.     ngx_int_t    n;
  392.     ngx_str_t   *value;

  393.     if (kcf->max_cached) {
  394.         return "is duplicate";
  395.     }

  396.     /* read options */

  397.     value = cf->args->elts;

  398.     n = ngx_atoi(value[1].data, value[1].len);

  399.     if (n == NGX_ERROR || n == 0) {
  400.         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  401.                            "invalid value \"%V\" in \"%V\" directive",
  402.                            &value[1], &cmd->name);
  403.         return NGX_CONF_ERROR;
  404.     }

  405.     kcf->max_cached = n;

  406.     /* init upstream handler */

  407.     uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

  408.     kcf->original_init_upstream = uscf->peer.init_upstream
  409.                                   ? uscf->peer.init_upstream
  410.                                   : ngx_http_upstream_init_round_robin;

  411.     uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;

  412.     return NGX_CONF_OK;
  413. }