src/event/ngx_event_pipe.c - nginx source code

Functions defined

Source code


  1. /*
  2. * Copyright (C) Igor Sysoev
  3. * Copyright (C) Nginx, Inc.
  4. */


  5. #include <ngx_config.h>
  6. #include <ngx_core.h>
  7. #include <ngx_event.h>
  8. #include <ngx_event_pipe.h>


  9. static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
  10. static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);

  11. static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
  12. static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
  13. static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);


  14. ngx_int_t
  15. ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
  16. {
  17.     ngx_int_t     rc;
  18.     ngx_uint_t    flags;
  19.     ngx_event_t  *rev, *wev;

  20.     for ( ;; ) {
  21.         if (do_write) {
  22.             p->log->action = "sending to client";

  23.             rc = ngx_event_pipe_write_to_downstream(p);

  24.             if (rc == NGX_ABORT) {
  25.                 return NGX_ABORT;
  26.             }

  27.             if (rc == NGX_BUSY) {
  28.                 return NGX_OK;
  29.             }
  30.         }

  31.         p->read = 0;
  32.         p->upstream_blocked = 0;

  33.         p->log->action = "reading upstream";

  34.         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
  35.             return NGX_ABORT;
  36.         }

  37.         if (!p->read && !p->upstream_blocked) {
  38.             break;
  39.         }

  40.         do_write = 1;
  41.     }

  42.     if (p->upstream
  43.         && p->upstream->fd != (ngx_socket_t) -1)
  44.     {
  45.         rev = p->upstream->read;

  46.         flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;

  47.         if (ngx_handle_read_event(rev, flags) != NGX_OK) {
  48.             return NGX_ABORT;
  49.         }

  50.         if (!rev->delayed) {
  51.             if (rev->active && !rev->ready) {
  52.                 ngx_add_timer(rev, p->read_timeout);

  53.             } else if (rev->timer_set) {
  54.                 ngx_del_timer(rev);
  55.             }
  56.         }
  57.     }

  58.     if (p->downstream->fd != (ngx_socket_t) -1
  59.         && p->downstream->data == p->output_ctx)
  60.     {
  61.         wev = p->downstream->write;
  62.         if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
  63.             return NGX_ABORT;
  64.         }

  65.         if (!wev->delayed) {
  66.             if (wev->active && !wev->ready) {
  67.                 ngx_add_timer(wev, p->send_timeout);

  68.             } else if (wev->timer_set) {
  69.                 ngx_del_timer(wev);
  70.             }
  71.         }
  72.     }

  73.     return NGX_OK;
  74. }


  75. static ngx_int_t
  76. ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
  77. {
  78.     off_t         limit;
  79.     ssize_t       n, size;
  80.     ngx_int_t     rc;
  81.     ngx_buf_t    *b;
  82.     ngx_msec_t    delay;
  83.     ngx_chain_t  *chain, *cl, *ln;

  84.     if (p->upstream_eof || p->upstream_error || p->upstream_done
  85.         || p->upstream == NULL)
  86.     {
  87.         return NGX_OK;
  88.     }

  89. #if (NGX_THREADS)

  90.     if (p->aio) {
  91.         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  92.                        "pipe read upstream: aio");
  93.         return NGX_AGAIN;
  94.     }

  95.     if (p->writing) {
  96.         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  97.                        "pipe read upstream: writing");

  98.         rc = ngx_event_pipe_write_chain_to_temp_file(p);

  99.         if (rc != NGX_OK) {
  100.             return rc;
  101.         }
  102.     }

  103. #endif

  104.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  105.                    "pipe read upstream: %d", p->upstream->read->ready);

  106.     for ( ;; ) {

  107.         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
  108.             break;
  109.         }

  110.         if (p->preread_bufs == NULL && !p->upstream->read->ready) {
  111.             break;
  112.         }

  113.         if (p->preread_bufs) {

  114.             /* use the pre-read bufs if they exist */

  115.             chain = p->preread_bufs;
  116.             p->preread_bufs = NULL;
  117.             n = p->preread_size;

  118.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  119.                            "pipe preread: %z", n);

  120.             if (n) {
  121.                 p->read = 1;
  122.             }

  123.         } else {

  124. #if (NGX_HAVE_KQUEUE)

  125.             /*
  126.              * kqueue notifies about the end of file or a pending error.
  127.              * This test allows not to allocate a buf on these conditions
  128.              * and not to call c->recv_chain().
  129.              */

  130.             if (p->upstream->read->available == 0
  131.                 && p->upstream->read->pending_eof
  132. #if (NGX_SSL)
  133.                 && !p->upstream->ssl
  134. #endif
  135.                 )
  136.             {
  137.                 p->upstream->read->ready = 0;
  138.                 p->upstream->read->eof = 1;
  139.                 p->upstream_eof = 1;
  140.                 p->read = 1;

  141.                 if (p->upstream->read->kq_errno) {
  142.                     p->upstream->read->error = 1;
  143.                     p->upstream_error = 1;
  144.                     p->upstream_eof = 0;

  145.                     ngx_log_error(NGX_LOG_ERR, p->log,
  146.                                   p->upstream->read->kq_errno,
  147.                                   "kevent() reported that upstream "
  148.                                   "closed connection");
  149.                 }

  150.                 break;
  151.             }
  152. #endif

  153.             if (p->limit_rate) {
  154.                 if (p->upstream->read->delayed) {
  155.                     break;
  156.                 }

  157.                 limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
  158.                         - p->read_length;

  159.                 if (limit <= 0) {
  160.                     p->upstream->read->delayed = 1;
  161.                     delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
  162.                     ngx_add_timer(p->upstream->read, delay);
  163.                     break;
  164.                 }

  165.             } else {
  166.                 limit = 0;
  167.             }

  168.             if (p->free_raw_bufs) {

  169.                 /* use the free bufs if they exist */

  170.                 chain = p->free_raw_bufs;
  171.                 if (p->single_buf) {
  172.                     p->free_raw_bufs = p->free_raw_bufs->next;
  173.                     chain->next = NULL;
  174.                 } else {
  175.                     p->free_raw_bufs = NULL;
  176.                 }

  177.             } else if (p->allocated < p->bufs.num) {

  178.                 /* allocate a new buf if it's still allowed */

  179.                 b = ngx_create_temp_buf(p->pool, p->bufs.size);
  180.                 if (b == NULL) {
  181.                     return NGX_ABORT;
  182.                 }

  183.                 p->allocated++;

  184.                 chain = ngx_alloc_chain_link(p->pool);
  185.                 if (chain == NULL) {
  186.                     return NGX_ABORT;
  187.                 }

  188.                 chain->buf = b;
  189.                 chain->next = NULL;

  190.             } else if (!p->cacheable
  191.                        && p->downstream->data == p->output_ctx
  192.                        && p->downstream->write->ready
  193.                        && !p->downstream->write->delayed)
  194.             {
  195.                 /*
  196.                  * if the bufs are not needed to be saved in a cache and
  197.                  * a downstream is ready then write the bufs to a downstream
  198.                  */

  199.                 p->upstream_blocked = 1;

  200.                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  201.                                "pipe downstream ready");

  202.                 break;

  203.             } else if (p->cacheable
  204.                        || p->temp_file->offset < p->max_temp_file_size)
  205.             {

  206.                 /*
  207.                  * if it is allowed, then save some bufs from p->in
  208.                  * to a temporary file, and add them to a p->out chain
  209.                  */

  210.                 rc = ngx_event_pipe_write_chain_to_temp_file(p);

  211.                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  212.                                "pipe temp offset: %O", p->temp_file->offset);

  213.                 if (rc == NGX_BUSY) {
  214.                     break;
  215.                 }

  216.                 if (rc != NGX_OK) {
  217.                     return rc;
  218.                 }

  219.                 chain = p->free_raw_bufs;
  220.                 if (p->single_buf) {
  221.                     p->free_raw_bufs = p->free_raw_bufs->next;
  222.                     chain->next = NULL;
  223.                 } else {
  224.                     p->free_raw_bufs = NULL;
  225.                 }

  226.             } else {

  227.                 /* there are no bufs to read in */

  228.                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  229.                                "no pipe bufs to read in");

  230.                 break;
  231.             }

  232.             n = p->upstream->recv_chain(p->upstream, chain, limit);

  233.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  234.                            "pipe recv chain: %z", n);

  235.             if (p->free_raw_bufs) {
  236.                 chain->next = p->free_raw_bufs;
  237.             }
  238.             p->free_raw_bufs = chain;

  239.             if (n == NGX_ERROR) {
  240.                 p->upstream_error = 1;
  241.                 break;
  242.             }

  243.             if (n == NGX_AGAIN) {
  244.                 if (p->single_buf) {
  245.                     ngx_event_pipe_remove_shadow_links(chain->buf);
  246.                 }

  247.                 break;
  248.             }

  249.             p->read = 1;

  250.             if (n == 0) {
  251.                 p->upstream_eof = 1;
  252.                 break;
  253.             }
  254.         }

  255.         delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;

  256.         p->read_length += n;
  257.         cl = chain;
  258.         p->free_raw_bufs = NULL;

  259.         while (cl && n > 0) {

  260.             ngx_event_pipe_remove_shadow_links(cl->buf);

  261.             size = cl->buf->end - cl->buf->last;

  262.             if (n >= size) {
  263.                 cl->buf->last = cl->buf->end;

  264.                 /* STUB */ cl->buf->num = p->num++;

  265.                 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
  266.                     return NGX_ABORT;
  267.                 }

  268.                 n -= size;
  269.                 ln = cl;
  270.                 cl = cl->next;
  271.                 ngx_free_chain(p->pool, ln);

  272.             } else {
  273.                 cl->buf->last += n;
  274.                 n = 0;
  275.             }
  276.         }

  277.         if (cl) {
  278.             for (ln = cl; ln->next; ln = ln->next) { /* void */ }

  279.             ln->next = p->free_raw_bufs;
  280.             p->free_raw_bufs = cl;
  281.         }

  282.         if (delay > 0) {
  283.             p->upstream->read->delayed = 1;
  284.             ngx_add_timer(p->upstream->read, delay);
  285.             break;
  286.         }
  287.     }

  288. #if (NGX_DEBUG)

  289.     for (cl = p->busy; cl; cl = cl->next) {
  290.         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
  291.                        "pipe buf busy s:%d t:%d f:%d "
  292.                        "%p, pos %p, size: %z "
  293.                        "file: %O, size: %O",
  294.                        (cl->buf->shadow ? 1 : 0),
  295.                        cl->buf->temporary, cl->buf->in_file,
  296.                        cl->buf->start, cl->buf->pos,
  297.                        cl->buf->last - cl->buf->pos,
  298.                        cl->buf->file_pos,
  299.                        cl->buf->file_last - cl->buf->file_pos);
  300.     }

  301.     for (cl = p->out; cl; cl = cl->next) {
  302.         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
  303.                        "pipe buf out  s:%d t:%d f:%d "
  304.                        "%p, pos %p, size: %z "
  305.                        "file: %O, size: %O",
  306.                        (cl->buf->shadow ? 1 : 0),
  307.                        cl->buf->temporary, cl->buf->in_file,
  308.                        cl->buf->start, cl->buf->pos,
  309.                        cl->buf->last - cl->buf->pos,
  310.                        cl->buf->file_pos,
  311.                        cl->buf->file_last - cl->buf->file_pos);
  312.     }

  313.     for (cl = p->in; cl; cl = cl->next) {
  314.         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
  315.                        "pipe buf in   s:%d t:%d f:%d "
  316.                        "%p, pos %p, size: %z "
  317.                        "file: %O, size: %O",
  318.                        (cl->buf->shadow ? 1 : 0),
  319.                        cl->buf->temporary, cl->buf->in_file,
  320.                        cl->buf->start, cl->buf->pos,
  321.                        cl->buf->last - cl->buf->pos,
  322.                        cl->buf->file_pos,
  323.                        cl->buf->file_last - cl->buf->file_pos);
  324.     }

  325.     for (cl = p->free_raw_bufs; cl; cl = cl->next) {
  326.         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
  327.                        "pipe buf free s:%d t:%d f:%d "
  328.                        "%p, pos %p, size: %z "
  329.                        "file: %O, size: %O",
  330.                        (cl->buf->shadow ? 1 : 0),
  331.                        cl->buf->temporary, cl->buf->in_file,
  332.                        cl->buf->start, cl->buf->pos,
  333.                        cl->buf->last - cl->buf->pos,
  334.                        cl->buf->file_pos,
  335.                        cl->buf->file_last - cl->buf->file_pos);
  336.     }

  337.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  338.                    "pipe length: %O", p->length);

  339. #endif

  340.     if (p->free_raw_bufs && p->length != -1) {
  341.         cl = p->free_raw_bufs;

  342.         if (cl->buf->last - cl->buf->pos >= p->length) {

  343.             p->free_raw_bufs = cl->next;

  344.             /* STUB */ cl->buf->num = p->num++;

  345.             if (p->input_filter(p, cl->buf) == NGX_ERROR) {
  346.                 return NGX_ABORT;
  347.             }

  348.             ngx_free_chain(p->pool, cl);
  349.         }
  350.     }

  351.     if (p->length == 0) {
  352.         p->upstream_done = 1;
  353.         p->read = 1;
  354.     }

  355.     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

  356.         /* STUB */ p->free_raw_bufs->buf->num = p->num++;

  357.         if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
  358.             return NGX_ABORT;
  359.         }

  360.         p->free_raw_bufs = p->free_raw_bufs->next;

  361.         if (p->free_bufs && p->buf_to_file == NULL) {
  362.             for (cl = p->free_raw_bufs; cl; cl = cl->next) {
  363.                 if (cl->buf->shadow == NULL) {
  364.                     ngx_pfree(p->pool, cl->buf->start);
  365.                 }
  366.             }
  367.         }
  368.     }

  369.     if (p->cacheable && (p->in || p->buf_to_file)) {

  370.         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  371.                        "pipe write chain");

  372.         rc = ngx_event_pipe_write_chain_to_temp_file(p);

  373.         if (rc != NGX_OK) {
  374.             return rc;
  375.         }
  376.     }

  377.     return NGX_OK;
  378. }


  379. static ngx_int_t
  380. ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
  381. {
  382.     u_char            *prev;
  383.     size_t             bsize;
  384.     ngx_int_t          rc;
  385.     ngx_uint_t         flush, flushed, prev_last_shadow;
  386.     ngx_chain_t       *out, **ll, *cl;
  387.     ngx_connection_t  *downstream;

  388.     downstream = p->downstream;

  389.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  390.                    "pipe write downstream: %d", downstream->write->ready);

  391. #if (NGX_THREADS)

  392.     if (p->writing) {
  393.         rc = ngx_event_pipe_write_chain_to_temp_file(p);

  394.         if (rc == NGX_ABORT) {
  395.             return NGX_ABORT;
  396.         }
  397.     }

  398. #endif

  399.     flushed = 0;

  400.     for ( ;; ) {
  401.         if (p->downstream_error) {
  402.             return ngx_event_pipe_drain_chains(p);
  403.         }

  404.         if (p->upstream_eof || p->upstream_error || p->upstream_done) {

  405.             /* pass the p->out and p->in chains to the output filter */

  406.             for (cl = p->busy; cl; cl = cl->next) {
  407.                 cl->buf->recycled = 0;
  408.             }

  409.             if (p->out) {
  410.                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  411.                                "pipe write downstream flush out");

  412.                 for (cl = p->out; cl; cl = cl->next) {
  413.                     cl->buf->recycled = 0;
  414.                 }

  415.                 rc = p->output_filter(p->output_ctx, p->out);

  416.                 if (rc == NGX_ERROR) {
  417.                     p->downstream_error = 1;
  418.                     return ngx_event_pipe_drain_chains(p);
  419.                 }

  420.                 p->out = NULL;
  421.             }

  422.             if (p->writing) {
  423.                 break;
  424.             }

  425.             if (p->in) {
  426.                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  427.                                "pipe write downstream flush in");

  428.                 for (cl = p->in; cl; cl = cl->next) {
  429.                     cl->buf->recycled = 0;
  430.                 }

  431.                 rc = p->output_filter(p->output_ctx, p->in);

  432.                 if (rc == NGX_ERROR) {
  433.                     p->downstream_error = 1;
  434.                     return ngx_event_pipe_drain_chains(p);
  435.                 }

  436.                 p->in = NULL;
  437.             }

  438.             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  439.                            "pipe write downstream done");

  440.             /* TODO: free unused bufs */

  441.             p->downstream_done = 1;
  442.             break;
  443.         }

  444.         if (downstream->data != p->output_ctx
  445.             || !downstream->write->ready
  446.             || downstream->write->delayed)
  447.         {
  448.             break;
  449.         }

  450.         /* bsize is the size of the busy recycled bufs */

  451.         prev = NULL;
  452.         bsize = 0;

  453.         for (cl = p->busy; cl; cl = cl->next) {

  454.             if (cl->buf->recycled) {
  455.                 if (prev == cl->buf->start) {
  456.                     continue;
  457.                 }

  458.                 bsize += cl->buf->end - cl->buf->start;
  459.                 prev = cl->buf->start;
  460.             }
  461.         }

  462.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  463.                        "pipe write busy: %uz", bsize);

  464.         out = NULL;

  465.         if (bsize >= (size_t) p->busy_size) {
  466.             flush = 1;
  467.             goto flush;
  468.         }

  469.         flush = 0;
  470.         ll = NULL;
  471.         prev_last_shadow = 1;

  472.         for ( ;; ) {
  473.             if (p->out) {
  474.                 cl = p->out;

  475.                 if (cl->buf->recycled) {
  476.                     ngx_log_error(NGX_LOG_ALERT, p->log, 0,
  477.                                   "recycled buffer in pipe out chain");
  478.                 }

  479.                 p->out = p->out->next;

  480.             } else if (!p->cacheable && !p->writing && p->in) {
  481.                 cl = p->in;

  482.                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
  483.                                "pipe write buf ls:%d %p %z",
  484.                                cl->buf->last_shadow,
  485.                                cl->buf->pos,
  486.                                cl->buf->last - cl->buf->pos);

  487.                 if (cl->buf->recycled && prev_last_shadow) {
  488.                     if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
  489.                         flush = 1;
  490.                         break;
  491.                     }

  492.                     bsize += cl->buf->end - cl->buf->start;
  493.                 }

  494.                 prev_last_shadow = cl->buf->last_shadow;

  495.                 p->in = p->in->next;

  496.             } else {
  497.                 break;
  498.             }

  499.             cl->next = NULL;

  500.             if (out) {
  501.                 *ll = cl;
  502.             } else {
  503.                 out = cl;
  504.             }
  505.             ll = &cl->next;
  506.         }

  507.     flush:

  508.         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
  509.                        "pipe write: out:%p, f:%ui", out, flush);

  510.         if (out == NULL) {

  511.             if (!flush) {
  512.                 break;
  513.             }

  514.             /* a workaround for AIO */
  515.             if (flushed++ > 10) {
  516.                 return NGX_BUSY;
  517.             }
  518.         }

  519.         rc = p->output_filter(p->output_ctx, out);

  520.         ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

  521.         if (rc == NGX_ERROR) {
  522.             p->downstream_error = 1;
  523.             return ngx_event_pipe_drain_chains(p);
  524.         }

  525.         for (cl = p->free; cl; cl = cl->next) {

  526.             if (cl->buf->temp_file) {
  527.                 if (p->cacheable || !p->cyclic_temp_file) {
  528.                     continue;
  529.                 }

  530.                 /* reset p->temp_offset if all bufs had been sent */

  531.                 if (cl->buf->file_last == p->temp_file->offset) {
  532.                     p->temp_file->offset = 0;
  533.                 }
  534.             }

  535.             /* TODO: free buf if p->free_bufs && upstream done */

  536.             /* add the free shadow raw buf to p->free_raw_bufs */

  537.             if (cl->buf->last_shadow) {
  538.                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
  539.                     return NGX_ABORT;
  540.                 }

  541.                 cl->buf->last_shadow = 0;
  542.             }

  543.             cl->buf->shadow = NULL;
  544.         }
  545.     }

  546.     return NGX_OK;
  547. }


  548. static ngx_int_t
  549. ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
  550. {
  551.     ssize_t       size, bsize, n;
  552.     ngx_buf_t    *b;
  553.     ngx_uint_t    prev_last_shadow;
  554.     ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;

  555. #if (NGX_THREADS)

  556.     if (p->writing) {

  557.         if (p->aio) {
  558.             return NGX_AGAIN;
  559.         }

  560.         out = p->writing;
  561.         p->writing = NULL;

  562.         n = ngx_write_chain_to_temp_file(p->temp_file, NULL);

  563.         if (n == NGX_ERROR) {
  564.             return NGX_ABORT;
  565.         }

  566.         goto done;
  567.     }

  568. #endif

  569.     if (p->buf_to_file) {
  570.         out = ngx_alloc_chain_link(p->pool);
  571.         if (out == NULL) {
  572.             return NGX_ABORT;
  573.         }

  574.         out->buf = p->buf_to_file;
  575.         out->next = p->in;

  576.     } else {
  577.         out = p->in;
  578.     }

  579.     if (!p->cacheable) {

  580.         size = 0;
  581.         cl = out;
  582.         ll = NULL;
  583.         prev_last_shadow = 1;

  584.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
  585.                        "pipe offset: %O", p->temp_file->offset);

  586.         do {
  587.             bsize = cl->buf->last - cl->buf->pos;

  588.             ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
  589.                            "pipe buf ls:%d %p, pos %p, size: %z",
  590.                            cl->buf->last_shadow, cl->buf->start,
  591.                            cl->buf->pos, bsize);

  592.             if (prev_last_shadow
  593.                 && ((size + bsize > p->temp_file_write_size)
  594.                     || (p->temp_file->offset + size + bsize
  595.                         > p->max_temp_file_size)))
  596.             {
  597.                 break;
  598.             }

  599.             prev_last_shadow = cl->buf->last_shadow;

  600.             size += bsize;
  601.             ll = &cl->next;
  602.             cl = cl->next;

  603.         } while (cl);

  604.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

  605.         if (ll == NULL) {
  606.             return NGX_BUSY;
  607.         }

  608.         if (cl) {
  609.             p->in = cl;
  610.             *ll = NULL;

  611.         } else {
  612.             p->in = NULL;
  613.             p->last_in = &p->in;
  614.         }

  615.     } else {
  616.         p->in = NULL;
  617.         p->last_in = &p->in;
  618.     }

  619. #if (NGX_THREADS)
  620.     if (p->thread_handler) {
  621.         p->temp_file->thread_write = 1;
  622.         p->temp_file->file.thread_task = p->thread_task;
  623.         p->temp_file->file.thread_handler = p->thread_handler;
  624.         p->temp_file->file.thread_ctx = p->thread_ctx;
  625.     }
  626. #endif

  627.     n = ngx_write_chain_to_temp_file(p->temp_file, out);

  628.     if (n == NGX_ERROR) {
  629.         return NGX_ABORT;
  630.     }

  631. #if (NGX_THREADS)

  632.     if (n == NGX_AGAIN) {
  633.         p->writing = out;
  634.         p->thread_task = p->temp_file->file.thread_task;
  635.         return NGX_AGAIN;
  636.     }

  637. done:

  638. #endif

  639.     if (p->buf_to_file) {
  640.         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
  641.         n -= p->buf_to_file->last - p->buf_to_file->pos;
  642.         p->buf_to_file = NULL;
  643.         out = out->next;
  644.     }

  645.     if (n > 0) {
  646.         /* update previous buffer or add new buffer */

  647.         if (p->out) {
  648.             for (cl = p->out; cl->next; cl = cl->next) { /* void */ }

  649.             b = cl->buf;

  650.             if (b->file_last == p->temp_file->offset) {
  651.                 p->temp_file->offset += n;
  652.                 b->file_last = p->temp_file->offset;
  653.                 goto free;
  654.             }

  655.             last_out = &cl->next;

  656.         } else {
  657.             last_out = &p->out;
  658.         }

  659.         cl = ngx_chain_get_free_buf(p->pool, &p->free);
  660.         if (cl == NULL) {
  661.             return NGX_ABORT;
  662.         }

  663.         b = cl->buf;

  664.         ngx_memzero(b, sizeof(ngx_buf_t));

  665.         b->tag = p->tag;

  666.         b->file = &p->temp_file->file;
  667.         b->file_pos = p->temp_file->offset;
  668.         p->temp_file->offset += n;
  669.         b->file_last = p->temp_file->offset;

  670.         b->in_file = 1;
  671.         b->temp_file = 1;

  672.         *last_out = cl;
  673.     }

  674. free:

  675.     for (last_free = &p->free_raw_bufs;
  676.          *last_free != NULL;
  677.          last_free = &(*last_free)->next)
  678.     {
  679.         /* void */
  680.     }

  681.     for (cl = out; cl; cl = next) {
  682.         next = cl->next;

  683.         cl->next = p->free;
  684.         p->free = cl;

  685.         b = cl->buf;

  686.         if (b->last_shadow) {

  687.             tl = ngx_alloc_chain_link(p->pool);
  688.             if (tl == NULL) {
  689.                 return NGX_ABORT;
  690.             }

  691.             tl->buf = b->shadow;
  692.             tl->next = NULL;

  693.             *last_free = tl;
  694.             last_free = &tl->next;

  695.             b->shadow->pos = b->shadow->start;
  696.             b->shadow->last = b->shadow->start;

  697.             ngx_event_pipe_remove_shadow_links(b->shadow);
  698.         }
  699.     }

  700.     return NGX_OK;
  701. }


  702. /* the copy input filter */

  703. ngx_int_t
  704. ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
  705. {
  706.     ngx_buf_t    *b;
  707.     ngx_chain_t  *cl;

  708.     if (buf->pos == buf->last) {
  709.         return NGX_OK;
  710.     }

  711.     if (p->upstream_done) {
  712.         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
  713.                        "input data after close");
  714.         return NGX_OK;
  715.     }

  716.     if (p->length == 0) {
  717.         p->upstream_done = 1;

  718.         ngx_log_error(NGX_LOG_WARN, p->log, 0,
  719.                       "upstream sent more data than specified in "
  720.                       "\"Content-Length\" header");

  721.         return NGX_OK;
  722.     }

  723.     cl = ngx_chain_get_free_buf(p->pool, &p->free);
  724.     if (cl == NULL) {
  725.         return NGX_ERROR;
  726.     }

  727.     b = cl->buf;

  728.     ngx_memcpy(b, buf, sizeof(ngx_buf_t));
  729.     b->shadow = buf;
  730.     b->tag = p->tag;
  731.     b->last_shadow = 1;
  732.     b->recycled = 1;
  733.     buf->shadow = b;

  734.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);

  735.     if (p->in) {
  736.         *p->last_in = cl;
  737.     } else {
  738.         p->in = cl;
  739.     }
  740.     p->last_in = &cl->next;

  741.     if (p->length == -1) {
  742.         return NGX_OK;
  743.     }

  744.     if (b->last - b->pos > p->length) {

  745.         ngx_log_error(NGX_LOG_WARN, p->log, 0,
  746.                       "upstream sent more data than specified in "
  747.                       "\"Content-Length\" header");

  748.         b->last = b->pos + p->length;
  749.         p->upstream_done = 1;

  750.         return NGX_OK;
  751.     }

  752.     p->length -= b->last - b->pos;

  753.     return NGX_OK;
  754. }


  755. static ngx_inline void
  756. ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
  757. {
  758.     ngx_buf_t  *b, *next;

  759.     b = buf->shadow;

  760.     if (b == NULL) {
  761.         return;
  762.     }

  763.     while (!b->last_shadow) {
  764.         next = b->shadow;

  765.         b->temporary = 0;
  766.         b->recycled = 0;

  767.         b->shadow = NULL;
  768.         b = next;
  769.     }

  770.     b->temporary = 0;
  771.     b->recycled = 0;
  772.     b->last_shadow = 0;

  773.     b->shadow = NULL;

  774.     buf->shadow = NULL;
  775. }


  776. ngx_int_t
  777. ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
  778. {
  779.     ngx_chain_t  *cl;

  780.     cl = ngx_alloc_chain_link(p->pool);
  781.     if (cl == NULL) {
  782.         return NGX_ERROR;
  783.     }

  784.     if (p->buf_to_file && b->start == p->buf_to_file->start) {
  785.         b->pos = p->buf_to_file->last;
  786.         b->last = p->buf_to_file->last;

  787.     } else {
  788.         b->pos = b->start;
  789.         b->last = b->start;
  790.     }

  791.     b->shadow = NULL;

  792.     cl->buf = b;

  793.     if (p->free_raw_bufs == NULL) {
  794.         p->free_raw_bufs = cl;
  795.         cl->next = NULL;

  796.         return NGX_OK;
  797.     }

  798.     if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {

  799.         /* add the free buf to the list start */

  800.         cl->next = p->free_raw_bufs;
  801.         p->free_raw_bufs = cl;

  802.         return NGX_OK;
  803.     }

  804.     /* the first free buf is partially filled, thus add the free buf after it */

  805.     cl->next = p->free_raw_bufs->next;
  806.     p->free_raw_bufs->next = cl;

  807.     return NGX_OK;
  808. }


  809. static ngx_int_t
  810. ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
  811. {
  812.     ngx_chain_t  *cl, *tl;

  813.     for ( ;; ) {
  814.         if (p->busy) {
  815.             cl = p->busy;
  816.             p->busy = NULL;

  817.         } else if (p->out) {
  818.             cl = p->out;
  819.             p->out = NULL;

  820.         } else if (p->in) {
  821.             cl = p->in;
  822.             p->in = NULL;

  823.         } else {
  824.             return NGX_OK;
  825.         }

  826.         while (cl) {
  827.             if (cl->buf->last_shadow) {
  828.                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
  829.                     return NGX_ABORT;
  830.                 }

  831.                 cl->buf->last_shadow = 0;
  832.             }

  833.             cl->buf->shadow = NULL;
  834.             tl = cl->next;
  835.             cl->next = p->free;
  836.             p->free = cl;
  837.             cl = tl;
  838.         }
  839.     }
  840. }