src/event/quic/ngx_event_quic_streams.c - nginx source code

Functions defined

Macros defined

Source code


  1. /*
  2. * Copyright (C) Nginx, Inc.
  3. */


  4. #include <ngx_config.h>
  5. #include <ngx_core.h>
  6. #include <ngx_event.h>
  7. #include <ngx_event_quic_connection.h>


  8. #define NGX_QUIC_STREAM_GONE     (void *) -1


  9. static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
  10.     ngx_uint_t err);
  11. static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
  12. static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
  13. static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
  14. static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id);
  15. static void ngx_quic_init_stream_handler(ngx_event_t *ev);
  16. static void ngx_quic_init_streams_handler(ngx_connection_t *c);
  17. static ngx_int_t ngx_quic_do_init_streams(ngx_connection_t *c);
  18. static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
  19.     uint64_t id);
  20. static void ngx_quic_empty_handler(ngx_event_t *ev);
  21. static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
  22.     size_t size);
  23. static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
  24.     size_t size);
  25. static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
  26.     ngx_chain_t *in, off_t limit);
  27. static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
  28. static void ngx_quic_stream_cleanup_handler(void *data);
  29. static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
  30. static ngx_int_t ngx_quic_can_shutdown(ngx_connection_t *c);
  31. static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
  32. static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
  33. static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
  34. static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
  35. static void ngx_quic_set_event(ngx_event_t *ev);


  36. ngx_connection_t *
  37. ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
  38. {
  39.     uint64_t                id;
  40.     ngx_connection_t       *pc, *sc;
  41.     ngx_quic_stream_t      *qs;
  42.     ngx_quic_connection_t  *qc;

  43.     pc = c->quic ? c->quic->parent : c;
  44.     qc = ngx_quic_get_connection(pc);

  45.     if (qc->closing) {
  46.         return NULL;
  47.     }

  48.     if (bidi) {
  49.         if (qc->streams.server_streams_bidi
  50.             >= qc->streams.server_max_streams_bidi)
  51.         {
  52.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  53.                            "quic too many server bidi streams:%uL",
  54.                            qc->streams.server_streams_bidi);
  55.             return NULL;
  56.         }

  57.         id = (qc->streams.server_streams_bidi << 2)
  58.              | NGX_QUIC_STREAM_SERVER_INITIATED;

  59.         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
  60.                        "quic creating server bidi stream"
  61.                        " streams:%uL max:%uL id:0x%xL",
  62.                        qc->streams.server_streams_bidi,
  63.                        qc->streams.server_max_streams_bidi, id);

  64.         qc->streams.server_streams_bidi++;

  65.     } else {
  66.         if (qc->streams.server_streams_uni
  67.             >= qc->streams.server_max_streams_uni)
  68.         {
  69.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  70.                            "quic too many server uni streams:%uL",
  71.                            qc->streams.server_streams_uni);
  72.             return NULL;
  73.         }

  74.         id = (qc->streams.server_streams_uni << 2)
  75.              | NGX_QUIC_STREAM_SERVER_INITIATED
  76.              | NGX_QUIC_STREAM_UNIDIRECTIONAL;

  77.         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
  78.                        "quic creating server uni stream"
  79.                        " streams:%uL max:%uL id:0x%xL",
  80.                        qc->streams.server_streams_uni,
  81.                        qc->streams.server_max_streams_uni, id);

  82.         qc->streams.server_streams_uni++;
  83.     }

  84.     qs = ngx_quic_create_stream(pc, id);
  85.     if (qs == NULL) {
  86.         return NULL;
  87.     }

  88.     sc = qs->connection;

  89.     sc->write->active = 1;
  90.     sc->write->ready = 1;

  91.     if (bidi) {
  92.         sc->read->active = 1;
  93.     }

  94.     return sc;
  95. }


  96. void
  97. ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
  98.     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
  99. {
  100.     ngx_rbtree_node_t  **p;
  101.     ngx_quic_stream_t   *qn, *qnt;

  102.     for ( ;; ) {
  103.         qn = (ngx_quic_stream_t *) node;
  104.         qnt = (ngx_quic_stream_t *) temp;

  105.         p = (qn->id < qnt->id) ? &temp->left : &temp->right;

  106.         if (*p == sentinel) {
  107.             break;
  108.         }

  109.         temp = *p;
  110.     }

  111.     *p = node;
  112.     node->parent = temp;
  113.     node->left = sentinel;
  114.     node->right = sentinel;
  115.     ngx_rbt_red(node);
  116. }


  117. ngx_quic_stream_t *
  118. ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
  119. {
  120.     ngx_rbtree_node_t  *node, *sentinel;
  121.     ngx_quic_stream_t  *qn;

  122.     node = rbtree->root;
  123.     sentinel = rbtree->sentinel;

  124.     while (node != sentinel) {
  125.         qn = (ngx_quic_stream_t *) node;

  126.         if (id == qn->id) {
  127.             return qn;
  128.         }

  129.         node = (id < qn->id) ? node->left : node->right;
  130.     }

  131.     return NULL;
  132. }


  133. ngx_int_t
  134. ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
  135. {
  136.     ngx_pool_t         *pool;
  137.     ngx_queue_t        *q;
  138.     ngx_rbtree_t       *tree;
  139.     ngx_connection_t   *sc;
  140.     ngx_rbtree_node_t  *node;
  141.     ngx_quic_stream_t  *qs;

  142.     while (!ngx_queue_empty(&qc->streams.uninitialized)) {
  143.         q = ngx_queue_head(&qc->streams.uninitialized);
  144.         ngx_queue_remove(q);

  145.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  146.         pool = qs->connection->pool;

  147.         ngx_close_connection(qs->connection);
  148.         ngx_destroy_pool(pool);
  149.     }

  150.     tree = &qc->streams.tree;

  151.     if (tree->root == tree->sentinel) {
  152.         return NGX_OK;
  153.     }

  154.     node = ngx_rbtree_min(tree->root, tree->sentinel);

  155.     while (node) {
  156.         qs = (ngx_quic_stream_t *) node;
  157.         node = ngx_rbtree_next(tree, node);
  158.         sc = qs->connection;

  159.         qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
  160.         qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;

  161.         if (sc == NULL) {
  162.             ngx_quic_close_stream(qs);
  163.             continue;
  164.         }

  165.         sc->read->error = 1;
  166.         sc->write->error = 1;

  167.         ngx_quic_set_event(sc->read);
  168.         ngx_quic_set_event(sc->write);

  169.         sc->close = 1;
  170.         sc->read->handler(sc->read);
  171.     }

  172.     if (tree->root == tree->sentinel) {
  173.         return NGX_OK;
  174.     }

  175.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
  176.                    "quic connection has active streams");

  177.     return NGX_AGAIN;
  178. }


  179. ngx_int_t
  180. ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
  181. {
  182.     return ngx_quic_do_reset_stream(c->quic, err);
  183. }


  184. static ngx_int_t
  185. ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
  186. {
  187.     ngx_connection_t       *pc;
  188.     ngx_quic_frame_t       *frame;
  189.     ngx_quic_connection_t  *qc;

  190.     if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
  191.         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
  192.         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
  193.     {
  194.         return NGX_OK;
  195.     }

  196.     qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
  197.     qs->send_final_size = qs->send_offset;

  198.     if (qs->connection) {
  199.         qs->connection->write->error = 1;
  200.     }

  201.     pc = qs->parent;
  202.     qc = ngx_quic_get_connection(pc);

  203.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  204.                    "quic stream id:0x%xL reset", qs->id);

  205.     frame = ngx_quic_alloc_frame(pc);
  206.     if (frame == NULL) {
  207.         return NGX_ERROR;
  208.     }

  209.     frame->level = ssl_encryption_application;
  210.     frame->type = NGX_QUIC_FT_RESET_STREAM;
  211.     frame->u.reset_stream.id = qs->id;
  212.     frame->u.reset_stream.error_code = err;
  213.     frame->u.reset_stream.final_size = qs->send_offset;

  214.     ngx_quic_queue_frame(qc, frame);

  215.     ngx_quic_free_buffer(pc, &qs->send);

  216.     return NGX_OK;
  217. }


  218. ngx_int_t
  219. ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
  220. {
  221.     if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) {
  222.         if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
  223.             return NGX_ERROR;
  224.         }
  225.     }

  226.     if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) {
  227.         if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
  228.             return NGX_ERROR;
  229.         }
  230.     }

  231.     return NGX_OK;
  232. }


  233. static ngx_int_t
  234. ngx_quic_shutdown_stream_send(ngx_connection_t *c)
  235. {
  236.     ngx_quic_stream_t  *qs;

  237.     qs = c->quic;

  238.     if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
  239.         && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
  240.     {
  241.         return NGX_OK;
  242.     }

  243.     qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
  244.     qs->send_final_size = c->sent;

  245.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
  246.                    "quic stream id:0x%xL send shutdown", qs->id);

  247.     return ngx_quic_stream_flush(qs);
  248. }


  249. static ngx_int_t
  250. ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
  251. {
  252.     ngx_connection_t       *pc;
  253.     ngx_quic_frame_t       *frame;
  254.     ngx_quic_stream_t      *qs;
  255.     ngx_quic_connection_t  *qc;

  256.     qs = c->quic;

  257.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
  258.         && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
  259.     {
  260.         return NGX_OK;
  261.     }

  262.     pc = qs->parent;
  263.     qc = ngx_quic_get_connection(pc);

  264.     if (qc->conf->stream_close_code == 0) {
  265.         return NGX_OK;
  266.     }

  267.     frame = ngx_quic_alloc_frame(pc);
  268.     if (frame == NULL) {
  269.         return NGX_ERROR;
  270.     }

  271.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  272.                    "quic stream id:0x%xL recv shutdown", qs->id);

  273.     frame->level = ssl_encryption_application;
  274.     frame->type = NGX_QUIC_FT_STOP_SENDING;
  275.     frame->u.stop_sending.id = qs->id;
  276.     frame->u.stop_sending.error_code = qc->conf->stream_close_code;

  277.     ngx_quic_queue_frame(qc, frame);

  278.     return NGX_OK;
  279. }


  280. static ngx_quic_stream_t *
  281. ngx_quic_get_stream(ngx_connection_t *c, uint64_t id)
  282. {
  283.     uint64_t                min_id;
  284.     ngx_event_t            *rev;
  285.     ngx_quic_stream_t      *qs;
  286.     ngx_quic_connection_t  *qc;

  287.     qc = ngx_quic_get_connection(c);

  288.     qs = ngx_quic_find_stream(&qc->streams.tree, id);

  289.     if (qs) {
  290.         return qs;
  291.     }

  292.     if (qc->shutdown || qc->closing) {
  293.         return NGX_QUIC_STREAM_GONE;
  294.     }

  295.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  296.                    "quic stream id:0x%xL is missing", id);

  297.     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {

  298.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  299.             if ((id >> 2) < qc->streams.server_streams_uni) {
  300.                 return NGX_QUIC_STREAM_GONE;
  301.             }

  302.             qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  303.             return NULL;
  304.         }

  305.         if ((id >> 2) < qc->streams.client_streams_uni) {
  306.             return NGX_QUIC_STREAM_GONE;
  307.         }

  308.         if ((id >> 2) >= qc->streams.client_max_streams_uni) {
  309.             qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
  310.             return NULL;
  311.         }

  312.         min_id = (qc->streams.client_streams_uni << 2)
  313.                  | NGX_QUIC_STREAM_UNIDIRECTIONAL;
  314.         qc->streams.client_streams_uni = (id >> 2) + 1;

  315.     } else {

  316.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  317.             if ((id >> 2) < qc->streams.server_streams_bidi) {
  318.                 return NGX_QUIC_STREAM_GONE;
  319.             }

  320.             qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  321.             return NULL;
  322.         }

  323.         if ((id >> 2) < qc->streams.client_streams_bidi) {
  324.             return NGX_QUIC_STREAM_GONE;
  325.         }

  326.         if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
  327.             qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
  328.             return NULL;
  329.         }

  330.         min_id = (qc->streams.client_streams_bidi << 2);
  331.         qc->streams.client_streams_bidi = (id >> 2) + 1;
  332.     }

  333.     /*
  334.      * RFC 9000, 2.1.  Stream Types and Identifiers
  335.      *
  336.      * successive streams of each type are created with numerically increasing
  337.      * stream IDs.  A stream ID that is used out of order results in all
  338.      * streams of that type with lower-numbered stream IDs also being opened.
  339.      */

  340. #if (NGX_SUPPRESS_WARN)
  341.     qs = NULL;
  342. #endif

  343.     for ( /* void */ ; min_id <= id; min_id += 0x04) {

  344.         qs = ngx_quic_create_stream(c, min_id);

  345.         if (qs == NULL) {
  346.             if (ngx_quic_reject_stream(c, min_id) != NGX_OK) {
  347.                 return NULL;
  348.             }

  349.             continue;
  350.         }

  351.         ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);

  352.         rev = qs->connection->read;
  353.         rev->handler = ngx_quic_init_stream_handler;

  354.         if (qc->streams.initialized) {
  355.             ngx_post_event(rev, &ngx_posted_events);

  356.             if (qc->push.posted) {
  357.                 /*
  358.                  * The posted stream can produce output immediately.
  359.                  * By postponing the push event, we coalesce the stream
  360.                  * output with queued frames in one UDP datagram.
  361.                  */

  362.                 ngx_delete_posted_event(&qc->push);
  363.                 ngx_post_event(&qc->push, &ngx_posted_events);
  364.             }
  365.         }
  366.     }

  367.     if (qs == NULL) {
  368.         return NGX_QUIC_STREAM_GONE;
  369.     }

  370.     return qs;
  371. }


  372. static ngx_int_t
  373. ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id)
  374. {
  375.     uint64_t                code;
  376.     ngx_quic_frame_t       *frame;
  377.     ngx_quic_connection_t  *qc;

  378.     qc = ngx_quic_get_connection(c);

  379.     code = (id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  380.            ? qc->conf->stream_reject_code_uni
  381.            : qc->conf->stream_reject_code_bidi;

  382.     if (code == 0) {
  383.         return NGX_DECLINED;
  384.     }

  385.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  386.                    "quic stream id:0x%xL reject err:0x%xL", id, code);

  387.     frame = ngx_quic_alloc_frame(c);
  388.     if (frame == NULL) {
  389.         return NGX_ERROR;
  390.     }

  391.     frame->level = ssl_encryption_application;
  392.     frame->type = NGX_QUIC_FT_RESET_STREAM;
  393.     frame->u.reset_stream.id = id;
  394.     frame->u.reset_stream.error_code = code;
  395.     frame->u.reset_stream.final_size = 0;

  396.     ngx_quic_queue_frame(qc, frame);

  397.     frame = ngx_quic_alloc_frame(c);
  398.     if (frame == NULL) {
  399.         return NGX_ERROR;
  400.     }

  401.     frame->level = ssl_encryption_application;
  402.     frame->type = NGX_QUIC_FT_STOP_SENDING;
  403.     frame->u.stop_sending.id = id;
  404.     frame->u.stop_sending.error_code = code;

  405.     ngx_quic_queue_frame(qc, frame);

  406.     return NGX_OK;
  407. }


  408. static void
  409. ngx_quic_init_stream_handler(ngx_event_t *ev)
  410. {
  411.     ngx_connection_t   *c;
  412.     ngx_quic_stream_t  *qs;

  413.     c = ev->data;
  414.     qs = c->quic;

  415.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream");

  416.     if ((qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
  417.         c->write->active = 1;
  418.         c->write->ready = 1;
  419.     }

  420.     c->read->active = 1;

  421.     ngx_queue_remove(&qs->queue);

  422.     c->listening->handler(c);
  423. }


  424. ngx_int_t
  425. ngx_quic_init_streams(ngx_connection_t *c)
  426. {
  427.     ngx_int_t               rc;
  428.     ngx_quic_connection_t  *qc;

  429.     qc = ngx_quic_get_connection(c);

  430.     if (qc->streams.initialized) {
  431.         return NGX_OK;
  432.     }

  433.     rc = ngx_ssl_ocsp_validate(c);

  434.     if (rc == NGX_ERROR) {
  435.         return NGX_ERROR;
  436.     }

  437.     if (rc == NGX_AGAIN) {
  438.         c->ssl->handler = ngx_quic_init_streams_handler;
  439.         return NGX_OK;
  440.     }

  441.     return ngx_quic_do_init_streams(c);
  442. }


  443. static void
  444. ngx_quic_init_streams_handler(ngx_connection_t *c)
  445. {
  446.     if (ngx_quic_do_init_streams(c) != NGX_OK) {
  447.         ngx_quic_close_connection(c, NGX_ERROR);
  448.     }
  449. }


  450. static ngx_int_t
  451. ngx_quic_do_init_streams(ngx_connection_t *c)
  452. {
  453.     ngx_queue_t            *q;
  454.     ngx_quic_stream_t      *qs;
  455.     ngx_quic_connection_t  *qc;

  456.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init streams");

  457.     qc = ngx_quic_get_connection(c);

  458.     if (qc->conf->init) {
  459.         if (qc->conf->init(c) != NGX_OK) {
  460.             return NGX_ERROR;
  461.         }
  462.     }

  463.     for (q = ngx_queue_head(&qc->streams.uninitialized);
  464.          q != ngx_queue_sentinel(&qc->streams.uninitialized);
  465.          q = ngx_queue_next(q))
  466.     {
  467.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  468.         ngx_post_event(qs->connection->read, &ngx_posted_events);
  469.     }

  470.     qc->streams.initialized = 1;

  471.     if (!qc->closing && qc->close.timer_set) {
  472.         ngx_del_timer(&qc->close);
  473.     }

  474.     return NGX_OK;
  475. }


  476. static ngx_quic_stream_t *
  477. ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
  478. {
  479.     ngx_str_t               addr_text;
  480.     ngx_log_t              *log;
  481.     ngx_pool_t             *pool;
  482.     ngx_uint_t              reusable;
  483.     ngx_queue_t            *q;
  484.     struct sockaddr        *sockaddr;
  485.     ngx_connection_t       *sc;
  486.     ngx_quic_stream_t      *qs;
  487.     ngx_pool_cleanup_t     *cln;
  488.     ngx_quic_connection_t  *qc;

  489.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  490.                    "quic stream id:0x%xL create", id);

  491.     qc = ngx_quic_get_connection(c);

  492.     if (!ngx_queue_empty(&qc->streams.free)) {
  493.         q = ngx_queue_head(&qc->streams.free);
  494.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  495.         ngx_queue_remove(&qs->queue);

  496.     } else {
  497.         /*
  498.          * the number of streams is limited by transport
  499.          * parameters and application requirements
  500.          */

  501.         qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
  502.         if (qs == NULL) {
  503.             return NULL;
  504.         }
  505.     }

  506.     ngx_memzero(qs, sizeof(ngx_quic_stream_t));

  507.     qs->node.key = id;
  508.     qs->parent = c;
  509.     qs->id = id;
  510.     qs->send_final_size = (uint64_t) -1;
  511.     qs->recv_final_size = (uint64_t) -1;

  512.     pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
  513.     if (pool == NULL) {
  514.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  515.         return NULL;
  516.     }

  517.     log = ngx_palloc(pool, sizeof(ngx_log_t));
  518.     if (log == NULL) {
  519.         ngx_destroy_pool(pool);
  520.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  521.         return NULL;
  522.     }

  523.     *log = *c->log;
  524.     pool->log = log;

  525.     sockaddr = ngx_palloc(pool, c->socklen);
  526.     if (sockaddr == NULL) {
  527.         ngx_destroy_pool(pool);
  528.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  529.         return NULL;
  530.     }

  531.     ngx_memcpy(sockaddr, c->sockaddr, c->socklen);

  532.     if (c->addr_text.data) {
  533.         addr_text.data = ngx_pnalloc(pool, c->addr_text.len);
  534.         if (addr_text.data == NULL) {
  535.             ngx_destroy_pool(pool);
  536.             ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  537.             return NULL;
  538.         }

  539.         ngx_memcpy(addr_text.data, c->addr_text.data, c->addr_text.len);
  540.         addr_text.len = c->addr_text.len;

  541.     } else {
  542.         addr_text.len = 0;
  543.         addr_text.data = NULL;
  544.     }

  545.     reusable = c->reusable;
  546.     ngx_reusable_connection(c, 0);

  547.     sc = ngx_get_connection(c->fd, log);
  548.     if (sc == NULL) {
  549.         ngx_destroy_pool(pool);
  550.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  551.         ngx_reusable_connection(c, reusable);
  552.         return NULL;
  553.     }

  554.     qs->connection = sc;

  555.     sc->quic = qs;
  556.     sc->shared = 1;
  557.     sc->type = SOCK_STREAM;
  558.     sc->pool = pool;
  559.     sc->ssl = c->ssl;
  560.     sc->sockaddr = sockaddr;
  561.     sc->socklen = c->socklen;
  562.     sc->listening = c->listening;
  563.     sc->addr_text = addr_text;
  564.     sc->local_sockaddr = c->local_sockaddr;
  565.     sc->local_socklen = c->local_socklen;
  566.     sc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
  567.     sc->start_time = c->start_time;
  568.     sc->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

  569.     sc->recv = ngx_quic_stream_recv;
  570.     sc->send = ngx_quic_stream_send;
  571.     sc->send_chain = ngx_quic_stream_send_chain;

  572.     sc->read->log = log;
  573.     sc->write->log = log;

  574.     sc->read->handler = ngx_quic_empty_handler;
  575.     sc->write->handler = ngx_quic_empty_handler;

  576.     log->connection = sc->number;

  577.     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
  578.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  579.             qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
  580.             qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
  581.             qs->send_state = NGX_QUIC_STREAM_SEND_READY;

  582.         } else {
  583.             qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
  584.             qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
  585.             qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
  586.         }

  587.     } else {
  588.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  589.             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
  590.             qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local;

  591.         } else {
  592.             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
  593.             qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
  594.         }

  595.         qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
  596.         qs->send_state = NGX_QUIC_STREAM_SEND_READY;
  597.     }

  598.     qs->recv_window = qs->recv_max_data;

  599.     cln = ngx_pool_cleanup_add(pool, 0);
  600.     if (cln == NULL) {
  601.         ngx_close_connection(sc);
  602.         ngx_destroy_pool(pool);
  603.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  604.         ngx_reusable_connection(c, reusable);
  605.         return NULL;
  606.     }

  607.     cln->handler = ngx_quic_stream_cleanup_handler;
  608.     cln->data = sc;

  609.     ngx_rbtree_insert(&qc->streams.tree, &qs->node);

  610.     return qs;
  611. }


  612. void
  613. ngx_quic_cancelable_stream(ngx_connection_t *c)
  614. {
  615.     ngx_connection_t       *pc;
  616.     ngx_quic_stream_t      *qs;
  617.     ngx_quic_connection_t  *qc;

  618.     qs = c->quic;
  619.     pc = qs->parent;
  620.     qc = ngx_quic_get_connection(pc);

  621.     if (!qs->cancelable) {
  622.         qs->cancelable = 1;

  623.         if (ngx_quic_can_shutdown(pc) == NGX_OK) {
  624.             ngx_reusable_connection(pc, 1);

  625.             if (qc->shutdown) {
  626.                 ngx_quic_shutdown_quic(pc);
  627.             }
  628.         }
  629.     }
  630. }


  631. static void
  632. ngx_quic_empty_handler(ngx_event_t *ev)
  633. {
  634. }


  635. static ssize_t
  636. ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
  637. {
  638.     ssize_t             len;
  639.     ngx_buf_t          *b;
  640.     ngx_chain_t        *cl, *in;
  641.     ngx_event_t        *rev;
  642.     ngx_connection_t   *pc;
  643.     ngx_quic_stream_t  *qs;

  644.     qs = c->quic;
  645.     pc = qs->parent;
  646.     rev = c->read;

  647.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
  648.         || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
  649.     {
  650.         qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ;
  651.         return NGX_ERROR;
  652.     }

  653.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  654.                    "quic stream id:0x%xL recv buf:%uz", qs->id, size);

  655.     if (size == 0) {
  656.         return 0;
  657.     }

  658.     in = ngx_quic_read_buffer(pc, &qs->recv, size);
  659.     if (in == NGX_CHAIN_ERROR) {
  660.         return NGX_ERROR;
  661.     }

  662.     len = 0;

  663.     for (cl = in; cl; cl = cl->next) {
  664.         b = cl->buf;
  665.         len += b->last - b->pos;
  666.         buf = ngx_cpymem(buf, b->pos, b->last - b->pos);
  667.     }

  668.     ngx_quic_free_chain(pc, in);

  669.     if (len == 0) {
  670.         rev->ready = 0;

  671.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
  672.             && qs->recv_offset == qs->recv_final_size)
  673.         {
  674.             qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
  675.         }

  676.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) {
  677.             rev->eof = 1;
  678.             return 0;
  679.         }

  680.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  681.                        "quic stream id:0x%xL recv() not ready", qs->id);
  682.         return NGX_AGAIN;
  683.     }

  684.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  685.                    "quic stream id:0x%xL recv len:%z", qs->id, len);

  686.     if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
  687.         return NGX_ERROR;
  688.     }

  689.     return len;
  690. }


  691. static ssize_t
  692. ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
  693. {
  694.     ngx_buf_t    b;
  695.     ngx_chain_t  cl;

  696.     ngx_memzero(&b, sizeof(ngx_buf_t));

  697.     b.memory = 1;
  698.     b.pos = buf;
  699.     b.last = buf + size;

  700.     cl.buf = &b;
  701.     cl.next = NULL;

  702.     if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
  703.         return NGX_ERROR;
  704.     }

  705.     if (b.pos == buf) {
  706.         return NGX_AGAIN;
  707.     }

  708.     return b.pos - buf;
  709. }


  710. static ngx_chain_t *
  711. ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
  712. {
  713.     uint64_t                n, flow;
  714.     ngx_event_t            *wev;
  715.     ngx_connection_t       *pc;
  716.     ngx_quic_stream_t      *qs;
  717.     ngx_quic_connection_t  *qc;

  718.     qs = c->quic;
  719.     pc = qs->parent;
  720.     qc = ngx_quic_get_connection(pc);
  721.     wev = c->write;

  722.     if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
  723.         && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
  724.     {
  725.         wev->error = 1;
  726.         return NGX_CHAIN_ERROR;
  727.     }

  728.     qs->send_state = NGX_QUIC_STREAM_SEND_SEND;

  729.     flow = qs->acked + qc->conf->stream_buffer_size - qs->sent;

  730.     if (flow == 0) {
  731.         wev->ready = 0;
  732.         return in;
  733.     }

  734.     if (limit == 0 || limit > (off_t) flow) {
  735.         limit = flow;
  736.     }

  737.     n = qs->send.size;

  738.     in = ngx_quic_write_buffer(pc, &qs->send, in, limit, qs->sent);
  739.     if (in == NGX_CHAIN_ERROR) {
  740.         return NGX_CHAIN_ERROR;
  741.     }

  742.     n = qs->send.size - n;
  743.     c->sent += n;
  744.     qs->sent += n;
  745.     qc->streams.sent += n;

  746.     if (flow == n) {
  747.         wev->ready = 0;
  748.     }

  749.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  750.                    "quic send_chain sent:%uL", n);

  751.     if (ngx_quic_stream_flush(qs) != NGX_OK) {
  752.         return NGX_CHAIN_ERROR;
  753.     }

  754.     return in;
  755. }


  756. static ngx_int_t
  757. ngx_quic_stream_flush(ngx_quic_stream_t *qs)
  758. {
  759.     off_t                   limit, len;
  760.     ngx_uint_t              last;
  761.     ngx_chain_t            *out;
  762.     ngx_quic_frame_t       *frame;
  763.     ngx_connection_t       *pc;
  764.     ngx_quic_connection_t  *qc;

  765.     if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
  766.         return NGX_OK;
  767.     }

  768.     pc = qs->parent;
  769.     qc = ngx_quic_get_connection(pc);

  770.     if (qc->streams.send_max_data == 0) {
  771.         qc->streams.send_max_data = qc->ctp.initial_max_data;
  772.     }

  773.     limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
  774.                     qs->send_max_data - qs->send_offset);

  775.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  776.                    "quic stream id:0x%xL flush limit:%O", qs->id, limit);

  777.     len = qs->send.offset;

  778.     out = ngx_quic_read_buffer(pc, &qs->send, limit);
  779.     if (out == NGX_CHAIN_ERROR) {
  780.         return NGX_ERROR;
  781.     }

  782.     len = qs->send.offset - len;
  783.     last = 0;

  784.     if (qs->send_final_size != (uint64_t) -1
  785.         && qs->send_final_size == qs->send.offset)
  786.     {
  787.         qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
  788.         last = 1;
  789.     }

  790.     if (len == 0 && !last) {
  791.         return NGX_OK;
  792.     }

  793.     frame = ngx_quic_alloc_frame(pc);
  794.     if (frame == NULL) {
  795.         return NGX_ERROR;
  796.     }

  797.     frame->level = ssl_encryption_application;
  798.     frame->type = NGX_QUIC_FT_STREAM;
  799.     frame->data = out;

  800.     frame->u.stream.off = 1;
  801.     frame->u.stream.len = 1;
  802.     frame->u.stream.fin = last;

  803.     frame->u.stream.stream_id = qs->id;
  804.     frame->u.stream.offset = qs->send_offset;
  805.     frame->u.stream.length = len;

  806.     ngx_quic_queue_frame(qc, frame);

  807.     qs->send_offset += len;
  808.     qc->streams.send_offset += len;

  809.     ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  810.                    "quic stream id:0x%xL flush len:%O last:%ui",
  811.                    qs->id, len, last);

  812.     if (qs->connection == NULL) {
  813.         return ngx_quic_close_stream(qs);
  814.     }

  815.     return NGX_OK;
  816. }


  817. static void
  818. ngx_quic_stream_cleanup_handler(void *data)
  819. {
  820.     ngx_connection_t *c = data;

  821.     ngx_quic_stream_t      *qs;
  822.     ngx_quic_connection_t  *qc;

  823.     qs = c->quic;

  824.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
  825.                    "quic stream id:0x%xL cleanup", qs->id);

  826.     if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
  827.         qs->connection = NULL;
  828.         goto failed;
  829.     }

  830.     qs->connection = NULL;

  831.     if (ngx_quic_close_stream(qs) != NGX_OK) {
  832.         goto failed;
  833.     }

  834.     return;

  835. failed:

  836.     qc = ngx_quic_get_connection(qs->parent);
  837.     qc->error = NGX_QUIC_ERR_INTERNAL_ERROR;

  838.     ngx_post_event(&qc->close, &ngx_posted_events);
  839. }


  840. static ngx_int_t
  841. ngx_quic_close_stream(ngx_quic_stream_t *qs)
  842. {
  843.     ngx_connection_t       *pc;
  844.     ngx_quic_frame_t       *frame;
  845.     ngx_quic_connection_t  *qc;

  846.     pc = qs->parent;
  847.     qc = ngx_quic_get_connection(pc);

  848.     if (!qc->closing) {
  849.         /* make sure everything is sent and final size is received */

  850.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV) {
  851.             return NGX_OK;
  852.         }

  853.         if (qs->send_state != NGX_QUIC_STREAM_SEND_DATA_RECVD
  854.             && qs->send_state != NGX_QUIC_STREAM_SEND_RESET_RECVD)
  855.         {
  856.             return NGX_OK;
  857.         }
  858.     }

  859.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  860.                    "quic stream id:0x%xL close", qs->id);

  861.     ngx_quic_free_buffer(pc, &qs->send);
  862.     ngx_quic_free_buffer(pc, &qs->recv);

  863.     ngx_rbtree_delete(&qc->streams.tree, &qs->node);
  864.     ngx_queue_insert_tail(&qc->streams.free, &qs->queue);

  865.     if (qc->closing) {
  866.         /* schedule handler call to continue ngx_quic_close_connection() */
  867.         ngx_post_event(&qc->close, &ngx_posted_events);
  868.         return NGX_OK;
  869.     }

  870.     if (!pc->reusable && ngx_quic_can_shutdown(pc) == NGX_OK) {
  871.         ngx_reusable_connection(pc, 1);
  872.     }

  873.     if (qc->shutdown) {
  874.         ngx_quic_shutdown_quic(pc);
  875.         return NGX_OK;
  876.     }

  877.     if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
  878.         frame = ngx_quic_alloc_frame(pc);
  879.         if (frame == NULL) {
  880.             return NGX_ERROR;
  881.         }

  882.         frame->level = ssl_encryption_application;
  883.         frame->type = NGX_QUIC_FT_MAX_STREAMS;

  884.         if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
  885.             frame->u.max_streams.limit = ++qc->streams.client_max_streams_uni;
  886.             frame->u.max_streams.bidi = 0;

  887.         } else {
  888.             frame->u.max_streams.limit = ++qc->streams.client_max_streams_bidi;
  889.             frame->u.max_streams.bidi = 1;
  890.         }

  891.         ngx_quic_queue_frame(qc, frame);
  892.     }

  893.     return NGX_OK;
  894. }


  895. static ngx_int_t
  896. ngx_quic_can_shutdown(ngx_connection_t *c)
  897. {
  898.     ngx_rbtree_t           *tree;
  899.     ngx_rbtree_node_t      *node;
  900.     ngx_quic_stream_t      *qs;
  901.     ngx_quic_connection_t  *qc;

  902.     qc = ngx_quic_get_connection(c);

  903.     tree = &qc->streams.tree;

  904.     if (tree->root != tree->sentinel) {
  905.         for (node = ngx_rbtree_min(tree->root, tree->sentinel);
  906.              node;
  907.              node = ngx_rbtree_next(tree, node))
  908.         {
  909.             qs = (ngx_quic_stream_t *) node;

  910.             if (!qs->cancelable) {
  911.                 return NGX_DECLINED;
  912.             }
  913.         }
  914.     }

  915.     return NGX_OK;
  916. }


  917. ngx_int_t
  918. ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
  919.     ngx_quic_frame_t *frame)
  920. {
  921.     uint64_t                  last;
  922.     ngx_quic_stream_t        *qs;
  923.     ngx_quic_connection_t    *qc;
  924.     ngx_quic_stream_frame_t  *f;

  925.     qc = ngx_quic_get_connection(c);
  926.     f = &frame->u.stream;

  927.     if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  928.         && (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED))
  929.     {
  930.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  931.         return NGX_ERROR;
  932.     }

  933.     /* no overflow since both values are 62-bit */
  934.     last = f->offset + f->length;

  935.     qs = ngx_quic_get_stream(c, f->stream_id);

  936.     if (qs == NULL) {
  937.         return NGX_ERROR;
  938.     }

  939.     if (qs == NGX_QUIC_STREAM_GONE) {
  940.         return NGX_OK;
  941.     }

  942.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
  943.         && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
  944.     {
  945.         return NGX_OK;
  946.     }

  947.     if (ngx_quic_control_flow(qs, last) != NGX_OK) {
  948.         return NGX_ERROR;
  949.     }

  950.     if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
  951.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  952.         return NGX_ERROR;
  953.     }

  954.     if (last < qs->recv_offset) {
  955.         return NGX_OK;
  956.     }

  957.     if (f->fin) {
  958.         if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
  959.         {
  960.             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  961.             return NGX_ERROR;
  962.         }

  963.         if (qs->recv_last > last) {
  964.             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  965.             return NGX_ERROR;
  966.         }

  967.         qs->recv_final_size = last;
  968.         qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
  969.     }

  970.     if (ngx_quic_write_buffer(c, &qs->recv, frame->data, f->length, f->offset)
  971.         == NGX_CHAIN_ERROR)
  972.     {
  973.         return NGX_ERROR;
  974.     }

  975.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
  976.         && qs->recv.size == qs->recv_final_size)
  977.     {
  978.         qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
  979.     }

  980.     if (qs->connection == NULL) {
  981.         return ngx_quic_close_stream(qs);
  982.     }

  983.     if (f->offset <= qs->recv_offset) {
  984.         ngx_quic_set_event(qs->connection->read);
  985.     }

  986.     return NGX_OK;
  987. }


  988. ngx_int_t
  989. ngx_quic_handle_max_data_frame(ngx_connection_t *c,
  990.     ngx_quic_max_data_frame_t *f)
  991. {
  992.     ngx_rbtree_t           *tree;
  993.     ngx_rbtree_node_t      *node;
  994.     ngx_quic_stream_t      *qs;
  995.     ngx_quic_connection_t  *qc;

  996.     qc = ngx_quic_get_connection(c);
  997.     tree = &qc->streams.tree;

  998.     if (f->max_data <= qc->streams.send_max_data) {
  999.         return NGX_OK;
  1000.     }

  1001.     if (tree->root == tree->sentinel
  1002.         || qc->streams.send_offset < qc->streams.send_max_data)
  1003.     {
  1004.         /* not blocked on MAX_DATA */
  1005.         qc->streams.send_max_data = f->max_data;
  1006.         return NGX_OK;
  1007.     }

  1008.     qc->streams.send_max_data = f->max_data;
  1009.     node = ngx_rbtree_min(tree->root, tree->sentinel);

  1010.     while (node && qc->streams.send_offset < qc->streams.send_max_data) {

  1011.         qs = (ngx_quic_stream_t *) node;
  1012.         node = ngx_rbtree_next(tree, node);

  1013.         if (ngx_quic_stream_flush(qs) != NGX_OK) {
  1014.             return NGX_ERROR;
  1015.         }
  1016.     }

  1017.     return NGX_OK;
  1018. }


  1019. ngx_int_t
  1020. ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
  1021.     ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
  1022. {
  1023.     return NGX_OK;
  1024. }


  1025. ngx_int_t
  1026. ngx_quic_handle_data_blocked_frame(ngx_connection_t *c,
  1027.     ngx_quic_header_t *pkt, ngx_quic_data_blocked_frame_t *f)
  1028. {
  1029.     return ngx_quic_update_max_data(c);
  1030. }


  1031. ngx_int_t
  1032. ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
  1033.     ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
  1034. {
  1035.     ngx_quic_stream_t      *qs;
  1036.     ngx_quic_connection_t  *qc;

  1037.     qc = ngx_quic_get_connection(c);

  1038.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1039.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
  1040.     {
  1041.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1042.         return NGX_ERROR;
  1043.     }

  1044.     qs = ngx_quic_get_stream(c, f->id);

  1045.     if (qs == NULL) {
  1046.         return NGX_ERROR;
  1047.     }

  1048.     if (qs == NGX_QUIC_STREAM_GONE) {
  1049.         return NGX_OK;
  1050.     }

  1051.     return ngx_quic_update_max_stream_data(qs);
  1052. }


  1053. ngx_int_t
  1054. ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
  1055.     ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
  1056. {
  1057.     ngx_quic_stream_t      *qs;
  1058.     ngx_quic_connection_t  *qc;

  1059.     qc = ngx_quic_get_connection(c);

  1060.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1061.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
  1062.     {
  1063.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1064.         return NGX_ERROR;
  1065.     }

  1066.     qs = ngx_quic_get_stream(c, f->id);

  1067.     if (qs == NULL) {
  1068.         return NGX_ERROR;
  1069.     }

  1070.     if (qs == NGX_QUIC_STREAM_GONE) {
  1071.         return NGX_OK;
  1072.     }

  1073.     if (f->limit <= qs->send_max_data) {
  1074.         return NGX_OK;
  1075.     }

  1076.     if (qs->send_offset < qs->send_max_data) {
  1077.         /* not blocked on MAX_STREAM_DATA */
  1078.         qs->send_max_data = f->limit;
  1079.         return NGX_OK;
  1080.     }

  1081.     qs->send_max_data = f->limit;

  1082.     return ngx_quic_stream_flush(qs);
  1083. }


  1084. ngx_int_t
  1085. ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
  1086.     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
  1087. {
  1088.     ngx_event_t            *rev;
  1089.     ngx_quic_stream_t      *qs;
  1090.     ngx_quic_connection_t  *qc;

  1091.     qc = ngx_quic_get_connection(c);

  1092.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1093.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
  1094.     {
  1095.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1096.         return NGX_ERROR;
  1097.     }

  1098.     qs = ngx_quic_get_stream(c, f->id);

  1099.     if (qs == NULL) {
  1100.         return NGX_ERROR;
  1101.     }

  1102.     if (qs == NGX_QUIC_STREAM_GONE) {
  1103.         return NGX_OK;
  1104.     }

  1105.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
  1106.         || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
  1107.     {
  1108.         return NGX_OK;
  1109.     }

  1110.     qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;

  1111.     if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
  1112.         return NGX_ERROR;
  1113.     }

  1114.     if (qs->recv_final_size != (uint64_t) -1
  1115.         && qs->recv_final_size != f->final_size)
  1116.     {
  1117.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  1118.         return NGX_ERROR;
  1119.     }

  1120.     if (qs->recv_last > f->final_size) {
  1121.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  1122.         return NGX_ERROR;
  1123.     }

  1124.     qs->recv_final_size = f->final_size;

  1125.     if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
  1126.         return NGX_ERROR;
  1127.     }

  1128.     if (qs->connection == NULL) {
  1129.         return ngx_quic_close_stream(qs);
  1130.     }

  1131.     rev = qs->connection->read;
  1132.     rev->error = 1;

  1133.     ngx_quic_set_event(rev);

  1134.     return NGX_OK;
  1135. }


  1136. ngx_int_t
  1137. ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
  1138.     ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
  1139. {
  1140.     ngx_quic_stream_t      *qs;
  1141.     ngx_quic_connection_t  *qc;

  1142.     qc = ngx_quic_get_connection(c);

  1143.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1144.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
  1145.     {
  1146.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1147.         return NGX_ERROR;
  1148.     }

  1149.     qs = ngx_quic_get_stream(c, f->id);

  1150.     if (qs == NULL) {
  1151.         return NGX_ERROR;
  1152.     }

  1153.     if (qs == NGX_QUIC_STREAM_GONE) {
  1154.         return NGX_OK;
  1155.     }

  1156.     if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
  1157.         return NGX_ERROR;
  1158.     }

  1159.     if (qs->connection == NULL) {
  1160.         return ngx_quic_close_stream(qs);
  1161.     }

  1162.     ngx_quic_set_event(qs->connection->write);

  1163.     return NGX_OK;
  1164. }


  1165. ngx_int_t
  1166. ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
  1167.     ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f)
  1168. {
  1169.     ngx_quic_connection_t  *qc;

  1170.     qc = ngx_quic_get_connection(c);

  1171.     if (f->bidi) {
  1172.         if (qc->streams.server_max_streams_bidi < f->limit) {
  1173.             qc->streams.server_max_streams_bidi = f->limit;

  1174.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1175.                            "quic max_streams_bidi:%uL", f->limit);
  1176.         }

  1177.     } else {
  1178.         if (qc->streams.server_max_streams_uni < f->limit) {
  1179.             qc->streams.server_max_streams_uni = f->limit;

  1180.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1181.                            "quic max_streams_uni:%uL", f->limit);
  1182.         }
  1183.     }

  1184.     return NGX_OK;
  1185. }


  1186. void
  1187. ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
  1188. {
  1189.     uint64_t                acked;
  1190.     ngx_quic_stream_t      *qs;
  1191.     ngx_quic_connection_t  *qc;

  1192.     qc = ngx_quic_get_connection(c);

  1193.     switch (f->type) {

  1194.     case NGX_QUIC_FT_RESET_STREAM:

  1195.         qs = ngx_quic_find_stream(&qc->streams.tree, f->u.reset_stream.id);
  1196.         if (qs == NULL) {
  1197.             return;
  1198.         }

  1199.         qs->send_state = NGX_QUIC_STREAM_SEND_RESET_RECVD;

  1200.         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1201.                        "quic stream id:0x%xL ack reset final_size:%uL",
  1202.                        qs->id, f->u.reset_stream.final_size);

  1203.         break;

  1204.     case NGX_QUIC_FT_STREAM:

  1205.         qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
  1206.         if (qs == NULL) {
  1207.             return;
  1208.         }

  1209.         acked = qs->acked;
  1210.         qs->acked += f->u.stream.length;

  1211.         if (f->u.stream.fin) {
  1212.             qs->fin_acked = 1;
  1213.         }

  1214.         if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_SENT
  1215.             && qs->acked == qs->sent && qs->fin_acked)
  1216.         {
  1217.             qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
  1218.         }

  1219.         ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1220.                        "quic stream id:0x%xL ack len:%uL fin:%d unacked:%uL",
  1221.                        qs->id, f->u.stream.length, f->u.stream.fin,
  1222.                        qs->sent - qs->acked);

  1223.         if (qs->connection
  1224.             && qs->sent - acked == qc->conf->stream_buffer_size
  1225.             && f->u.stream.length > 0)
  1226.         {
  1227.             ngx_quic_set_event(qs->connection->write);
  1228.         }

  1229.         break;

  1230.     default:
  1231.         return;
  1232.     }

  1233.     if (qs->connection == NULL) {
  1234.         ngx_quic_close_stream(qs);
  1235.     }
  1236. }


  1237. static ngx_int_t
  1238. ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
  1239. {
  1240.     uint64_t                len;
  1241.     ngx_connection_t       *pc;
  1242.     ngx_quic_connection_t  *qc;

  1243.     pc = qs->parent;
  1244.     qc = ngx_quic_get_connection(pc);

  1245.     if (last <= qs->recv_last) {
  1246.         return NGX_OK;
  1247.     }

  1248.     len = last - qs->recv_last;

  1249.     ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1250.                    "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
  1251.                    qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
  1252.                    qc->streams.recv_max_data);

  1253.     qs->recv_last += len;

  1254.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
  1255.         && qs->recv_last > qs->recv_max_data)
  1256.     {
  1257.         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
  1258.         return NGX_ERROR;
  1259.     }

  1260.     qc->streams.recv_last += len;

  1261.     if (qc->streams.recv_last > qc->streams.recv_max_data) {
  1262.         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
  1263.         return NGX_ERROR;
  1264.     }

  1265.     return NGX_OK;
  1266. }


  1267. static ngx_int_t
  1268. ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
  1269. {
  1270.     uint64_t                len;
  1271.     ngx_connection_t       *pc;
  1272.     ngx_quic_connection_t  *qc;

  1273.     pc = qs->parent;
  1274.     qc = ngx_quic_get_connection(pc);

  1275.     if (last <= qs->recv_offset) {
  1276.         return NGX_OK;
  1277.     }

  1278.     len = last - qs->recv_offset;

  1279.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1280.                    "quic stream id:0x%xL flow update %uL", qs->id, last);

  1281.     qs->recv_offset += len;

  1282.     if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
  1283.         if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
  1284.             return NGX_ERROR;
  1285.         }
  1286.     }

  1287.     qc->streams.recv_offset += len;

  1288.     if (qc->streams.recv_max_data
  1289.         <= qc->streams.recv_offset + qc->streams.recv_window / 2)
  1290.     {
  1291.         if (ngx_quic_update_max_data(pc) != NGX_OK) {
  1292.             return NGX_ERROR;
  1293.         }
  1294.     }

  1295.     return NGX_OK;
  1296. }


  1297. static ngx_int_t
  1298. ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
  1299. {
  1300.     uint64_t                recv_max_data;
  1301.     ngx_connection_t       *pc;
  1302.     ngx_quic_frame_t       *frame;
  1303.     ngx_quic_connection_t  *qc;

  1304.     pc = qs->parent;
  1305.     qc = ngx_quic_get_connection(pc);

  1306.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) {
  1307.         return NGX_OK;
  1308.     }

  1309.     recv_max_data = qs->recv_offset + qs->recv_window;

  1310.     if (qs->recv_max_data == recv_max_data) {
  1311.         return NGX_OK;
  1312.     }

  1313.     qs->recv_max_data = recv_max_data;

  1314.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1315.                    "quic stream id:0x%xL flow update msd:%uL",
  1316.                    qs->id, qs->recv_max_data);

  1317.     frame = ngx_quic_alloc_frame(pc);
  1318.     if (frame == NULL) {
  1319.         return NGX_ERROR;
  1320.     }

  1321.     frame->level = ssl_encryption_application;
  1322.     frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
  1323.     frame->u.max_stream_data.id = qs->id;
  1324.     frame->u.max_stream_data.limit = qs->recv_max_data;

  1325.     ngx_quic_queue_frame(qc, frame);

  1326.     return NGX_OK;
  1327. }


  1328. static ngx_int_t
  1329. ngx_quic_update_max_data(ngx_connection_t *c)
  1330. {
  1331.     uint64_t                recv_max_data;
  1332.     ngx_quic_frame_t       *frame;
  1333.     ngx_quic_connection_t  *qc;

  1334.     qc = ngx_quic_get_connection(c);

  1335.     recv_max_data = qc->streams.recv_offset + qc->streams.recv_window;

  1336.     if (qc->streams.recv_max_data == recv_max_data) {
  1337.         return NGX_OK;
  1338.     }

  1339.     qc->streams.recv_max_data = recv_max_data;

  1340.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1341.                    "quic flow update md:%uL", qc->streams.recv_max_data);

  1342.     frame = ngx_quic_alloc_frame(c);
  1343.     if (frame == NULL) {
  1344.         return NGX_ERROR;
  1345.     }

  1346.     frame->level = ssl_encryption_application;
  1347.     frame->type = NGX_QUIC_FT_MAX_DATA;
  1348.     frame->u.max_data.max_data = qc->streams.recv_max_data;

  1349.     ngx_quic_queue_frame(qc, frame);

  1350.     return NGX_OK;
  1351. }


  1352. static void
  1353. ngx_quic_set_event(ngx_event_t *ev)
  1354. {
  1355.     ev->ready = 1;

  1356.     if (ev->active) {
  1357.         ngx_post_event(ev, &ngx_posted_events);
  1358.     }
  1359. }