src/event/quic/ngx_event_quic_streams.c - nginx

Functions defined

Macros defined

Source code


  1. /*
  2. * Copyright (C) Nginx, Inc.
  3. */


  4. #include <ngx_config.h>
  5. #include <ngx_core.h>
  6. #include <ngx_event.h>
  7. #include <ngx_event_quic_connection.h>


  8. #define NGX_QUIC_STREAM_GONE     (void *) -1


  9. static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
  10.     ngx_uint_t err);
  11. static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
  12. static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
  13. static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
  14. static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id);
  15. static void ngx_quic_init_stream_handler(ngx_event_t *ev);
  16. static void ngx_quic_init_streams_handler(ngx_connection_t *c);
  17. static ngx_int_t ngx_quic_do_init_streams(ngx_connection_t *c);
  18. static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
  19.     uint64_t id);
  20. static void ngx_quic_empty_handler(ngx_event_t *ev);
  21. static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
  22.     size_t size);
  23. static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
  24.     size_t size);
  25. static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
  26.     ngx_chain_t *in, off_t limit);
  27. static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
  28. static void ngx_quic_stream_cleanup_handler(void *data);
  29. static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
  30. static ngx_int_t ngx_quic_can_shutdown(ngx_connection_t *c);
  31. static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
  32. static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
  33. static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
  34. static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
  35. static void ngx_quic_set_event(ngx_event_t *ev);


  36. ngx_connection_t *
  37. ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi)
  38. {
  39.     uint64_t                id;
  40.     ngx_connection_t       *pc, *sc;
  41.     ngx_quic_stream_t      *qs;
  42.     ngx_quic_connection_t  *qc;

  43.     pc = c->quic ? c->quic->parent : c;
  44.     qc = ngx_quic_get_connection(pc);

  45.     if (qc->closing) {
  46.         return NULL;
  47.     }

  48.     if (bidi) {
  49.         if (qc->streams.server_streams_bidi
  50.             >= qc->streams.server_max_streams_bidi)
  51.         {
  52.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  53.                            "quic too many server bidi streams:%uL",
  54.                            qc->streams.server_streams_bidi);
  55.             return NULL;
  56.         }

  57.         id = (qc->streams.server_streams_bidi << 2)
  58.              | NGX_QUIC_STREAM_SERVER_INITIATED;

  59.         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
  60.                        "quic creating server bidi stream"
  61.                        " streams:%uL max:%uL id:0x%xL",
  62.                        qc->streams.server_streams_bidi,
  63.                        qc->streams.server_max_streams_bidi, id);

  64.         qc->streams.server_streams_bidi++;

  65.     } else {
  66.         if (qc->streams.server_streams_uni
  67.             >= qc->streams.server_max_streams_uni)
  68.         {
  69.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  70.                            "quic too many server uni streams:%uL",
  71.                            qc->streams.server_streams_uni);
  72.             return NULL;
  73.         }

  74.         id = (qc->streams.server_streams_uni << 2)
  75.              | NGX_QUIC_STREAM_SERVER_INITIATED
  76.              | NGX_QUIC_STREAM_UNIDIRECTIONAL;

  77.         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
  78.                        "quic creating server uni stream"
  79.                        " streams:%uL max:%uL id:0x%xL",
  80.                        qc->streams.server_streams_uni,
  81.                        qc->streams.server_max_streams_uni, id);

  82.         qc->streams.server_streams_uni++;
  83.     }

  84.     qs = ngx_quic_create_stream(pc, id);
  85.     if (qs == NULL) {
  86.         return NULL;
  87.     }

  88.     sc = qs->connection;

  89.     sc->write->active = 1;
  90.     sc->write->ready = 1;

  91.     if (bidi) {
  92.         sc->read->active = 1;
  93.     }

  94.     return sc;
  95. }


  96. void
  97. ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
  98.     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
  99. {
  100.     ngx_rbtree_node_t  **p;
  101.     ngx_quic_stream_t   *qn, *qnt;

  102.     for ( ;; ) {
  103.         qn = (ngx_quic_stream_t *) node;
  104.         qnt = (ngx_quic_stream_t *) temp;

  105.         p = (qn->id < qnt->id) ? &temp->left : &temp->right;

  106.         if (*p == sentinel) {
  107.             break;
  108.         }

  109.         temp = *p;
  110.     }

  111.     *p = node;
  112.     node->parent = temp;
  113.     node->left = sentinel;
  114.     node->right = sentinel;
  115.     ngx_rbt_red(node);
  116. }


  117. ngx_quic_stream_t *
  118. ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
  119. {
  120.     ngx_rbtree_node_t  *node, *sentinel;
  121.     ngx_quic_stream_t  *qn;

  122.     node = rbtree->root;
  123.     sentinel = rbtree->sentinel;

  124.     while (node != sentinel) {
  125.         qn = (ngx_quic_stream_t *) node;

  126.         if (id == qn->id) {
  127.             return qn;
  128.         }

  129.         node = (id < qn->id) ? node->left : node->right;
  130.     }

  131.     return NULL;
  132. }


  133. ngx_int_t
  134. ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
  135. {
  136.     ngx_pool_t         *pool;
  137.     ngx_queue_t        *q, posted_events;
  138.     ngx_rbtree_t       *tree;
  139.     ngx_connection_t   *sc;
  140.     ngx_rbtree_node_t  *node;
  141.     ngx_quic_stream_t  *qs;

  142.     while (!ngx_queue_empty(&qc->streams.uninitialized)) {
  143.         q = ngx_queue_head(&qc->streams.uninitialized);
  144.         ngx_queue_remove(q);

  145.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  146.         pool = qs->connection->pool;

  147.         ngx_close_connection(qs->connection);
  148.         ngx_destroy_pool(pool);
  149.     }

  150.     tree = &qc->streams.tree;

  151.     if (tree->root == tree->sentinel) {
  152.         return NGX_OK;
  153.     }

  154.     ngx_queue_init(&posted_events);

  155.     node = ngx_rbtree_min(tree->root, tree->sentinel);

  156.     while (node) {
  157.         qs = (ngx_quic_stream_t *) node;
  158.         node = ngx_rbtree_next(tree, node);
  159.         sc = qs->connection;

  160.         qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
  161.         qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;

  162.         if (sc == NULL) {
  163.             ngx_quic_close_stream(qs);
  164.             continue;
  165.         }

  166.         sc->read->error = 1;
  167.         sc->read->ready = 1;
  168.         sc->write->error = 1;
  169.         sc->write->ready = 1;

  170.         sc->close = 1;

  171.         if (sc->read->posted) {
  172.             ngx_delete_posted_event(sc->read);
  173.         }

  174.         ngx_post_event(sc->read, &posted_events);
  175.     }

  176.     ngx_event_process_posted((ngx_cycle_t *) ngx_cycle, &posted_events);

  177.     if (tree->root == tree->sentinel) {
  178.         return NGX_OK;
  179.     }

  180.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
  181.                    "quic connection has active streams");

  182.     return NGX_AGAIN;
  183. }


  184. ngx_int_t
  185. ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
  186. {
  187.     return ngx_quic_do_reset_stream(c->quic, err);
  188. }


  189. static ngx_int_t
  190. ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
  191. {
  192.     ngx_connection_t       *pc;
  193.     ngx_quic_frame_t       *frame;
  194.     ngx_quic_connection_t  *qc;

  195.     if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
  196.         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
  197.         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
  198.     {
  199.         return NGX_OK;
  200.     }

  201.     qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
  202.     qs->send_final_size = qs->send_offset;

  203.     if (qs->connection) {
  204.         qs->connection->write->error = 1;
  205.     }

  206.     pc = qs->parent;
  207.     qc = ngx_quic_get_connection(pc);

  208.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  209.                    "quic stream id:0x%xL reset", qs->id);

  210.     frame = ngx_quic_alloc_frame(pc);
  211.     if (frame == NULL) {
  212.         return NGX_ERROR;
  213.     }

  214.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  215.     frame->type = NGX_QUIC_FT_RESET_STREAM;
  216.     frame->u.reset_stream.id = qs->id;
  217.     frame->u.reset_stream.error_code = err;
  218.     frame->u.reset_stream.final_size = qs->send_offset;

  219.     ngx_quic_queue_frame(qc, frame);

  220.     ngx_quic_free_buffer(pc, &qs->send);

  221.     return NGX_OK;
  222. }


  223. ngx_int_t
  224. ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
  225. {
  226.     if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) {
  227.         if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
  228.             return NGX_ERROR;
  229.         }
  230.     }

  231.     if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) {
  232.         if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
  233.             return NGX_ERROR;
  234.         }
  235.     }

  236.     return NGX_OK;
  237. }


  238. static ngx_int_t
  239. ngx_quic_shutdown_stream_send(ngx_connection_t *c)
  240. {
  241.     ngx_quic_stream_t  *qs;

  242.     qs = c->quic;

  243.     if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
  244.         && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
  245.     {
  246.         return NGX_OK;
  247.     }

  248.     qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
  249.     qs->send_final_size = c->sent;

  250.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
  251.                    "quic stream id:0x%xL send shutdown", qs->id);

  252.     return ngx_quic_stream_flush(qs);
  253. }


  254. static ngx_int_t
  255. ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
  256. {
  257.     ngx_connection_t       *pc;
  258.     ngx_quic_frame_t       *frame;
  259.     ngx_quic_stream_t      *qs;
  260.     ngx_quic_connection_t  *qc;

  261.     qs = c->quic;

  262.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
  263.         && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
  264.     {
  265.         return NGX_OK;
  266.     }

  267.     pc = qs->parent;
  268.     qc = ngx_quic_get_connection(pc);

  269.     if (qc->conf->stream_close_code == 0) {
  270.         return NGX_OK;
  271.     }

  272.     frame = ngx_quic_alloc_frame(pc);
  273.     if (frame == NULL) {
  274.         return NGX_ERROR;
  275.     }

  276.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  277.                    "quic stream id:0x%xL recv shutdown", qs->id);

  278.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  279.     frame->type = NGX_QUIC_FT_STOP_SENDING;
  280.     frame->u.stop_sending.id = qs->id;
  281.     frame->u.stop_sending.error_code = qc->conf->stream_close_code;

  282.     ngx_quic_queue_frame(qc, frame);

  283.     return NGX_OK;
  284. }


  285. static ngx_quic_stream_t *
  286. ngx_quic_get_stream(ngx_connection_t *c, uint64_t id)
  287. {
  288.     uint64_t                min_id;
  289.     ngx_event_t            *rev;
  290.     ngx_quic_stream_t      *qs;
  291.     ngx_quic_connection_t  *qc;

  292.     qc = ngx_quic_get_connection(c);

  293.     qs = ngx_quic_find_stream(&qc->streams.tree, id);

  294.     if (qs) {
  295.         return qs;
  296.     }

  297.     if (qc->shutdown || qc->closing) {
  298.         return NGX_QUIC_STREAM_GONE;
  299.     }

  300.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  301.                    "quic stream id:0x%xL is missing", id);

  302.     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {

  303.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  304.             if ((id >> 2) < qc->streams.server_streams_uni) {
  305.                 return NGX_QUIC_STREAM_GONE;
  306.             }

  307.             qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  308.             return NULL;
  309.         }

  310.         if ((id >> 2) < qc->streams.client_streams_uni) {
  311.             return NGX_QUIC_STREAM_GONE;
  312.         }

  313.         if ((id >> 2) >= qc->streams.client_max_streams_uni) {
  314.             qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
  315.             return NULL;
  316.         }

  317.         min_id = (qc->streams.client_streams_uni << 2)
  318.                  | NGX_QUIC_STREAM_UNIDIRECTIONAL;
  319.         qc->streams.client_streams_uni = (id >> 2) + 1;

  320.     } else {

  321.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  322.             if ((id >> 2) < qc->streams.server_streams_bidi) {
  323.                 return NGX_QUIC_STREAM_GONE;
  324.             }

  325.             qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  326.             return NULL;
  327.         }

  328.         if ((id >> 2) < qc->streams.client_streams_bidi) {
  329.             return NGX_QUIC_STREAM_GONE;
  330.         }

  331.         if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
  332.             qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
  333.             return NULL;
  334.         }

  335.         min_id = (qc->streams.client_streams_bidi << 2);
  336.         qc->streams.client_streams_bidi = (id >> 2) + 1;
  337.     }

  338.     /*
  339.      * RFC 9000, 2.1.  Stream Types and Identifiers
  340.      *
  341.      * successive streams of each type are created with numerically increasing
  342.      * stream IDs.  A stream ID that is used out of order results in all
  343.      * streams of that type with lower-numbered stream IDs also being opened.
  344.      */

  345. #if (NGX_SUPPRESS_WARN)
  346.     qs = NULL;
  347. #endif

  348.     for ( /* void */ ; min_id <= id; min_id += 0x04) {

  349.         qs = ngx_quic_create_stream(c, min_id);

  350.         if (qs == NULL) {
  351.             if (ngx_quic_reject_stream(c, min_id) != NGX_OK) {
  352.                 return NULL;
  353.             }

  354.             continue;
  355.         }

  356.         ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);

  357.         rev = qs->connection->read;
  358.         rev->handler = ngx_quic_init_stream_handler;

  359.         if (qc->streams.initialized) {
  360.             ngx_post_event(rev, &ngx_posted_events);

  361.             if (qc->push.posted) {
  362.                 /*
  363.                  * The posted stream can produce output immediately.
  364.                  * By postponing the push event, we coalesce the stream
  365.                  * output with queued frames in one UDP datagram.
  366.                  */

  367.                 ngx_delete_posted_event(&qc->push);
  368.                 ngx_post_event(&qc->push, &ngx_posted_events);
  369.             }
  370.         }
  371.     }

  372.     if (qs == NULL) {
  373.         return NGX_QUIC_STREAM_GONE;
  374.     }

  375.     return qs;
  376. }


  377. static ngx_int_t
  378. ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id)
  379. {
  380.     uint64_t                code;
  381.     ngx_quic_frame_t       *frame;
  382.     ngx_quic_connection_t  *qc;

  383.     qc = ngx_quic_get_connection(c);

  384.     code = (id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  385.            ? qc->conf->stream_reject_code_uni
  386.            : qc->conf->stream_reject_code_bidi;

  387.     if (code == 0) {
  388.         return NGX_DECLINED;
  389.     }

  390.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  391.                    "quic stream id:0x%xL reject err:0x%xL", id, code);

  392.     frame = ngx_quic_alloc_frame(c);
  393.     if (frame == NULL) {
  394.         return NGX_ERROR;
  395.     }

  396.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  397.     frame->type = NGX_QUIC_FT_RESET_STREAM;
  398.     frame->u.reset_stream.id = id;
  399.     frame->u.reset_stream.error_code = code;
  400.     frame->u.reset_stream.final_size = 0;

  401.     ngx_quic_queue_frame(qc, frame);

  402.     frame = ngx_quic_alloc_frame(c);
  403.     if (frame == NULL) {
  404.         return NGX_ERROR;
  405.     }

  406.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  407.     frame->type = NGX_QUIC_FT_STOP_SENDING;
  408.     frame->u.stop_sending.id = id;
  409.     frame->u.stop_sending.error_code = code;

  410.     ngx_quic_queue_frame(qc, frame);

  411.     return NGX_OK;
  412. }


  413. static void
  414. ngx_quic_init_stream_handler(ngx_event_t *ev)
  415. {
  416.     ngx_connection_t   *c;
  417.     ngx_quic_stream_t  *qs;

  418.     c = ev->data;
  419.     qs = c->quic;

  420.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream");

  421.     if ((qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
  422.         c->write->active = 1;
  423.         c->write->ready = 1;
  424.     }

  425.     c->read->active = 1;

  426.     ngx_queue_remove(&qs->queue);

  427.     c->listening->handler(c);
  428. }


  429. ngx_int_t
  430. ngx_quic_init_streams(ngx_connection_t *c)
  431. {
  432.     ngx_int_t               rc;
  433.     ngx_quic_connection_t  *qc;

  434.     qc = ngx_quic_get_connection(c);

  435.     if (qc->streams.initialized) {
  436.         return NGX_OK;
  437.     }

  438.     rc = ngx_ssl_ocsp_validate(c);

  439.     if (rc == NGX_ERROR) {
  440.         return NGX_ERROR;
  441.     }

  442.     if (rc == NGX_AGAIN) {
  443.         c->ssl->handler = ngx_quic_init_streams_handler;
  444.         return NGX_OK;
  445.     }

  446.     return ngx_quic_do_init_streams(c);
  447. }


  448. static void
  449. ngx_quic_init_streams_handler(ngx_connection_t *c)
  450. {
  451.     if (ngx_quic_do_init_streams(c) != NGX_OK) {
  452.         ngx_quic_close_connection(c, NGX_ERROR);
  453.     }
  454. }


  455. static ngx_int_t
  456. ngx_quic_do_init_streams(ngx_connection_t *c)
  457. {
  458.     ngx_queue_t            *q;
  459.     ngx_quic_stream_t      *qs;
  460.     ngx_quic_connection_t  *qc;

  461.     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init streams");

  462.     qc = ngx_quic_get_connection(c);

  463.     if (qc->conf->init) {
  464.         if (qc->conf->init(c) != NGX_OK) {
  465.             return NGX_ERROR;
  466.         }
  467.     }

  468.     for (q = ngx_queue_head(&qc->streams.uninitialized);
  469.          q != ngx_queue_sentinel(&qc->streams.uninitialized);
  470.          q = ngx_queue_next(q))
  471.     {
  472.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  473.         ngx_post_event(qs->connection->read, &ngx_posted_events);
  474.     }

  475.     qc->streams.initialized = 1;

  476.     if (!qc->closing && qc->close.timer_set) {
  477.         ngx_del_timer(&qc->close);
  478.     }

  479.     return NGX_OK;
  480. }


  481. static ngx_quic_stream_t *
  482. ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
  483. {
  484.     ngx_str_t               addr_text;
  485.     ngx_log_t              *log;
  486.     ngx_pool_t             *pool;
  487.     ngx_uint_t              reusable;
  488.     ngx_queue_t            *q;
  489.     struct sockaddr        *sockaddr;
  490.     ngx_connection_t       *sc;
  491.     ngx_quic_stream_t      *qs;
  492.     ngx_pool_cleanup_t     *cln;
  493.     ngx_quic_connection_t  *qc;

  494.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  495.                    "quic stream id:0x%xL create", id);

  496.     qc = ngx_quic_get_connection(c);

  497.     if (!ngx_queue_empty(&qc->streams.free)) {
  498.         q = ngx_queue_head(&qc->streams.free);
  499.         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
  500.         ngx_queue_remove(&qs->queue);

  501.     } else {
  502.         /*
  503.          * the number of streams is limited by transport
  504.          * parameters and application requirements
  505.          */

  506.         qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
  507.         if (qs == NULL) {
  508.             return NULL;
  509.         }
  510.     }

  511.     ngx_memzero(qs, sizeof(ngx_quic_stream_t));

  512.     qs->node.key = id;
  513.     qs->parent = c;
  514.     qs->id = id;
  515.     qs->send_final_size = (uint64_t) -1;
  516.     qs->recv_final_size = (uint64_t) -1;

  517.     pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
  518.     if (pool == NULL) {
  519.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  520.         return NULL;
  521.     }

  522.     log = ngx_palloc(pool, sizeof(ngx_log_t));
  523.     if (log == NULL) {
  524.         ngx_destroy_pool(pool);
  525.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  526.         return NULL;
  527.     }

  528.     *log = *c->log;
  529.     pool->log = log;

  530.     sockaddr = ngx_palloc(pool, c->socklen);
  531.     if (sockaddr == NULL) {
  532.         ngx_destroy_pool(pool);
  533.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  534.         return NULL;
  535.     }

  536.     ngx_memcpy(sockaddr, c->sockaddr, c->socklen);

  537.     if (c->addr_text.data) {
  538.         addr_text.data = ngx_pnalloc(pool, c->addr_text.len);
  539.         if (addr_text.data == NULL) {
  540.             ngx_destroy_pool(pool);
  541.             ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  542.             return NULL;
  543.         }

  544.         ngx_memcpy(addr_text.data, c->addr_text.data, c->addr_text.len);
  545.         addr_text.len = c->addr_text.len;

  546.     } else {
  547.         addr_text.len = 0;
  548.         addr_text.data = NULL;
  549.     }

  550.     reusable = c->reusable;
  551.     ngx_reusable_connection(c, 0);

  552.     sc = ngx_get_connection(c->fd, log);
  553.     if (sc == NULL) {
  554.         ngx_destroy_pool(pool);
  555.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  556.         ngx_reusable_connection(c, reusable);
  557.         return NULL;
  558.     }

  559.     qs->connection = sc;

  560.     sc->quic = qs;
  561.     sc->shared = 1;
  562.     sc->type = SOCK_STREAM;
  563.     sc->pool = pool;
  564.     sc->ssl = c->ssl;
  565.     sc->sockaddr = sockaddr;
  566.     sc->socklen = c->socklen;
  567.     sc->listening = c->listening;
  568.     sc->addr_text = addr_text;
  569.     sc->local_sockaddr = c->local_sockaddr;
  570.     sc->local_socklen = c->local_socklen;
  571.     sc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
  572.     sc->start_time = c->start_time;
  573.     sc->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

  574.     sc->recv = ngx_quic_stream_recv;
  575.     sc->send = ngx_quic_stream_send;
  576.     sc->send_chain = ngx_quic_stream_send_chain;

  577.     sc->read->log = log;
  578.     sc->write->log = log;

  579.     sc->read->handler = ngx_quic_empty_handler;
  580.     sc->write->handler = ngx_quic_empty_handler;

  581.     log->connection = sc->number;

  582.     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
  583.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  584.             qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
  585.             qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
  586.             qs->send_state = NGX_QUIC_STREAM_SEND_READY;

  587.         } else {
  588.             qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
  589.             qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
  590.             qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
  591.         }

  592.     } else {
  593.         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
  594.             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
  595.             qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_local;

  596.         } else {
  597.             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
  598.             qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
  599.         }

  600.         qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
  601.         qs->send_state = NGX_QUIC_STREAM_SEND_READY;
  602.     }

  603.     qs->recv_window = qs->recv_max_data;

  604.     cln = ngx_pool_cleanup_add(pool, 0);
  605.     if (cln == NULL) {
  606.         ngx_close_connection(sc);
  607.         ngx_destroy_pool(pool);
  608.         ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
  609.         ngx_reusable_connection(c, reusable);
  610.         return NULL;
  611.     }

  612.     cln->handler = ngx_quic_stream_cleanup_handler;
  613.     cln->data = sc;

  614.     ngx_rbtree_insert(&qc->streams.tree, &qs->node);

  615.     return qs;
  616. }


  617. void
  618. ngx_quic_cancelable_stream(ngx_connection_t *c)
  619. {
  620.     ngx_connection_t       *pc;
  621.     ngx_quic_stream_t      *qs;
  622.     ngx_quic_connection_t  *qc;

  623.     qs = c->quic;
  624.     pc = qs->parent;
  625.     qc = ngx_quic_get_connection(pc);

  626.     if (!qs->cancelable) {
  627.         qs->cancelable = 1;

  628.         if (ngx_quic_can_shutdown(pc) == NGX_OK) {
  629.             ngx_reusable_connection(pc, 1);

  630.             if (qc->shutdown) {
  631.                 ngx_quic_shutdown_quic(pc);
  632.             }
  633.         }
  634.     }
  635. }


  636. static void
  637. ngx_quic_empty_handler(ngx_event_t *ev)
  638. {
  639. }


  640. static ssize_t
  641. ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
  642. {
  643.     ssize_t             len;
  644.     ngx_buf_t          *b;
  645.     ngx_chain_t        *cl, *in;
  646.     ngx_event_t        *rev;
  647.     ngx_connection_t   *pc;
  648.     ngx_quic_stream_t  *qs;

  649.     qs = c->quic;
  650.     pc = qs->parent;
  651.     rev = c->read;

  652.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
  653.         || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
  654.     {
  655.         qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ;
  656.         return NGX_ERROR;
  657.     }

  658.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  659.                    "quic stream id:0x%xL recv buf:%uz", qs->id, size);

  660.     if (size == 0) {
  661.         return 0;
  662.     }

  663.     in = ngx_quic_read_buffer(pc, &qs->recv, size);
  664.     if (in == NGX_CHAIN_ERROR) {
  665.         return NGX_ERROR;
  666.     }

  667.     len = 0;

  668.     for (cl = in; cl; cl = cl->next) {
  669.         b = cl->buf;
  670.         len += b->last - b->pos;
  671.         buf = ngx_cpymem(buf, b->pos, b->last - b->pos);
  672.     }

  673.     ngx_quic_free_chain(pc, in);

  674.     if (len == 0) {
  675.         rev->ready = 0;

  676.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
  677.             && qs->recv_offset == qs->recv_final_size)
  678.         {
  679.             qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
  680.         }

  681.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) {
  682.             rev->eof = 1;
  683.             return 0;
  684.         }

  685.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  686.                        "quic stream id:0x%xL recv() not ready", qs->id);
  687.         return NGX_AGAIN;
  688.     }

  689.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  690.                    "quic stream id:0x%xL recv len:%z", qs->id, len);

  691.     if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
  692.         return NGX_ERROR;
  693.     }

  694.     return len;
  695. }


  696. static ssize_t
  697. ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
  698. {
  699.     ngx_buf_t    b;
  700.     ngx_chain_t  cl;

  701.     ngx_memzero(&b, sizeof(ngx_buf_t));

  702.     b.memory = 1;
  703.     b.pos = buf;
  704.     b.last = buf + size;

  705.     cl.buf = &b;
  706.     cl.next = NULL;

  707.     if (ngx_quic_stream_send_chain(c, &cl, 0) == NGX_CHAIN_ERROR) {
  708.         return NGX_ERROR;
  709.     }

  710.     if (b.pos == buf) {
  711.         return NGX_AGAIN;
  712.     }

  713.     return b.pos - buf;
  714. }


  715. static ngx_chain_t *
  716. ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
  717. {
  718.     uint64_t                n, flow;
  719.     ngx_event_t            *wev;
  720.     ngx_connection_t       *pc;
  721.     ngx_quic_stream_t      *qs;
  722.     ngx_quic_connection_t  *qc;

  723.     qs = c->quic;
  724.     pc = qs->parent;
  725.     qc = ngx_quic_get_connection(pc);
  726.     wev = c->write;

  727.     if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
  728.         && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
  729.     {
  730.         wev->error = 1;
  731.         return NGX_CHAIN_ERROR;
  732.     }

  733.     qs->send_state = NGX_QUIC_STREAM_SEND_SEND;

  734.     flow = qs->acked + qc->conf->stream_buffer_size - qs->sent;

  735.     if (flow == 0) {
  736.         wev->ready = 0;
  737.         return in;
  738.     }

  739.     if (limit == 0 || limit > (off_t) flow) {
  740.         limit = flow;
  741.     }

  742.     n = qs->send.size;

  743.     in = ngx_quic_write_buffer(pc, &qs->send, in, limit, qs->sent);
  744.     if (in == NGX_CHAIN_ERROR) {
  745.         return NGX_CHAIN_ERROR;
  746.     }

  747.     n = qs->send.size - n;
  748.     c->sent += n;
  749.     qs->sent += n;
  750.     qc->streams.sent += n;

  751.     if (flow == n) {
  752.         wev->ready = 0;
  753.     }

  754.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  755.                    "quic send_chain sent:%uL", n);

  756.     if (ngx_quic_stream_flush(qs) != NGX_OK) {
  757.         return NGX_CHAIN_ERROR;
  758.     }

  759.     return in;
  760. }


  761. static ngx_int_t
  762. ngx_quic_stream_flush(ngx_quic_stream_t *qs)
  763. {
  764.     off_t                   limit, len;
  765.     ngx_uint_t              last;
  766.     ngx_chain_t            *out;
  767.     ngx_quic_frame_t       *frame;
  768.     ngx_connection_t       *pc;
  769.     ngx_quic_connection_t  *qc;

  770.     if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
  771.         return NGX_OK;
  772.     }

  773.     pc = qs->parent;
  774.     qc = ngx_quic_get_connection(pc);

  775.     if (qc->streams.send_max_data == 0) {
  776.         qc->streams.send_max_data = qc->ctp.initial_max_data;
  777.     }

  778.     limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
  779.                     qs->send_max_data - qs->send_offset);

  780.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  781.                    "quic stream id:0x%xL flush limit:%O", qs->id, limit);

  782.     len = qs->send.offset;

  783.     out = ngx_quic_read_buffer(pc, &qs->send, limit);
  784.     if (out == NGX_CHAIN_ERROR) {
  785.         return NGX_ERROR;
  786.     }

  787.     len = qs->send.offset - len;
  788.     last = 0;

  789.     if (qs->send_final_size != (uint64_t) -1
  790.         && qs->send_final_size == qs->send.offset)
  791.     {
  792.         qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
  793.         last = 1;
  794.     }

  795.     if (len == 0 && !last) {
  796.         return NGX_OK;
  797.     }

  798.     frame = ngx_quic_alloc_frame(pc);
  799.     if (frame == NULL) {
  800.         return NGX_ERROR;
  801.     }

  802.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  803.     frame->type = NGX_QUIC_FT_STREAM;
  804.     frame->data = out;

  805.     frame->u.stream.off = 1;
  806.     frame->u.stream.len = 1;
  807.     frame->u.stream.fin = last;

  808.     frame->u.stream.stream_id = qs->id;
  809.     frame->u.stream.offset = qs->send_offset;
  810.     frame->u.stream.length = len;

  811.     ngx_quic_queue_frame(qc, frame);

  812.     qs->send_offset += len;
  813.     qc->streams.send_offset += len;

  814.     ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  815.                    "quic stream id:0x%xL flush len:%O last:%ui",
  816.                    qs->id, len, last);

  817.     if (qs->connection == NULL) {
  818.         return ngx_quic_close_stream(qs);
  819.     }

  820.     return NGX_OK;
  821. }


  822. static void
  823. ngx_quic_stream_cleanup_handler(void *data)
  824. {
  825.     ngx_connection_t *c = data;

  826.     ngx_quic_stream_t      *qs;
  827.     ngx_quic_connection_t  *qc;

  828.     qs = c->quic;

  829.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
  830.                    "quic stream id:0x%xL cleanup", qs->id);

  831.     if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
  832.         qs->connection = NULL;
  833.         goto failed;
  834.     }

  835.     qs->connection = NULL;

  836.     if (ngx_quic_close_stream(qs) != NGX_OK) {
  837.         goto failed;
  838.     }

  839.     return;

  840. failed:

  841.     qc = ngx_quic_get_connection(qs->parent);
  842.     qc->error = NGX_QUIC_ERR_INTERNAL_ERROR;

  843.     ngx_post_event(&qc->close, &ngx_posted_events);
  844. }


  845. static ngx_int_t
  846. ngx_quic_close_stream(ngx_quic_stream_t *qs)
  847. {
  848.     ngx_connection_t       *pc;
  849.     ngx_quic_frame_t       *frame;
  850.     ngx_quic_connection_t  *qc;

  851.     pc = qs->parent;
  852.     qc = ngx_quic_get_connection(pc);

  853.     if (!qc->closing) {
  854.         /* make sure everything is sent and final size is received */

  855.         if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV) {
  856.             return NGX_OK;
  857.         }

  858.         if (qs->send_state != NGX_QUIC_STREAM_SEND_DATA_RECVD
  859.             && qs->send_state != NGX_QUIC_STREAM_SEND_RESET_RECVD)
  860.         {
  861.             return NGX_OK;
  862.         }
  863.     }

  864.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  865.                    "quic stream id:0x%xL close", qs->id);

  866.     ngx_quic_free_buffer(pc, &qs->send);
  867.     ngx_quic_free_buffer(pc, &qs->recv);

  868.     ngx_rbtree_delete(&qc->streams.tree, &qs->node);
  869.     ngx_queue_insert_tail(&qc->streams.free, &qs->queue);

  870.     if (qc->closing) {
  871.         /* schedule handler call to continue ngx_quic_close_connection() */
  872.         ngx_post_event(&qc->close, &ngx_posted_events);
  873.         return NGX_OK;
  874.     }

  875.     if (!pc->reusable && ngx_quic_can_shutdown(pc) == NGX_OK) {
  876.         ngx_reusable_connection(pc, 1);
  877.     }

  878.     if (qc->shutdown) {
  879.         ngx_quic_shutdown_quic(pc);
  880.         return NGX_OK;
  881.     }

  882.     if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
  883.         frame = ngx_quic_alloc_frame(pc);
  884.         if (frame == NULL) {
  885.             return NGX_ERROR;
  886.         }

  887.         frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  888.         frame->type = NGX_QUIC_FT_MAX_STREAMS;

  889.         if (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
  890.             frame->u.max_streams.limit = ++qc->streams.client_max_streams_uni;
  891.             frame->u.max_streams.bidi = 0;

  892.         } else {
  893.             frame->u.max_streams.limit = ++qc->streams.client_max_streams_bidi;
  894.             frame->u.max_streams.bidi = 1;
  895.         }

  896.         ngx_quic_queue_frame(qc, frame);
  897.     }

  898.     return NGX_OK;
  899. }


  900. static ngx_int_t
  901. ngx_quic_can_shutdown(ngx_connection_t *c)
  902. {
  903.     ngx_rbtree_t           *tree;
  904.     ngx_rbtree_node_t      *node;
  905.     ngx_quic_stream_t      *qs;
  906.     ngx_quic_connection_t  *qc;

  907.     qc = ngx_quic_get_connection(c);

  908.     tree = &qc->streams.tree;

  909.     if (tree->root != tree->sentinel) {
  910.         for (node = ngx_rbtree_min(tree->root, tree->sentinel);
  911.              node;
  912.              node = ngx_rbtree_next(tree, node))
  913.         {
  914.             qs = (ngx_quic_stream_t *) node;

  915.             if (!qs->cancelable) {
  916.                 return NGX_DECLINED;
  917.             }
  918.         }
  919.     }

  920.     return NGX_OK;
  921. }


  922. ngx_int_t
  923. ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
  924.     ngx_quic_frame_t *frame)
  925. {
  926.     uint64_t                  last;
  927.     ngx_quic_stream_t        *qs;
  928.     ngx_quic_connection_t    *qc;
  929.     ngx_quic_stream_frame_t  *f;

  930.     qc = ngx_quic_get_connection(c);
  931.     f = &frame->u.stream;

  932.     if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  933.         && (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED))
  934.     {
  935.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  936.         return NGX_ERROR;
  937.     }

  938.     /* no overflow since both values are 62-bit */
  939.     last = f->offset + f->length;

  940.     qs = ngx_quic_get_stream(c, f->stream_id);

  941.     if (qs == NULL) {
  942.         return NGX_ERROR;
  943.     }

  944.     if (qs == NGX_QUIC_STREAM_GONE) {
  945.         return NGX_OK;
  946.     }

  947.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
  948.         && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
  949.     {
  950.         return NGX_OK;
  951.     }

  952.     if (ngx_quic_control_flow(qs, last) != NGX_OK) {
  953.         return NGX_ERROR;
  954.     }

  955.     if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
  956.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  957.         return NGX_ERROR;
  958.     }

  959.     if (last < qs->recv_offset) {
  960.         return NGX_OK;
  961.     }

  962.     if (f->fin) {
  963.         if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
  964.         {
  965.             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  966.             return NGX_ERROR;
  967.         }

  968.         if (qs->recv_last > last) {
  969.             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  970.             return NGX_ERROR;
  971.         }

  972.         qs->recv_final_size = last;
  973.         qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
  974.     }

  975.     if (ngx_quic_write_buffer(c, &qs->recv, frame->data, f->length, f->offset)
  976.         == NGX_CHAIN_ERROR)
  977.     {
  978.         return NGX_ERROR;
  979.     }

  980.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
  981.         && qs->recv.size == qs->recv_final_size)
  982.     {
  983.         qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
  984.     }

  985.     if (qs->connection == NULL) {
  986.         return ngx_quic_close_stream(qs);
  987.     }

  988.     if (f->offset <= qs->recv_offset) {
  989.         ngx_quic_set_event(qs->connection->read);
  990.     }

  991.     return NGX_OK;
  992. }


  993. ngx_int_t
  994. ngx_quic_handle_max_data_frame(ngx_connection_t *c,
  995.     ngx_quic_max_data_frame_t *f)
  996. {
  997.     ngx_rbtree_t           *tree;
  998.     ngx_rbtree_node_t      *node;
  999.     ngx_quic_stream_t      *qs;
  1000.     ngx_quic_connection_t  *qc;

  1001.     qc = ngx_quic_get_connection(c);
  1002.     tree = &qc->streams.tree;

  1003.     if (f->max_data <= qc->streams.send_max_data) {
  1004.         return NGX_OK;
  1005.     }

  1006.     if (tree->root == tree->sentinel
  1007.         || qc->streams.send_offset < qc->streams.send_max_data)
  1008.     {
  1009.         /* not blocked on MAX_DATA */
  1010.         qc->streams.send_max_data = f->max_data;
  1011.         return NGX_OK;
  1012.     }

  1013.     qc->streams.send_max_data = f->max_data;
  1014.     node = ngx_rbtree_min(tree->root, tree->sentinel);

  1015.     while (node && qc->streams.send_offset < qc->streams.send_max_data) {

  1016.         qs = (ngx_quic_stream_t *) node;
  1017.         node = ngx_rbtree_next(tree, node);

  1018.         if (ngx_quic_stream_flush(qs) != NGX_OK) {
  1019.             return NGX_ERROR;
  1020.         }
  1021.     }

  1022.     return NGX_OK;
  1023. }


  1024. ngx_int_t
  1025. ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
  1026.     ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
  1027. {
  1028.     return NGX_OK;
  1029. }


  1030. ngx_int_t
  1031. ngx_quic_handle_data_blocked_frame(ngx_connection_t *c,
  1032.     ngx_quic_header_t *pkt, ngx_quic_data_blocked_frame_t *f)
  1033. {
  1034.     return ngx_quic_update_max_data(c);
  1035. }


  1036. ngx_int_t
  1037. ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
  1038.     ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
  1039. {
  1040.     ngx_quic_stream_t      *qs;
  1041.     ngx_quic_connection_t  *qc;

  1042.     qc = ngx_quic_get_connection(c);

  1043.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1044.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
  1045.     {
  1046.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1047.         return NGX_ERROR;
  1048.     }

  1049.     qs = ngx_quic_get_stream(c, f->id);

  1050.     if (qs == NULL) {
  1051.         return NGX_ERROR;
  1052.     }

  1053.     if (qs == NGX_QUIC_STREAM_GONE) {
  1054.         return NGX_OK;
  1055.     }

  1056.     return ngx_quic_update_max_stream_data(qs);
  1057. }


  1058. ngx_int_t
  1059. ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
  1060.     ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
  1061. {
  1062.     ngx_quic_stream_t      *qs;
  1063.     ngx_quic_connection_t  *qc;

  1064.     qc = ngx_quic_get_connection(c);

  1065.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1066.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
  1067.     {
  1068.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1069.         return NGX_ERROR;
  1070.     }

  1071.     qs = ngx_quic_get_stream(c, f->id);

  1072.     if (qs == NULL) {
  1073.         return NGX_ERROR;
  1074.     }

  1075.     if (qs == NGX_QUIC_STREAM_GONE) {
  1076.         return NGX_OK;
  1077.     }

  1078.     if (f->limit <= qs->send_max_data) {
  1079.         return NGX_OK;
  1080.     }

  1081.     if (qs->send_offset < qs->send_max_data) {
  1082.         /* not blocked on MAX_STREAM_DATA */
  1083.         qs->send_max_data = f->limit;
  1084.         return NGX_OK;
  1085.     }

  1086.     qs->send_max_data = f->limit;

  1087.     return ngx_quic_stream_flush(qs);
  1088. }


  1089. ngx_int_t
  1090. ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
  1091.     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
  1092. {
  1093.     ngx_event_t            *rev;
  1094.     ngx_quic_stream_t      *qs;
  1095.     ngx_quic_connection_t  *qc;

  1096.     qc = ngx_quic_get_connection(c);

  1097.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1098.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED))
  1099.     {
  1100.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1101.         return NGX_ERROR;
  1102.     }

  1103.     qs = ngx_quic_get_stream(c, f->id);

  1104.     if (qs == NULL) {
  1105.         return NGX_ERROR;
  1106.     }

  1107.     if (qs == NGX_QUIC_STREAM_GONE) {
  1108.         return NGX_OK;
  1109.     }

  1110.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
  1111.         || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
  1112.     {
  1113.         return NGX_OK;
  1114.     }

  1115.     qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;

  1116.     if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
  1117.         return NGX_ERROR;
  1118.     }

  1119.     if (qs->recv_final_size != (uint64_t) -1
  1120.         && qs->recv_final_size != f->final_size)
  1121.     {
  1122.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  1123.         return NGX_ERROR;
  1124.     }

  1125.     if (qs->recv_last > f->final_size) {
  1126.         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
  1127.         return NGX_ERROR;
  1128.     }

  1129.     qs->recv_final_size = f->final_size;

  1130.     if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
  1131.         return NGX_ERROR;
  1132.     }

  1133.     if (qs->connection == NULL) {
  1134.         return ngx_quic_close_stream(qs);
  1135.     }

  1136.     rev = qs->connection->read;
  1137.     rev->error = 1;

  1138.     ngx_quic_set_event(rev);

  1139.     return NGX_OK;
  1140. }


  1141. ngx_int_t
  1142. ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
  1143.     ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
  1144. {
  1145.     ngx_quic_stream_t      *qs;
  1146.     ngx_quic_connection_t  *qc;

  1147.     qc = ngx_quic_get_connection(c);

  1148.     if ((f->id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
  1149.         && (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0)
  1150.     {
  1151.         qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
  1152.         return NGX_ERROR;
  1153.     }

  1154.     qs = ngx_quic_get_stream(c, f->id);

  1155.     if (qs == NULL) {
  1156.         return NGX_ERROR;
  1157.     }

  1158.     if (qs == NGX_QUIC_STREAM_GONE) {
  1159.         return NGX_OK;
  1160.     }

  1161.     if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
  1162.         return NGX_ERROR;
  1163.     }

  1164.     if (qs->connection == NULL) {
  1165.         return ngx_quic_close_stream(qs);
  1166.     }

  1167.     ngx_quic_set_event(qs->connection->write);

  1168.     return NGX_OK;
  1169. }


  1170. ngx_int_t
  1171. ngx_quic_handle_max_streams_frame(ngx_connection_t *c,
  1172.     ngx_quic_header_t *pkt, ngx_quic_max_streams_frame_t *f)
  1173. {
  1174.     ngx_quic_connection_t  *qc;

  1175.     qc = ngx_quic_get_connection(c);

  1176.     if (f->bidi) {
  1177.         if (qc->streams.server_max_streams_bidi < f->limit) {
  1178.             qc->streams.server_max_streams_bidi = f->limit;

  1179.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1180.                            "quic max_streams_bidi:%uL", f->limit);
  1181.         }

  1182.     } else {
  1183.         if (qc->streams.server_max_streams_uni < f->limit) {
  1184.             qc->streams.server_max_streams_uni = f->limit;

  1185.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1186.                            "quic max_streams_uni:%uL", f->limit);
  1187.         }
  1188.     }

  1189.     return NGX_OK;
  1190. }


  1191. void
  1192. ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
  1193. {
  1194.     uint64_t                acked;
  1195.     ngx_quic_stream_t      *qs;
  1196.     ngx_quic_connection_t  *qc;

  1197.     qc = ngx_quic_get_connection(c);

  1198.     switch (f->type) {

  1199.     case NGX_QUIC_FT_RESET_STREAM:

  1200.         qs = ngx_quic_find_stream(&qc->streams.tree, f->u.reset_stream.id);
  1201.         if (qs == NULL) {
  1202.             return;
  1203.         }

  1204.         qs->send_state = NGX_QUIC_STREAM_SEND_RESET_RECVD;

  1205.         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1206.                        "quic stream id:0x%xL ack reset final_size:%uL",
  1207.                        qs->id, f->u.reset_stream.final_size);

  1208.         break;

  1209.     case NGX_QUIC_FT_STREAM:

  1210.         qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
  1211.         if (qs == NULL) {
  1212.             return;
  1213.         }

  1214.         acked = qs->acked;
  1215.         qs->acked += f->u.stream.length;

  1216.         if (f->u.stream.fin) {
  1217.             qs->fin_acked = 1;
  1218.         }

  1219.         if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_SENT
  1220.             && qs->acked == qs->sent && qs->fin_acked)
  1221.         {
  1222.             qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
  1223.         }

  1224.         ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1225.                        "quic stream id:0x%xL ack len:%uL fin:%d unacked:%uL",
  1226.                        qs->id, f->u.stream.length, f->u.stream.fin,
  1227.                        qs->sent - qs->acked);

  1228.         if (qs->connection
  1229.             && qs->sent - acked == qc->conf->stream_buffer_size
  1230.             && f->u.stream.length > 0)
  1231.         {
  1232.             ngx_quic_set_event(qs->connection->write);
  1233.         }

  1234.         break;

  1235.     default:
  1236.         return;
  1237.     }

  1238.     if (qs->connection == NULL) {
  1239.         ngx_quic_close_stream(qs);
  1240.     }
  1241. }


  1242. static ngx_int_t
  1243. ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
  1244. {
  1245.     uint64_t                len;
  1246.     ngx_connection_t       *pc;
  1247.     ngx_quic_connection_t  *qc;

  1248.     pc = qs->parent;
  1249.     qc = ngx_quic_get_connection(pc);

  1250.     if (last <= qs->recv_last) {
  1251.         return NGX_OK;
  1252.     }

  1253.     len = last - qs->recv_last;

  1254.     ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1255.                    "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
  1256.                    qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
  1257.                    qc->streams.recv_max_data);

  1258.     qs->recv_last += len;

  1259.     if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
  1260.         && qs->recv_last > qs->recv_max_data)
  1261.     {
  1262.         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
  1263.         return NGX_ERROR;
  1264.     }

  1265.     qc->streams.recv_last += len;

  1266.     if (qc->streams.recv_last > qc->streams.recv_max_data) {
  1267.         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
  1268.         return NGX_ERROR;
  1269.     }

  1270.     return NGX_OK;
  1271. }


  1272. static ngx_int_t
  1273. ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
  1274. {
  1275.     uint64_t                len;
  1276.     ngx_connection_t       *pc;
  1277.     ngx_quic_connection_t  *qc;

  1278.     pc = qs->parent;
  1279.     qc = ngx_quic_get_connection(pc);

  1280.     if (last <= qs->recv_offset) {
  1281.         return NGX_OK;
  1282.     }

  1283.     len = last - qs->recv_offset;

  1284.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1285.                    "quic stream id:0x%xL flow update %uL", qs->id, last);

  1286.     qs->recv_offset += len;

  1287.     if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
  1288.         if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
  1289.             return NGX_ERROR;
  1290.         }
  1291.     }

  1292.     qc->streams.recv_offset += len;

  1293.     if (qc->streams.recv_max_data
  1294.         <= qc->streams.recv_offset + qc->streams.recv_window / 2)
  1295.     {
  1296.         if (ngx_quic_update_max_data(pc) != NGX_OK) {
  1297.             return NGX_ERROR;
  1298.         }
  1299.     }

  1300.     return NGX_OK;
  1301. }


  1302. static ngx_int_t
  1303. ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
  1304. {
  1305.     uint64_t                recv_max_data;
  1306.     ngx_connection_t       *pc;
  1307.     ngx_quic_frame_t       *frame;
  1308.     ngx_quic_connection_t  *qc;

  1309.     pc = qs->parent;
  1310.     qc = ngx_quic_get_connection(pc);

  1311.     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) {
  1312.         return NGX_OK;
  1313.     }

  1314.     recv_max_data = qs->recv_offset + qs->recv_window;

  1315.     if (qs->recv_max_data == recv_max_data) {
  1316.         return NGX_OK;
  1317.     }

  1318.     qs->recv_max_data = recv_max_data;

  1319.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
  1320.                    "quic stream id:0x%xL flow update msd:%uL",
  1321.                    qs->id, qs->recv_max_data);

  1322.     frame = ngx_quic_alloc_frame(pc);
  1323.     if (frame == NULL) {
  1324.         return NGX_ERROR;
  1325.     }

  1326.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  1327.     frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
  1328.     frame->u.max_stream_data.id = qs->id;
  1329.     frame->u.max_stream_data.limit = qs->recv_max_data;

  1330.     ngx_quic_queue_frame(qc, frame);

  1331.     return NGX_OK;
  1332. }


  1333. static ngx_int_t
  1334. ngx_quic_update_max_data(ngx_connection_t *c)
  1335. {
  1336.     uint64_t                recv_max_data;
  1337.     ngx_quic_frame_t       *frame;
  1338.     ngx_quic_connection_t  *qc;

  1339.     qc = ngx_quic_get_connection(c);

  1340.     recv_max_data = qc->streams.recv_offset + qc->streams.recv_window;

  1341.     if (qc->streams.recv_max_data == recv_max_data) {
  1342.         return NGX_OK;
  1343.     }

  1344.     qc->streams.recv_max_data = recv_max_data;

  1345.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
  1346.                    "quic flow update md:%uL", qc->streams.recv_max_data);

  1347.     frame = ngx_quic_alloc_frame(c);
  1348.     if (frame == NULL) {
  1349.         return NGX_ERROR;
  1350.     }

  1351.     frame->level = NGX_QUIC_ENCRYPTION_APPLICATION;
  1352.     frame->type = NGX_QUIC_FT_MAX_DATA;
  1353.     frame->u.max_data.max_data = qc->streams.recv_max_data;

  1354.     ngx_quic_queue_frame(qc, frame);

  1355.     return NGX_OK;
  1356. }


  1357. static void
  1358. ngx_quic_set_event(ngx_event_t *ev)
  1359. {
  1360.     ev->ready = 1;

  1361.     if (ev->active) {
  1362.         ngx_post_event(ev, &ngx_posted_events);
  1363.     }
  1364. }