src/core/ngx_thread_pool.c - nginx source code

Global variables defined

Data types defined

Functions defined

Macros defined

Source code


  1. /*
  2. * Copyright (C) Nginx, Inc.
  3. * Copyright (C) Valentin V. Bartenev
  4. * Copyright (C) Ruslan Ermilov
  5. */


  6. #include <ngx_config.h>
  7. #include <ngx_core.h>
  8. #include <ngx_thread_pool.h>


  9. typedef struct {
  10.     ngx_array_t               pools;
  11. } ngx_thread_pool_conf_t;


  12. typedef struct {
  13.     ngx_thread_task_t        *first;
  14.     ngx_thread_task_t       **last;
  15. } ngx_thread_pool_queue_t;

  16. #define ngx_thread_pool_queue_init(q)                                         \
  17.     (q)->first = NULL;                                                        \
  18.     (q)->last = &(q)->first


  19. struct ngx_thread_pool_s {
  20.     ngx_thread_mutex_t        mtx;
  21.     ngx_thread_pool_queue_t   queue;
  22.     ngx_int_t                 waiting;
  23.     ngx_thread_cond_t         cond;

  24.     ngx_log_t                *log;

  25.     ngx_str_t                 name;
  26.     ngx_uint_t                threads;
  27.     ngx_int_t                 max_queue;

  28.     u_char                   *file;
  29.     ngx_uint_t                line;
  30. };


  31. static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
  32.     ngx_pool_t *pool);
  33. static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
  34. static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log);

  35. static void *ngx_thread_pool_cycle(void *data);
  36. static void ngx_thread_pool_handler(ngx_event_t *ev);

  37. static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);

  38. static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
  39. static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);

  40. static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
  41. static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);


  42. static ngx_command_t  ngx_thread_pool_commands[] = {

  43.     { ngx_string("thread_pool"),
  44.       NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
  45.       ngx_thread_pool,
  46.       0,
  47.       0,
  48.       NULL },

  49.       ngx_null_command
  50. };


  51. static ngx_core_module_t  ngx_thread_pool_module_ctx = {
  52.     ngx_string("thread_pool"),
  53.     ngx_thread_pool_create_conf,
  54.     ngx_thread_pool_init_conf
  55. };


  56. ngx_module_t  ngx_thread_pool_module = {
  57.     NGX_MODULE_V1,
  58.     &ngx_thread_pool_module_ctx,           /* module context */
  59.     ngx_thread_pool_commands,              /* module directives */
  60.     NGX_CORE_MODULE,                       /* module type */
  61.     NULL,                                  /* init master */
  62.     NULL,                                  /* init module */
  63.     ngx_thread_pool_init_worker,           /* init process */
  64.     NULL,                                  /* init thread */
  65.     NULL,                                  /* exit thread */
  66.     ngx_thread_pool_exit_worker,           /* exit process */
  67.     NULL,                                  /* exit master */
  68.     NGX_MODULE_V1_PADDING
  69. };


  70. static ngx_str_t  ngx_thread_pool_default = ngx_string("default");

  71. static ngx_uint_t               ngx_thread_pool_task_id;
  72. static ngx_atomic_t             ngx_thread_pool_done_lock;
  73. static ngx_thread_pool_queue_t  ngx_thread_pool_done;


  74. static ngx_int_t
  75. ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
  76. {
  77.     int             err;
  78.     pthread_t       tid;
  79.     ngx_uint_t      n;
  80.     pthread_attr_t  attr;

  81.     if (ngx_notify == NULL) {
  82.         ngx_log_error(NGX_LOG_ALERT, log, 0,
  83.                "the configured event method cannot be used with thread pools");
  84.         return NGX_ERROR;
  85.     }

  86.     ngx_thread_pool_queue_init(&tp->queue);

  87.     if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
  88.         return NGX_ERROR;
  89.     }

  90.     if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
  91.         (void) ngx_thread_mutex_destroy(&tp->mtx, log);
  92.         return NGX_ERROR;
  93.     }

  94.     tp->log = log;

  95.     err = pthread_attr_init(&attr);
  96.     if (err) {
  97.         ngx_log_error(NGX_LOG_ALERT, log, err,
  98.                       "pthread_attr_init() failed");
  99.         return NGX_ERROR;
  100.     }

  101.     err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  102.     if (err) {
  103.         ngx_log_error(NGX_LOG_ALERT, log, err,
  104.                       "pthread_attr_setdetachstate() failed");
  105.         return NGX_ERROR;
  106.     }

  107. #if 0
  108.     err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
  109.     if (err) {
  110.         ngx_log_error(NGX_LOG_ALERT, log, err,
  111.                       "pthread_attr_setstacksize() failed");
  112.         return NGX_ERROR;
  113.     }
  114. #endif

  115.     for (n = 0; n < tp->threads; n++) {
  116.         err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
  117.         if (err) {
  118.             ngx_log_error(NGX_LOG_ALERT, log, err,
  119.                           "pthread_create() failed");
  120.             return NGX_ERROR;
  121.         }
  122.     }

  123.     (void) pthread_attr_destroy(&attr);

  124.     return NGX_OK;
  125. }


  126. static void
  127. ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
  128. {
  129.     ngx_uint_t           n;
  130.     ngx_thread_task_t    task;
  131.     volatile ngx_uint_t  lock;

  132.     ngx_memzero(&task, sizeof(ngx_thread_task_t));

  133.     task.handler = ngx_thread_pool_exit_handler;
  134.     task.ctx = (void *) &lock;

  135.     for (n = 0; n < tp->threads; n++) {
  136.         lock = 1;

  137.         if (ngx_thread_task_post(tp, &task) != NGX_OK) {
  138.             return;
  139.         }

  140.         while (lock) {
  141.             ngx_sched_yield();
  142.         }

  143.         task.event.active = 0;
  144.     }

  145.     (void) ngx_thread_cond_destroy(&tp->cond, tp->log);

  146.     (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
  147. }


  148. static void
  149. ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
  150. {
  151.     ngx_uint_t *lock = data;

  152.     *lock = 0;

  153.     pthread_exit(0);
  154. }


  155. ngx_thread_task_t *
  156. ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
  157. {
  158.     ngx_thread_task_t  *task;

  159.     task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
  160.     if (task == NULL) {
  161.         return NULL;
  162.     }

  163.     task->ctx = task + 1;

  164.     return task;
  165. }


  166. ngx_int_t
  167. ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
  168. {
  169.     if (task->event.active) {
  170.         ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
  171.                       "task #%ui already active", task->id);
  172.         return NGX_ERROR;
  173.     }

  174.     if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
  175.         return NGX_ERROR;
  176.     }

  177.     if (tp->waiting >= tp->max_queue) {
  178.         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);

  179.         ngx_log_error(NGX_LOG_ERR, tp->log, 0,
  180.                       "thread pool \"%V\" queue overflow: %i tasks waiting",
  181.                       &tp->name, tp->waiting);
  182.         return NGX_ERROR;
  183.     }

  184.     task->event.active = 1;

  185.     task->id = ngx_thread_pool_task_id++;
  186.     task->next = NULL;

  187.     if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
  188.         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
  189.         return NGX_ERROR;
  190.     }

  191.     *tp->queue.last = task;
  192.     tp->queue.last = &task->next;

  193.     tp->waiting++;

  194.     (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);

  195.     ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
  196.                    "task #%ui added to thread pool \"%V\"",
  197.                    task->id, &tp->name);

  198.     return NGX_OK;
  199. }


  200. static void *
  201. ngx_thread_pool_cycle(void *data)
  202. {
  203.     ngx_thread_pool_t *tp = data;

  204.     int                 err;
  205.     sigset_t            set;
  206.     ngx_thread_task_t  *task;

  207. #if 0
  208.     ngx_time_update();
  209. #endif

  210.     ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
  211.                    "thread in pool \"%V\" started", &tp->name);

  212.     sigfillset(&set);

  213.     sigdelset(&set, SIGILL);
  214.     sigdelset(&set, SIGFPE);
  215.     sigdelset(&set, SIGSEGV);
  216.     sigdelset(&set, SIGBUS);

  217.     err = pthread_sigmask(SIG_BLOCK, &set, NULL);
  218.     if (err) {
  219.         ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
  220.         return NULL;
  221.     }

  222.     for ( ;; ) {
  223.         if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
  224.             return NULL;
  225.         }

  226.         /* the number may become negative */
  227.         tp->waiting--;

  228.         while (tp->queue.first == NULL) {
  229.             if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
  230.                 != NGX_OK)
  231.             {
  232.                 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
  233.                 return NULL;
  234.             }
  235.         }

  236.         task = tp->queue.first;
  237.         tp->queue.first = task->next;

  238.         if (tp->queue.first == NULL) {
  239.             tp->queue.last = &tp->queue.first;
  240.         }

  241.         if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
  242.             return NULL;
  243.         }

  244. #if 0
  245.         ngx_time_update();
  246. #endif

  247.         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
  248.                        "run task #%ui in thread pool \"%V\"",
  249.                        task->id, &tp->name);

  250.         task->handler(task->ctx, tp->log);

  251.         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
  252.                        "complete task #%ui in thread pool \"%V\"",
  253.                        task->id, &tp->name);

  254.         task->next = NULL;

  255.         ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);

  256.         *ngx_thread_pool_done.last = task;
  257.         ngx_thread_pool_done.last = &task->next;

  258.         ngx_memory_barrier();

  259.         ngx_unlock(&ngx_thread_pool_done_lock);

  260.         (void) ngx_notify(ngx_thread_pool_handler);
  261.     }
  262. }


  263. static void
  264. ngx_thread_pool_handler(ngx_event_t *ev)
  265. {
  266.     ngx_event_t        *event;
  267.     ngx_thread_task_t  *task;

  268.     ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");

  269.     ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);

  270.     task = ngx_thread_pool_done.first;
  271.     ngx_thread_pool_done.first = NULL;
  272.     ngx_thread_pool_done.last = &ngx_thread_pool_done.first;

  273.     ngx_memory_barrier();

  274.     ngx_unlock(&ngx_thread_pool_done_lock);

  275.     while (task) {
  276.         ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
  277.                        "run completion handler for task #%ui", task->id);

  278.         event = &task->event;
  279.         task = task->next;

  280.         event->complete = 1;
  281.         event->active = 0;

  282.         event->handler(event);
  283.     }
  284. }


  285. static void *
  286. ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
  287. {
  288.     ngx_thread_pool_conf_t  *tcf;

  289.     tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
  290.     if (tcf == NULL) {
  291.         return NULL;
  292.     }

  293.     if (ngx_array_init(&tcf->pools, cycle->pool, 4,
  294.                        sizeof(ngx_thread_pool_t *))
  295.         != NGX_OK)
  296.     {
  297.         return NULL;
  298.     }

  299.     return tcf;
  300. }


  301. static char *
  302. ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
  303. {
  304.     ngx_thread_pool_conf_t *tcf = conf;

  305.     ngx_uint_t           i;
  306.     ngx_thread_pool_t  **tpp;

  307.     tpp = tcf->pools.elts;

  308.     for (i = 0; i < tcf->pools.nelts; i++) {

  309.         if (tpp[i]->threads) {
  310.             continue;
  311.         }

  312.         if (tpp[i]->name.len == ngx_thread_pool_default.len
  313.             && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
  314.                            ngx_thread_pool_default.len)
  315.                == 0)
  316.         {
  317.             tpp[i]->threads = 32;
  318.             tpp[i]->max_queue = 65536;
  319.             continue;
  320.         }

  321.         ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
  322.                       "unknown thread pool \"%V\" in %s:%ui",
  323.                       &tpp[i]->name, tpp[i]->file, tpp[i]->line);

  324.         return NGX_CONF_ERROR;
  325.     }

  326.     return NGX_CONF_OK;
  327. }


  328. static char *
  329. ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  330. {
  331.     ngx_str_t          *value;
  332.     ngx_uint_t          i;
  333.     ngx_thread_pool_t  *tp;

  334.     value = cf->args->elts;

  335.     tp = ngx_thread_pool_add(cf, &value[1]);

  336.     if (tp == NULL) {
  337.         return NGX_CONF_ERROR;
  338.     }

  339.     if (tp->threads) {
  340.         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  341.                            "duplicate thread pool \"%V\"", &tp->name);
  342.         return NGX_CONF_ERROR;
  343.     }

  344.     tp->max_queue = 65536;

  345.     for (i = 2; i < cf->args->nelts; i++) {

  346.         if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {

  347.             tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);

  348.             if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
  349.                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  350.                                    "invalid threads value \"%V\"", &value[i]);
  351.                 return NGX_CONF_ERROR;
  352.             }

  353.             continue;
  354.         }

  355.         if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {

  356.             tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);

  357.             if (tp->max_queue == NGX_ERROR) {
  358.                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  359.                                    "invalid max_queue value \"%V\"", &value[i]);
  360.                 return NGX_CONF_ERROR;
  361.             }

  362.             continue;
  363.         }
  364.     }

  365.     if (tp->threads == 0) {
  366.         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
  367.                            "\"%V\" must have \"threads\" parameter",
  368.                            &cmd->name);
  369.         return NGX_CONF_ERROR;
  370.     }

  371.     return NGX_CONF_OK;
  372. }


  373. ngx_thread_pool_t *
  374. ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
  375. {
  376.     ngx_thread_pool_t       *tp, **tpp;
  377.     ngx_thread_pool_conf_t  *tcf;

  378.     if (name == NULL) {
  379.         name = &ngx_thread_pool_default;
  380.     }

  381.     tp = ngx_thread_pool_get(cf->cycle, name);

  382.     if (tp) {
  383.         return tp;
  384.     }

  385.     tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
  386.     if (tp == NULL) {
  387.         return NULL;
  388.     }

  389.     tp->name = *name;
  390.     tp->file = cf->conf_file->file.name.data;
  391.     tp->line = cf->conf_file->line;

  392.     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
  393.                                                   ngx_thread_pool_module);

  394.     tpp = ngx_array_push(&tcf->pools);
  395.     if (tpp == NULL) {
  396.         return NULL;
  397.     }

  398.     *tpp = tp;

  399.     return tp;
  400. }


  401. ngx_thread_pool_t *
  402. ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
  403. {
  404.     ngx_uint_t                i;
  405.     ngx_thread_pool_t       **tpp;
  406.     ngx_thread_pool_conf_t   *tcf;

  407.     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
  408.                                                   ngx_thread_pool_module);

  409.     tpp = tcf->pools.elts;

  410.     for (i = 0; i < tcf->pools.nelts; i++) {

  411.         if (tpp[i]->name.len == name->len
  412.             && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
  413.         {
  414.             return tpp[i];
  415.         }
  416.     }

  417.     return NULL;
  418. }


  419. static ngx_int_t
  420. ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
  421. {
  422.     ngx_uint_t                i;
  423.     ngx_thread_pool_t       **tpp;
  424.     ngx_thread_pool_conf_t   *tcf;

  425.     if (ngx_process != NGX_PROCESS_WORKER
  426.         && ngx_process != NGX_PROCESS_SINGLE)
  427.     {
  428.         return NGX_OK;
  429.     }

  430.     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
  431.                                                   ngx_thread_pool_module);

  432.     if (tcf == NULL) {
  433.         return NGX_OK;
  434.     }

  435.     ngx_thread_pool_queue_init(&ngx_thread_pool_done);

  436.     tpp = tcf->pools.elts;

  437.     for (i = 0; i < tcf->pools.nelts; i++) {
  438.         if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
  439.             return NGX_ERROR;
  440.         }
  441.     }

  442.     return NGX_OK;
  443. }


  444. static void
  445. ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
  446. {
  447.     ngx_uint_t                i;
  448.     ngx_thread_pool_t       **tpp;
  449.     ngx_thread_pool_conf_t   *tcf;

  450.     if (ngx_process != NGX_PROCESS_WORKER
  451.         && ngx_process != NGX_PROCESS_SINGLE)
  452.     {
  453.         return;
  454.     }

  455.     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
  456.                                                   ngx_thread_pool_module);

  457.     if (tcf == NULL) {
  458.         return;
  459.     }

  460.     tpp = tcf->pools.elts;

  461.     for (i = 0; i < tcf->pools.nelts; i++) {
  462.         ngx_thread_pool_destroy(tpp[i]);
  463.     }
  464. }