src/http/modules/ngx_http_upstream_keepalive_module.c - nginx

Global variables defined

Data types defined

Functions defined

Source code


  1. /*
  2. * Copyright (C) Maxim Dounin
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>
  7. #include <ngx_http.h>


  8. typedef struct {
  9.     ngx_uint_t                         max_cached;
  10.     ngx_uint_t                         requests;
  11.     ngx_msec_t                         time;
  12.     ngx_msec_t                         timeout;

  13.     ngx_queue_t                        cache;
  14.     ngx_queue_t                        free;

  15.     ngx_http_upstream_init_peer_pt     original_init_peer;

  16.     ngx_uint_t                         local; /* unsigned  local:1; */

  17. } ngx_http_upstream_keepalive_srv_conf_t;


  18. typedef struct {
  19.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  20.     ngx_queue_t                        queue;
  21.     ngx_connection_t                  *connection;

  22.     socklen_t                          socklen;
  23.     ngx_sockaddr_t                     sockaddr;

  24.     ngx_http_upstream_conf_t          *tag;

  25. } ngx_http_upstream_keepalive_cache_t;


  26. typedef struct {
  27.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  28.     ngx_http_upstream_t               *upstream;

  29.     void                              *data;

  30.     ngx_event_get_peer_pt              original_get_peer;
  31.     ngx_event_free_peer_pt             original_free_peer;

  32. #if (NGX_HTTP_SSL)
  33.     ngx_event_set_peer_session_pt      original_set_session;
  34.     ngx_event_save_peer_session_pt     original_save_session;
  35. #endif

  36.     ngx_event_notify_peer_pt           original_notify;

  37. } ngx_http_upstream_keepalive_peer_data_t;


  38. static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
  39.     ngx_http_upstream_srv_conf_t *us);
  40. static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc,
  41.     void *data);
  42. static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
  43.     void *data, ngx_uint_t state);

  44. static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
  45. static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
  46. static void ngx_http_upstream_keepalive_close(ngx_connection_t *c);

  47. #if (NGX_HTTP_SSL)
  48. static ngx_int_t ngx_http_upstream_keepalive_set_session(
  49.     ngx_peer_connection_t *pc, void *data);
  50. static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc,
  51.     void *data);
  52. #endif

  53. static void ngx_http_upstream_notify_keepalive_peer(ngx_peer_connection_t *pc,
  54.     void *data, ngx_uint_t type);

  55. static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
  56. static char *ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf,
  57.     void *conf);
  58. static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
  59.     void *conf);


  60. static ngx_command_t  ngx_http_upstream_keepalive_commands[] = {

  61.     { ngx_string("keepalive"),
  62.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12,
  63.       ngx_http_upstream_keepalive,
  64.       NGX_HTTP_SRV_CONF_OFFSET,
  65.       0,
  66.       NULL },

  67.     { ngx_string("keepalive_time"),
  68.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  69.       ngx_conf_set_msec_slot,
  70.       NGX_HTTP_SRV_CONF_OFFSET,
  71.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, time),
  72.       NULL },

  73.     { ngx_string("keepalive_timeout"),
  74.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  75.       ngx_conf_set_msec_slot,
  76.       NGX_HTTP_SRV_CONF_OFFSET,
  77.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, timeout),
  78.       NULL },

  79.     { ngx_string("keepalive_requests"),
  80.       NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
  81.       ngx_conf_set_num_slot,
  82.       NGX_HTTP_SRV_CONF_OFFSET,
  83.       offsetof(ngx_http_upstream_keepalive_srv_conf_t, requests),
  84.       NULL },

  85.       ngx_null_command
  86. };


  87. static ngx_http_module_t  ngx_http_upstream_keepalive_module_ctx = {
  88.     NULL,                                  /* preconfiguration */
  89.     NULL,                                  /* postconfiguration */

  90.     NULL,                                  /* create main configuration */
  91.     ngx_http_upstream_keepalive_init_main_conf, /* init main configuration */

  92.     ngx_http_upstream_keepalive_create_conf, /* create server configuration */
  93.     NULL,                                  /* merge server configuration */

  94.     NULL,                                  /* create location configuration */
  95.     NULL                                   /* merge location configuration */
  96. };


  97. ngx_module_t  ngx_http_upstream_keepalive_module = {
  98.     NGX_MODULE_V1,
  99.     &ngx_http_upstream_keepalive_module_ctx, /* module context */
  100.     ngx_http_upstream_keepalive_commands,    /* module directives */
  101.     NGX_HTTP_MODULE,                       /* module type */
  102.     NULL,                                  /* init master */
  103.     NULL,                                  /* init module */
  104.     NULL,                                  /* init process */
  105.     NULL,                                  /* init thread */
  106.     NULL,                                  /* exit thread */
  107.     NULL,                                  /* exit process */
  108.     NULL,                                  /* exit master */
  109.     NGX_MODULE_V1_PADDING
  110. };


  111. static ngx_int_t
  112. ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
  113.     ngx_http_upstream_srv_conf_t *us)
  114. {
  115.     ngx_http_upstream_keepalive_peer_data_t  *kp;
  116.     ngx_http_upstream_keepalive_srv_conf_t   *kcf;

  117.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
  118.                    "init keepalive peer");

  119.     kcf = ngx_http_conf_upstream_srv_conf(us,
  120.                                           ngx_http_upstream_keepalive_module);

  121.     kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t));
  122.     if (kp == NULL) {
  123.         return NGX_ERROR;
  124.     }

  125.     if (kcf->original_init_peer(r, us) != NGX_OK) {
  126.         return NGX_ERROR;
  127.     }

  128.     kp->conf = kcf;
  129.     kp->upstream = r->upstream;
  130.     kp->data = r->upstream->peer.data;
  131.     kp->original_get_peer = r->upstream->peer.get;
  132.     kp->original_free_peer = r->upstream->peer.free;

  133.     r->upstream->peer.data = kp;
  134.     r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;
  135.     r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;

  136. #if (NGX_HTTP_SSL)
  137.     kp->original_set_session = r->upstream->peer.set_session;
  138.     kp->original_save_session = r->upstream->peer.save_session;
  139.     r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;
  140.     r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
  141. #endif

  142.     if (r->upstream->peer.notify) {
  143.         kp->original_notify = r->upstream->peer.notify;
  144.         r->upstream->peer.notify = ngx_http_upstream_notify_keepalive_peer;
  145.     }

  146.     return NGX_OK;
  147. }


  148. static ngx_int_t
  149. ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
  150. {
  151.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
  152.     ngx_http_upstream_keepalive_cache_t      *item;

  153.     ngx_int_t          rc;
  154.     ngx_queue_t       *q, *cache;
  155.     ngx_connection_t  *c;

  156.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  157.                    "get keepalive peer");

  158.     /* ask balancer */

  159.     rc = kp->original_get_peer(pc, kp->data);

  160.     if (rc != NGX_OK) {
  161.         return rc;
  162.     }

  163.     /* search cache for suitable connection */

  164.     cache = &kp->conf->cache;

  165.     for (q = ngx_queue_head(cache);
  166.          q != ngx_queue_sentinel(cache);
  167.          q = ngx_queue_next(q))
  168.     {
  169.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
  170.         c = item->connection;

  171.         if (kp->conf->local && item->tag != kp->upstream->conf) {
  172.             continue;
  173.         }

  174.         if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
  175.                          item->socklen, pc->socklen)
  176.             == 0)
  177.         {
  178.             ngx_queue_remove(q);
  179.             ngx_queue_insert_head(&kp->conf->free, q);

  180.             goto found;
  181.         }
  182.     }

  183.     return NGX_OK;

  184. found:

  185.     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  186.                    "get keepalive peer: using connection %p", c);

  187.     c->idle = 0;
  188.     c->sent = 0;
  189.     c->data = NULL;
  190.     c->log = pc->log;
  191.     c->read->log = pc->log;
  192.     c->write->log = pc->log;
  193.     c->pool->log = pc->log;

  194.     if (c->read->timer_set) {
  195.         ngx_del_timer(c->read);
  196.     }

  197.     pc->connection = c;
  198.     pc->cached = 1;

  199.     return NGX_DONE;
  200. }


  201. static void
  202. ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
  203.     ngx_uint_t state)
  204. {
  205.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;
  206.     ngx_http_upstream_keepalive_cache_t      *item;

  207.     ngx_queue_t          *q;
  208.     ngx_connection_t     *c;
  209.     ngx_http_upstream_t  *u;

  210.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  211.                    "free keepalive peer");

  212.     /* cache valid connections */

  213.     u = kp->upstream;
  214.     c = pc->connection;

  215.     if (state & NGX_PEER_FAILED
  216.         || c == NULL
  217.         || c->read->eof
  218.         || c->read->error
  219.         || c->read->timedout
  220.         || c->write->error
  221.         || c->write->timedout)
  222.     {
  223.         goto invalid;
  224.     }

  225.     if (c->requests >= kp->conf->requests) {
  226.         goto invalid;
  227.     }

  228.     if (ngx_current_msec - c->start_time > kp->conf->time) {
  229.         goto invalid;
  230.     }

  231.     if (!u->keepalive) {
  232.         goto invalid;
  233.     }

  234.     if (!u->request_body_sent) {
  235.         goto invalid;
  236.     }

  237.     if (ngx_terminate || ngx_exiting) {
  238.         goto invalid;
  239.     }

  240.     if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
  241.         goto invalid;
  242.     }

  243.     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
  244.                    "free keepalive peer: saving connection %p", c);

  245.     if (ngx_queue_empty(&kp->conf->free)) {

  246.         q = ngx_queue_last(&kp->conf->cache);
  247.         ngx_queue_remove(q);

  248.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);

  249.         ngx_http_upstream_keepalive_close(item->connection);

  250.     } else {
  251.         q = ngx_queue_head(&kp->conf->free);
  252.         ngx_queue_remove(q);

  253.         item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
  254.     }

  255.     ngx_queue_insert_head(&kp->conf->cache, q);

  256.     item->connection = c;
  257.     item->tag = u->conf;

  258.     pc->connection = NULL;

  259.     c->read->delayed = 0;
  260.     ngx_add_timer(c->read, kp->conf->timeout);

  261.     if (c->write->timer_set) {
  262.         ngx_del_timer(c->write);
  263.     }

  264.     c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
  265.     c->read->handler = ngx_http_upstream_keepalive_close_handler;

  266.     c->data = item;
  267.     c->idle = 1;
  268.     c->log = ngx_cycle->log;
  269.     c->read->log = ngx_cycle->log;
  270.     c->write->log = ngx_cycle->log;
  271.     c->pool->log = ngx_cycle->log;

  272.     item->socklen = pc->socklen;
  273.     ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);

  274.     if (c->read->ready) {
  275.         ngx_http_upstream_keepalive_close_handler(c->read);
  276.     }

  277. invalid:

  278.     kp->original_free_peer(pc, kp->data, state);
  279. }


  280. static void
  281. ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
  282. {
  283.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
  284.                    "keepalive dummy handler");
  285. }


  286. static void
  287. ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
  288. {
  289.     ngx_http_upstream_keepalive_srv_conf_t  *conf;
  290.     ngx_http_upstream_keepalive_cache_t     *item;

  291.     int                n;
  292.     char               buf[1];
  293.     ngx_connection_t  *c;

  294.     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
  295.                    "keepalive close handler");

  296.     c = ev->data;

  297.     if (c->close || c->read->timedout) {
  298.         goto close;
  299.     }

  300.     n = recv(c->fd, buf, 1, MSG_PEEK);

  301.     if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
  302.         ev->ready = 0;

  303.         if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
  304.             goto close;
  305.         }

  306.         return;
  307.     }

  308. close:

  309.     item = c->data;
  310.     conf = item->conf;

  311.     ngx_http_upstream_keepalive_close(c);

  312.     ngx_queue_remove(&item->queue);
  313.     ngx_queue_insert_head(&conf->free, &item->queue);
  314. }


  315. static void
  316. ngx_http_upstream_keepalive_close(ngx_connection_t *c)
  317. {

  318. #if (NGX_HTTP_SSL)

  319.     if (c->ssl) {
  320.         c->ssl->no_wait_shutdown = 1;
  321.         c->ssl->no_send_shutdown = 1;

  322.         if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
  323.             c->ssl->handler = ngx_http_upstream_keepalive_close;
  324.             return;
  325.         }
  326.     }

  327. #endif

  328.     ngx_destroy_pool(c->pool);
  329.     ngx_close_connection(c);
  330. }


  331. #if (NGX_HTTP_SSL)

  332. static ngx_int_t
  333. ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data)
  334. {
  335.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;

  336.     return kp->original_set_session(pc, kp->data);
  337. }


  338. static void
  339. ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data)
  340. {
  341.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;

  342.     kp->original_save_session(pc, kp->data);
  343.     return;
  344. }

  345. #endif


  346. static void
  347. ngx_http_upstream_notify_keepalive_peer(ngx_peer_connection_t *pc, void *data,
  348.     ngx_uint_t type)
  349. {
  350.     ngx_http_upstream_keepalive_peer_data_t  *kp = data;

  351.     kp->original_notify(pc, kp->data, type);
  352. }


  353. static void *
  354. ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
  355. {
  356.     ngx_http_upstream_keepalive_srv_conf_t  *conf;

  357.     conf = ngx_pcalloc(cf->pool,
  358.                        sizeof(ngx_http_upstream_keepalive_srv_conf_t));
  359.     if (conf == NULL) {
  360.         return NULL;
  361.     }

  362.     /*
  363.      * set by ngx_pcalloc():
  364.      *
  365.      *     conf->original_init_peer = NULL;
  366.      *     conf->local = 0;
  367.      */

  368.     conf->time = NGX_CONF_UNSET_MSEC;
  369.     conf->timeout = NGX_CONF_UNSET_MSEC;
  370.     conf->requests = NGX_CONF_UNSET_UINT;
  371.     conf->max_cached = NGX_CONF_UNSET_UINT;

  372.     return conf;
  373. }


  374. static char *
  375. ngx_http_upstream_keepalive_init_main_conf(ngx_conf_t *cf, void *conf)
  376. {
  377.     ngx_uint_t                                i, j;
  378.     ngx_http_upstream_srv_conf_t            **uscfp;
  379.     ngx_http_upstream_main_conf_t            *umcf;
  380.     ngx_http_upstream_keepalive_cache_t      *cached;
  381.     ngx_http_upstream_keepalive_srv_conf_t   *kcf;

  382.     umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module);

  383.     uscfp = umcf->upstreams.elts;

  384.     for (i = 0; i < umcf->upstreams.nelts; i++) {

  385.         /* skip implicit upstreams */
  386.         if (uscfp[i]->srv_conf == NULL) {
  387.             continue;
  388.         }

  389.         kcf = ngx_http_conf_upstream_srv_conf(uscfp[i],
  390.                                             ngx_http_upstream_keepalive_module);

  391.         if (kcf->max_cached == 0) {
  392.             continue;
  393.         }

  394.         ngx_conf_init_msec_value(kcf->time, 3600000);
  395.         ngx_conf_init_msec_value(kcf->timeout, 60000);
  396.         ngx_conf_init_uint_value(kcf->requests, 1000);

  397.         if (kcf->max_cached == NGX_CONF_UNSET_UINT) {
  398.             kcf->local = 1;
  399.             kcf->max_cached = 32;
  400.         }

  401.         kcf->original_init_peer = uscfp[i]->peer.init;

  402.         uscfp[i]->peer.init = ngx_http_upstream_init_keepalive_peer;

  403.         /* allocate cache items and add to free queue */

  404.         cached = ngx_pcalloc(cf->pool,
  405.                  sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
  406.         if (cached == NULL) {
  407.             return NGX_CONF_ERROR;
  408.         }

  409.         ngx_queue_init(&kcf->cache);
  410.         ngx_queue_init(&kcf->free);

  411.         for (j = 0; j < kcf->max_cached; j++) {
  412.             ngx_queue_insert_head(&kcf->free, &cached[j].queue);
  413.             cached[j].conf = kcf;
  414.         }
  415.     }

  416.     return NGX_CONF_OK;
  417. }


  418. static char *
  419. ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  420. {
  421.     ngx_http_upstream_keepalive_srv_conf_t  *kcf = conf;

  422.     ngx_int_t    n;
  423.     ngx_str_t   *value;

  424.     if (kcf->max_cached != NGX_CONF_UNSET_UINT) {
  425.         return "is duplicate";
  426.     }

  427.     /* read options */

  428.     value = cf->args->elts;

  429.     n = ngx_atoi(value[1].data, value[1].len);

  430.     if (n == NGX_ERROR) {
  431.         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  432.                            "invalid value \"%V\" in \"%V\" directive",
  433.                            &value[1], &cmd->name);
  434.         return NGX_CONF_ERROR;
  435.     }

  436.     kcf->max_cached = n;

  437.     if (cf->args->nelts == 3) {
  438.         if (ngx_strcmp(value[2].data, "local") == 0) {
  439.             kcf->local = 1;

  440.         } else {
  441.             ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  442.                                "invalid parameter \"%V\"", &value[2]);
  443.             return NGX_CONF_ERROR;
  444.         }
  445.     }

  446.     return NGX_CONF_OK;
  447. }