src/stream/ngx_stream_upstream_least_time_module.c - nginx

Global variables defined

Data types defined

Functions defined

Macros defined

Source code


  1. /*
  2. * Copyright (C) Nginx, Inc.
  3. */


  4. #include <ngx_config.h>
  5. #include <ngx_core.h>
  6. #include <ngx_stream.h>


  /*
   * Metric the "least_time" balancer ranks peers by; selected by the first
   * directive argument through ngx_stream_upstream_least_time_mode[] and
   * stored in ngx_stream_upstream_lt_conf_t.mode.
   */
  7. #define NGX_STREAM_UPSTREAM_LT_CONNECT     0
  8. #define NGX_STREAM_UPSTREAM_LT_FIRST_BYTE  1
  9. #define NGX_STREAM_UPSTREAM_LT_LAST_BYTE   2


  /* per-upstream configuration produced by the "least_time" directive */
  10. typedef struct {
      /* one of NGX_STREAM_UPSTREAM_LT_CONNECT / _FIRST_BYTE / _LAST_BYTE */
  11.     ngx_uint_t                           mode;
      /* 1 when the optional "inflight" parameter was given, 0 otherwise */
  12.     ngx_uint_t                           use_inflight;
  13.                                                 /* unsigned  use_inflight:1; */
  14. } ngx_stream_upstream_lt_conf_t;


  /* per-session balancer state, allocated from the client connection pool */
  15. typedef struct {
  16.     /* the round robin data must be first */
      /*
       * (&ltp->rrp is installed as peer.data, and the backup-list retry in
       * ngx_stream_upstream_get_least_time_peer() passes rrp back in as the
       * "data" it casts to this struct)
       */
  17.     ngx_stream_upstream_rr_peer_data_t   rrp;

      /* the upstream's least_time configuration */
  18.     ngx_stream_upstream_lt_conf_t       *conf;
      /* 1 while this attempt is counted in the chosen peer's inflight_reqs */
  19.     ngx_uint_t                           inflight;  /* unsigned  inflight:1; */
      /* source of u->state timings used to update the averages */
  20.     ngx_stream_upstream_t               *upstream;
  21. } ngx_stream_upstream_lt_peer_data_t;


  /* per-session peer callbacks (peer.init / get / notify / free) and helpers */
  22. static ngx_int_t ngx_stream_upstream_init_least_time_peer(
  23.     ngx_stream_session_t *s, ngx_stream_upstream_srv_conf_t *us);
  24. static ngx_int_t ngx_stream_upstream_get_least_time_peer(
  25.     ngx_peer_connection_t *pc, void *data);
  26. static ngx_uint_t ngx_stream_upstream_least_time_eta(
  27.     ngx_stream_upstream_lt_peer_data_t *ltp,
  28.     ngx_stream_upstream_rr_peer_t *peer);
  29. static void ngx_stream_upstream_least_time_notify(ngx_peer_connection_t *pc,
  30.     void *data, ngx_uint_t type);
  31. static void ngx_stream_upstream_least_time_inflight_done(
  32.     ngx_stream_upstream_lt_peer_data_t *ltp,
  33.     ngx_stream_upstream_rr_peers_t *peers, ngx_stream_upstream_rr_peer_t *peer,
  34.     ngx_msec_t last);
  35. static void ngx_stream_upstream_free_least_time_peer(ngx_peer_connection_t *pc,
  36.     void *data, ngx_uint_t state);

  /* configuration-time callbacks */
  37. static void *ngx_stream_upstream_least_time_create_conf(ngx_conf_t *cf);
  38. static char *ngx_stream_upstream_least_time(ngx_conf_t *cf, ngx_command_t *cmd,
  39.     void *conf);


  /* accepted values of the first "least_time" argument (null-terminated) */
  40. static ngx_conf_enum_t  ngx_stream_upstream_least_time_mode[] = {
  41.     { ngx_string("connect"), NGX_STREAM_UPSTREAM_LT_CONNECT },
  42.     { ngx_string("first_byte"), NGX_STREAM_UPSTREAM_LT_FIRST_BYTE },
  43.     { ngx_string("last_byte"), NGX_STREAM_UPSTREAM_LT_LAST_BYTE },
  44.     { ngx_null_string, 0 }
  45. };


  /*
   * least_time connect | first_byte | last_byte [inflight];
   *
   * Allowed in upstream{} blocks only; the mode is stored at
   * ngx_stream_upstream_lt_conf_t.mode, the optional second argument is
   * handled by ngx_stream_upstream_least_time() itself.
   */
  46. static ngx_command_t  ngx_stream_upstream_least_time_commands[] = {

  47.     { ngx_string("least_time"),
  48.       NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12,
  49.       ngx_stream_upstream_least_time,
  50.       NGX_STREAM_SRV_CONF_OFFSET,
  51.       offsetof(ngx_stream_upstream_lt_conf_t, mode),
  52.       &ngx_stream_upstream_least_time_mode },

  53.       ngx_null_command
  54. };


  /*
   * Only a server-level (per-upstream) configuration is created; there is
   * no merge handler, the mode is set solely by the directive.
   */
  55. static ngx_stream_module_t  ngx_stream_upstream_least_time_module_ctx = {
  56.     NULL,                                  /* preconfiguration */
  57.     NULL,                                  /* postconfiguration */

  58.     NULL,                                  /* create main configuration */
  59.     NULL,                                  /* init main configuration */

  60.     ngx_stream_upstream_least_time_create_conf,
  61.                                            /* create server configuration */
  62.     NULL,                                  /* merge server configuration */
  63. };


  /* module definition: no process/thread lifecycle hooks are needed */
  64. ngx_module_t  ngx_stream_upstream_least_time_module = {
  65.     NGX_MODULE_V1,
  66.     &ngx_stream_upstream_least_time_module_ctx, /* module context */
  67.     ngx_stream_upstream_least_time_commands,    /* module directives */
  68.     NGX_STREAM_MODULE,                          /* module type */
  69.     NULL,                                       /* init master */
  70.     NULL,                                       /* init module */
  71.     NULL,                                       /* init process */
  72.     NULL,                                       /* init thread */
  73.     NULL,                                       /* exit thread */
  74.     NULL,                                       /* exit process */
  75.     NULL,                                       /* exit master */
  76.     NGX_MODULE_V1_PADDING
  77. };


  78. static ngx_int_t
  79. ngx_stream_upstream_init_least_time(ngx_conf_t *cf,
  80.     ngx_stream_upstream_srv_conf_t *us)
  81. {
  82.     ngx_log_debug0(NGX_LOG_DEBUG_STREAM, cf->log, 0,
  83.                    "init least time");

  84.     if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
  85.         return NGX_ERROR;
  86.     }

  87.     us->peer.init = ngx_stream_upstream_init_least_time_peer;

  88.     return NGX_OK;
  89. }


  90. static ngx_int_t
  91. ngx_stream_upstream_init_least_time_peer(ngx_stream_session_t *s,
  92.     ngx_stream_upstream_srv_conf_t *us)
  93. {
  94.     ngx_stream_upstream_lt_conf_t       *ltcf;
  95.     ngx_stream_upstream_lt_peer_data_t  *ltp;

  96.     ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
  97.                    "init least time peer");

  98.     ltp = ngx_pcalloc(s->connection->pool,
  99.                       sizeof(ngx_stream_upstream_lt_peer_data_t));
  100.     if (ltp == NULL) {
  101.         return NGX_ERROR;
  102.     }

  103.     s->upstream->peer.data = &ltp->rrp;

  104.     if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
  105.         return NGX_ERROR;
  106.     }

  107.     s->upstream->peer.get = ngx_stream_upstream_get_least_time_peer;
  108.     s->upstream->peer.free = ngx_stream_upstream_free_least_time_peer;

  109.     ltp->upstream = s->upstream;

  110.     ltcf = ngx_stream_conf_upstream_srv_conf(us,
  111.                                         ngx_stream_upstream_least_time_module);

  112.     if (ltcf->use_inflight) {
  113.         s->upstream->peer.notify = ngx_stream_upstream_least_time_notify;
  114.     }

  115.     ltp->conf = ltcf;

  116.     return NGX_OK;
  117. }


  /*
   * peer.get() callback: chooses the eligible peer with the lowest
   * estimated time (ngx_stream_upstream_least_time_eta()) relative to its
   * weight; peers that tie are resolved by smooth weighted round-robin.
   * When no primary peer is eligible, the backup list (peers->next) is
   * tried by re-entering this function with rrp.
   *
   * Returns NGX_OK with pc->sockaddr/socklen/name set to the chosen peer,
   * the round-robin result for a single-peer list, the result of the
   * backup retry if it is not NGX_BUSY, or NGX_BUSY (pc->name set to the
   * upstream name) when nothing is available.
   */
  118. static ngx_int_t
  119. ngx_stream_upstream_get_least_time_peer(ngx_peer_connection_t *pc, void *data)
  120. {
  121.     ngx_stream_upstream_lt_peer_data_t *ltp = data;

  122.     time_t                               now;
  123.     uintptr_t                            m;
  124.     ngx_int_t                            rc, total;
  125.     ngx_uint_t                           i, n, p, many, eta, best_eta;
  126.     ngx_msec_t                           ift;
  127.     ngx_stream_upstream_rr_peer_t       *peer, *best;
  128.     ngx_stream_upstream_rr_peers_t      *peers;
  129.     ngx_stream_upstream_rr_peer_data_t  *rrp;

  130.     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
  131.                    "get least time peer, try: %ui", pc->tries);

  132.     rrp = &ltp->rrp;

      /* nothing to balance: defer to plain round-robin */
  133.     if (rrp->peers->single) {
  134.         return ngx_stream_upstream_get_round_robin_peer(pc, rrp);
  135.     }

  136.     pc->cached = 0;
  137.     pc->connection = NULL;

  138.     now = ngx_time();

  139.     peers = rrp->peers;

      /* write lock: peer statistics and weights are modified below */
  140.     ngx_stream_upstream_rr_peers_wlock(peers);

      /*
       * NOTE(review): presumably detects that the shared peer list was
       * reconfigured after this session's rrp was initialized — confirm
       */
  141. #if (NGX_STREAM_UPSTREAM_ZONE)
  142.     if (peers->config && rrp->config != *peers->config) {
  143.         goto busy;
  144.     }
  145. #endif

  146.     best = NULL;
  147.     total = 0;

  148. #if (NGX_SUPPRESS_WARN)
  149.     many = 0;
  150.     p = 0;
  151.     best_eta = 0;
  152. #endif

      /*
       * pass 1: skip tried, down, failed (within fail_timeout) and
       * max_conns-saturated peers; remember the first peer with the lowest
       * eta/weight, and whether another peer ties with it
       */
  153.     for (peer = peers->peer, i = 0;
  154.          peer;
  155.          peer = peer->next, i++)
  156.     {
  157.         n = i / (8 * sizeof(uintptr_t));
  158.         m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));

  159.         if (rrp->tried[n] & m) {
  160.             continue;
  161.         }

  162.         if (peer->down) {
  163.             continue;
  164.         }

  165.         if (peer->max_fails
  166.             && peer->fails >= peer->max_fails
  167.             && now - peer->checked <= peer->fail_timeout)
  168.         {
  169.             continue;
  170.         }

  171.         if (peer->max_conns && peer->conns >= peer->max_conns) {
  172.             continue;
  173.         }

          /*
           * fold the current per-request in-flight time (accumulated
           * inflight_last shared by inflight_reqs, plus time since the last
           * change) into the peer's inflight_time average
           */
  174.         if (peer->inflight_reqs > 0) {

  175.             ift = peer->inflight_last / peer->inflight_reqs
  176.                   + (ngx_current_msec - peer->inflight_reqs_changed);

  177.             ngx_stream_upstream_response_time_avg(&peer->inflight_time, ift);
  178.         }

  179.         /*
  180.          * select peer with least estimated time of processing; if there are
  181.          * multiple peers with the same time, select based on round-robin
  182.          */

  183.         eta = ngx_stream_upstream_least_time_eta(ltp, peer);

          /* eta / weight ratios compared by cross-multiplication */
  184.         if (best == NULL
  185.             || eta * best->weight < best_eta * peer->weight)
  186.         {
  187.             best = peer;
  188.             best_eta = eta;
  189.             many = 0;
  190.             p = i;

  191.         } else if (eta * best->weight == best_eta * peer->weight) {
  192.             many = 1;
  193.         }
  194.     }

  195.     if (best == NULL) {
  196.         ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
  197.                        "get least time peer, no peer found");

  198.         goto failed;
  199.     }

      /*
       * pass 2 (ties only): starting at the first tied peer, run smooth
       * weighted round-robin over the eligible peers whose eta/weight
       * equals the best one
       */
  200.     if (many) {
  201.         ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
  202.                        "get least time peer, many");

  203.         for (peer = best, i = p;
  204.              peer;
  205.              peer = peer->next, i++)
  206.         {
  207.             n = i / (8 * sizeof(uintptr_t));
  208.             m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));

  209.             if (rrp->tried[n] & m) {
  210.                 continue;
  211.             }

  212.             if (peer->down) {
  213.                 continue;
  214.             }

  215.             eta = ngx_stream_upstream_least_time_eta(ltp, peer);

  216.             if (eta * best->weight != best_eta * peer->weight) {
  217.                 continue;
  218.             }

  219.             if (peer->max_fails
  220.                 && peer->fails >= peer->max_fails
  221.                 && now - peer->checked <= peer->fail_timeout)
  222.             {
  223.                 continue;
  224.             }

  225.             if (peer->max_conns && peer->conns >= peer->max_conns) {
  226.                 continue;
  227.             }

  228.             peer->current_weight += peer->effective_weight;
  229.             total += peer->effective_weight;

  230.             if (peer->effective_weight < peer->weight) {
  231.                 peer->effective_weight++;
  232.             }

  233.             if (peer->current_weight > best->current_weight) {
  234.                 best = peer;
  235.                 p = i;
  236.             }
  237.         }
  238.     }

      /* total stays 0 unless pass 2 ran */
  239.     best->current_weight -= total;

      /*
       * count this attempt as in flight: first charge the already in-flight
       * requests for the time elapsed since the last change; undone by
       * ngx_stream_upstream_least_time_inflight_done()
       */
  240.     if (ltp->conf->use_inflight) {
  241.         if (best->inflight_reqs > 0) {
  242.             /* account time spent by inflight requests */
  243.             best->inflight_last +=
  244.                                (ngx_current_msec - best->inflight_reqs_changed)
  245.                                * best->inflight_reqs;
  246.         }

  247.         best->inflight_reqs_changed = ngx_current_msec;
  248.         best->inflight_reqs++;

  249.         ltp->inflight = 1;
  250.     }

      /* start a new fail_timeout window once the previous one has expired */
  251.     if (now - best->checked > best->fail_timeout) {
  252.         best->checked = now;
  253.     }

  254.     pc->sockaddr = best->sockaddr;
  255.     pc->socklen = best->socklen;
  256.     pc->name = &best->name;

  257.     best->conns++;

  258.     rrp->current = best;
  259.     ngx_stream_upstream_rr_peer_ref(peers, best);

  260.     n = p / (8 * sizeof(uintptr_t));
  261.     m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));

  262.     rrp->tried[n] |= m;

  263.     ngx_stream_upstream_rr_peers_unlock(peers);

  264.     return NGX_OK;

      /*
       * no eligible primary peer: switch rrp to the backup list, clear its
       * tried bits and retry with the lock released; on NGX_BUSY re-take the
       * lock so the common exit below can release it
       */
  265. failed:

  266.     if (peers->next) {
  267.         ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
  268.                        "get least time peer, backup servers");

  269.         rrp->peers = peers->next;

  270.         n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
  271.                 / (8 * sizeof(uintptr_t));

  272.         for (i = 0; i < n; i++) {
  273.              rrp->tried[i] = 0;
  274.         }

  275.         ngx_stream_upstream_rr_peers_unlock(peers);

  276.         rc = ngx_stream_upstream_get_least_time_peer(pc, rrp);

  277.         if (rc != NGX_BUSY) {
  278.             return rc;
  279.         }

  280.         ngx_stream_upstream_rr_peers_wlock(peers);
  281.     }

  282. #if (NGX_STREAM_UPSTREAM_ZONE)
  283. busy:
  284. #endif

  285.     ngx_stream_upstream_rr_peers_unlock(peers);

  286.     pc->name = peers->name;

  287.     return NGX_BUSY;
  288. }


  289. static ngx_uint_t
  290. ngx_stream_upstream_least_time_eta(ngx_stream_upstream_lt_peer_data_t *ltp,
  291.     ngx_stream_upstream_rr_peer_t *peer)
  292. {
  293.     time_t      now;
  294.     ngx_msec_t  rt;

  295.     switch (ltp->conf->mode) {

  296.     case NGX_STREAM_UPSTREAM_LT_FIRST_BYTE:
  297.         rt = peer->first_byte_time;
  298.         break;

  299.     case NGX_STREAM_UPSTREAM_LT_CONNECT:
  300.         rt = peer->connect_time;
  301.         break;

  302.     default: /* NGX_STREAM_UPSTREAM_LT_LAST_BYTE */
  303.         rt = peer->response_time;
  304.     }

  305.     now = ngx_time();

  306.     if (now - peer->checked > peer->fail_timeout) {
  307.         /*
  308.          * once in fail_timeout make response time of a peer 2 times
  309.          * lower to give chances to slow peers
  310.          */
  311.         rt >>= (now - peer->checked) / (peer->fail_timeout + 1);
  312.     }

  313.     if (peer->inflight_reqs > 0) {
  314.         /*
  315.          * average inflight time exceeding average response time indicates
  316.          * bad (low priority) peer
  317.          */
  318.         rt = ngx_max(rt, peer->inflight_time);
  319.     }

  320.     if (rt > 5000) {
  321.         /*
  322.          * consider peers with response time greater than max equally bad
  323.          * and thus fallback to least_conns
  324.          */
  325.         rt = 5000;

  326.     } else {
  327.         /*
  328.          * divide response times into clusters to allow round-robin for peers
  329.          * with close response times
  330.          */
  331.         rt += 20 - rt % 20;
  332.     }

  333.     /*
  334.      * estimated time peer has to spend to finish processing current requests
  335.      */
  336.     return rt * (1 + peer->conns);
  337. }


  338. static void
  339. ngx_stream_upstream_least_time_notify(ngx_peer_connection_t *pc, void *data,
  340.     ngx_uint_t type)
  341. {
  342.     ngx_stream_upstream_lt_peer_data_t *ltp = data;

  343.     ngx_msec_t                           last, *metric;
  344.     ngx_stream_upstream_t               *u;
  345.     ngx_stream_upstream_rr_peer_t       *peer;
  346.     ngx_stream_upstream_rr_peers_t      *peers;
  347.     ngx_stream_upstream_rr_peer_data_t  *rrp;

  348.     rrp = &ltp->rrp;

  349.     peers = rrp->peers;
  350.     peer = rrp->current;

  351.     u = ltp->upstream;

  352.     /*
  353.      * Only update average time here if needed for balancing.
  354.      * Otherwise, it will be updated in peer.free().
  355.      */

  356. #if (NGX_SUPPRESS_WARN)
  357.     last = 0;
  358. #endif
  359.     metric = NULL;

  360.     switch (type) {

  361.     case NGX_STREAM_UPSTREAM_NOTIFY_CONNECT:

  362.         if (ltp->conf->mode == NGX_STREAM_UPSTREAM_LT_CONNECT) {
  363.             last = u->state->connect_time;
  364.             metric = &peer->connect_time;
  365.         }

  366.         break;

  367.     case NGX_STREAM_UPSTREAM_NOTIFY_FIRST_BYTE:

  368.         if (ltp->conf->mode == NGX_STREAM_UPSTREAM_LT_FIRST_BYTE) {
  369.             last = u->state->first_byte_time;
  370.             metric = &peer->first_byte_time;
  371.         }

  372.         break;
  373.     }

  374.     ngx_stream_upstream_rr_peers_rlock(peers);
  375.     ngx_stream_upstream_rr_peer_lock(peers, peer);

  376.     if (metric) {
  377.         ngx_stream_upstream_response_time_avg(metric, last);

  378.         if (ltp->inflight) {
  379.             ngx_stream_upstream_least_time_inflight_done(ltp, peers, peer,
  380.                                                          last);
  381.         }
  382.     }

  383.     ngx_stream_upstream_notify_round_robin_peer_locked(pc, data, type);
  384. }


  385. static void
  386. ngx_stream_upstream_least_time_inflight_done(
  387.     ngx_stream_upstream_lt_peer_data_t *ltp,
  388.     ngx_stream_upstream_rr_peers_t *peers, ngx_stream_upstream_rr_peer_t *peer,
  389.     ngx_msec_t last)
  390. {
  391.     if (peer->inflight_reqs == 1) {
  392.         /* no more inflight requests */
  393.         peer->inflight_last = 0;

  394.     } else {

  395.         /*
  396.          * account time spent by inflight requests and forget about
  397.          * request "completed" right now
  398.          */
  399.         peer->inflight_last += (ngx_current_msec - peer->inflight_reqs_changed)
  400.                                * peer->inflight_reqs - last;
  401.         peer->inflight_reqs_changed = ngx_current_msec;
  402.     }

  403.     peer->inflight_reqs--;
  404.     ltp->inflight = 0;
  405. }


  406. static void
  407. ngx_stream_upstream_free_least_time_peer(ngx_peer_connection_t *pc,
  408.     void *data, ngx_uint_t state)
  409. {
  410.     ngx_stream_upstream_lt_peer_data_t *ltp = data;

  411.     ngx_stream_upstream_t               *u;
  412.     ngx_stream_upstream_rr_peer_t       *peer;
  413.     ngx_stream_upstream_rr_peers_t      *peers;
  414.     ngx_stream_upstream_rr_peer_data_t  *rrp;

  415.     ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "free least time peer");

  416.     rrp = &ltp->rrp;
  417.     peers = rrp->peers;
  418.     peer = rrp->current;

  419.     u = ltp->upstream;

  420.     ngx_stream_upstream_rr_peers_rlock(peers);
  421.     ngx_stream_upstream_rr_peer_lock(peers, peer);

  422.     if (ltp->inflight) {
  423.         ngx_stream_upstream_least_time_inflight_done(ltp, peers, peer,
  424.                                                      u->state->response_time);
  425.     }

  426.     /*
  427.      * only successful attempts are accounted to mitigate preferring
  428.      * of failing peers
  429.      */
  430.     if (!(state & (NGX_PEER_FAILED|NGX_PEER_NEXT))) {
  431.         ngx_stream_upstream_response_time_avg(&peer->response_time,
  432.                                               u->state->response_time);

  433.         if (!ltp->conf->use_inflight
  434.             || ltp->conf->mode != NGX_STREAM_UPSTREAM_LT_CONNECT)
  435.         {
  436.             ngx_stream_upstream_response_time_avg(&peer->connect_time,
  437.                                                   u->state->connect_time);
  438.         }

  439.         if (u->state->first_byte_time != (ngx_msec_t) -1
  440.             && (!ltp->conf->use_inflight
  441.                 || ltp->conf->mode != NGX_STREAM_UPSTREAM_LT_FIRST_BYTE))
  442.         {
  443.             ngx_stream_upstream_response_time_avg(&peer->first_byte_time,
  444.                                                   u->state->first_byte_time);
  445.         }
  446.     }

  447.     ngx_stream_upstream_free_round_robin_peer_locked(pc, rrp, state);
  448. }


  449. static void *
  450. ngx_stream_upstream_least_time_create_conf(ngx_conf_t *cf)
  451. {
  452.     ngx_stream_upstream_lt_conf_t  *conf;

  453.     conf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_lt_conf_t));
  454.     if (conf == NULL) {
  455.         return NULL;
  456.     }

  457.     /*
  458.      * set by ngx_pcalloc():
  459.      *
  460.      *     conf->use_inflight = 0;
  461.      */

  462.     conf->mode = NGX_CONF_UNSET_UINT;

  463.     return conf;
  464. }


  465. static char *
  466. ngx_stream_upstream_least_time(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  467. {
  468.     ngx_str_t                       *value;
  469.     ngx_stream_upstream_lt_conf_t   *ltcf;
  470.     ngx_stream_upstream_srv_conf_t  *uscf;

  471.     uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);

  472.     if (uscf->peer.init_upstream) {
  473.         ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
  474.                            "load balancing method redefined");
  475.     }

  476.     uscf->peer.init_upstream = ngx_stream_upstream_init_least_time;

  477.     uscf->flags = NGX_STREAM_UPSTREAM_CREATE
  478.                   |NGX_STREAM_UPSTREAM_MODIFY
  479.                   |NGX_STREAM_UPSTREAM_WEIGHT
  480.                   |NGX_STREAM_UPSTREAM_MAX_CONNS
  481.                   |NGX_STREAM_UPSTREAM_MAX_FAILS
  482.                   |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
  483.                   |NGX_STREAM_UPSTREAM_DOWN
  484.                   |NGX_STREAM_UPSTREAM_BACKUP;

  485.     if (cf->args->nelts == 3) {
  486.         value = cf->args->elts;
  487.         ltcf = ngx_stream_conf_upstream_srv_conf(uscf,
  488.                                         ngx_stream_upstream_least_time_module);
  489.         if (ngx_strcmp(value[2].data, "inflight") == 0) {
  490.             ltcf->use_inflight = 1;

  491.         } else {
  492.             ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  493.                                "invalid parameter \"%V\"", &value[2]);
  494.             return NGX_CONF_ERROR;
  495.         }
  496.     }

  497.     return ngx_conf_set_enum_slot(cf, cmd, conf);
  498. }