[PATCH] QUIC: stream lingering

Vladimir Homutov vl at nginx.com
Tue Feb 8 11:45:19 UTC 2022


On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote:
> Hi,
>
> On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > > # HG changeset patch
> > > # User Roman Arutyunyan <arut at nginx.com>
> > > # Date 1643722727 -10800
> > > #      Tue Feb 01 16:38:47 2022 +0300
> > > # Branch quic
> > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > > # Parent  308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > > QUIC: stream lingering.
> > >
> > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > > can persist after connection is closed by application.  During this period,
> > > server is expecting stream final size from client for correct flow control.
> > > Also, buffered output is sent to client as more flow control credit is granted.
> > >
> > [..]
> >
> > > +static ngx_int_t
> > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > +{
> > > +    size_t                  limit, len;
> > > +    ngx_uint_t              last;
> > > +    ngx_chain_t            *out, *cl;
> > > +    ngx_quic_frame_t       *frame;
> > > +    ngx_connection_t       *pc;
> > > +    ngx_quic_connection_t  *qc;
> > > +
> > > +    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > > +        return NGX_OK;
> > > +    }
> > > +
> > > +    pc = qs->parent;
> > > +    qc = ngx_quic_get_connection(pc);
> > > +
> > > +    limit = ngx_quic_max_stream_flow(qs);
> > > +    last = 0;
> > > +
> > > +    out = ngx_quic_read_chain(pc, &qs->out, limit);
> > > +    if (out == NGX_CHAIN_ERROR) {
> > > +        return NGX_ERROR;
> > > +    }
> > > +
> > > +    len = 0;
> > > +    last = 0;
> >
> > this assignment looks like a duplicate.
>
> Thanks, fixed.
>
> > [..]
> >
> > > +static ngx_int_t
> > > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > > +{
> > >      ngx_connection_t       *pc;
> > >      ngx_quic_frame_t       *frame;
> > > -    ngx_quic_stream_t      *qs;
> > >      ngx_quic_connection_t  *qc;
> > >
> > > -    qs = c->quic;
> > >      pc = qs->parent;
> > >      qc = ngx_quic_get_connection(pc);
> > >
> > > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > > -                   "quic stream id:0x%xL cleanup", qs->id);
> > > +    if (!qc->closing) {
> > > +        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > > +        {
> >
> > so basically these are the states where we need to wait for FIN,
> > and thus avoid closing until we get it?
> > I would add a comment here.
>
> On the receiving end we wait either for fin or for reset to have final size.
> On the sending end we wait for everything that's buffered to be sent.
> Added a comment about that.
>
> > [..]
> > > +    if (qs->connection == NULL) {
> > > +        return ngx_quic_close_stream(qs);
> > > +    }
> > > +
> > >      ngx_quic_set_event(qs->connection->write);
> >
> > this pattern - check the connection, close the stream if it is NULL,
> > otherwise set the event - seems to repeat. Maybe it's worth trying to put
> > this check/action into ngx_quic_set_event somehow? We could instead have
> > set_read_event/set_write_event, maybe.
>
> I thought about this too, but it's not always that simple.  And even if it was,
> the new function/macro would have unclear semantics.  Let's just remember this
> as a possible future optimization.
>
> > > +static ngx_int_t
> > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > +
> > [..]
> > > +    if (len == 0 && !last) {
> > > +        return NGX_OK;
> > > +    }
> > > +
> > > +    frame = ngx_quic_alloc_frame(pc);
> > > +    if (frame == NULL) {
> > > +        return NGX_ERROR;
> > > +    }
> > > +
> > > +    frame = ngx_quic_alloc_frame(pc);
> > > +    if (frame == NULL) {
> > > +        return NGX_ERROR;
> > > +    }
> >
> > one more dup here.
>
> Yes, thanks.
>
> > Overall, it looks good, but testing revealed another issue: with big
> > buffer sizes we run into overly long chains in ngx_quic_write_chain().
> > As discussed, this certainly needs optimization - probably adding some
> > pointer to the end to facilitate appending, or something else.
>
> It's true ngx_quic_write_chain() needs to be optimized.  When the buffered
> chain is big, it takes too much time to find the write point.  I'll address
> this in a separate patch.  Meanwhile, attached is an updated version of the
> current one.
>
> In the new version of the patch I also eliminated the
> ngx_quic_max_stream_flow() function and embedded its content in
> ngx_quic_stream_flush().

Yes, this looks correct - the flow limit should not take the buffer size
into account, as it did before.

I think we should check for limit == 0 before calling ngx_quic_read_chain(),
and this is a good place for the 'hit MAX_DATA/MAX_STREAM_DATA' debug logging
that was removed by the update.
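
A minimal standalone sketch of that check, not nginx code: flow_t,
flow_block_t, flow_credit() and flush_limit() are hypothetical names used
only for illustration; the send_offset/send_max_data fields and the log
messages mirror the patch. It computes the limit as the smaller of the
connection-level and stream-level credit, and reports which one is exhausted
when the limit is zero.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the per-connection / per-stream send counters. */
typedef struct {
    uint64_t  send_offset;     /* bytes already passed to the frame queue */
    uint64_t  send_max_data;   /* limit granted by the peer */
} flow_t;

typedef enum {
    FLOW_OK = 0,
    FLOW_HIT_MAX_DATA,
    FLOW_HIT_MAX_STREAM_DATA
} flow_block_t;

/* Remaining credit; the offset is assumed never to exceed the limit. */
static uint64_t
flow_credit(const flow_t *f)
{
    assert(f->send_offset <= f->send_max_data);
    return f->send_max_data - f->send_offset;
}

/*
 * Store min(connection credit, stream credit) in *limit.  When it is zero,
 * log which limit was hit so the caller can return before reading the chain.
 */
static flow_block_t
flush_limit(const flow_t *conn, const flow_t *stream, uint64_t *limit)
{
    uint64_t  cc, sc;

    cc = flow_credit(conn);
    sc = flow_credit(stream);

    *limit = (cc < sc) ? cc : sc;

    if (*limit != 0) {
        return FLOW_OK;
    }

    if (cc == 0) {
        fprintf(stderr, "quic send flow hit MAX_DATA\n");
        return FLOW_HIT_MAX_DATA;
    }

    fprintf(stderr, "quic send flow hit MAX_STREAM_DATA\n");
    return FLOW_HIT_MAX_STREAM_DATA;
}
```

In ngx_quic_stream_flush() the equivalent check would presumably sit right
after limit is computed.  A stream whose only remaining output is a
zero-length FIN probably still has to get through, since an empty frame
consumes no flow control credit.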

>
> --
> Roman Arutyunyan

> # HG changeset patch
> # User Roman Arutyunyan <arut at nginx.com>
> # Date 1644054894 -10800
> #      Sat Feb 05 12:54:54 2022 +0300
> # Branch quic
> # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
> # Parent  6c1dfd072859022f830aeea49db7cbe3c9f7fb55
> QUIC: stream lingering.
>
> Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> can persist after connection is closed by application.  During this period,
> server is expecting stream final size from client for correct flow control.
> Also, buffered output is sent to client as more flow control credit is granted.
>
> diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
> --- a/src/event/quic/ngx_event_quic.c
> +++ b/src/event/quic/ngx_event_quic.c
> @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
>      ctp->active_connection_id_limit = 2;
>
>      ngx_queue_init(&qc->streams.uninitialized);
> +    ngx_queue_init(&qc->streams.free);
>
>      qc->streams.recv_max_data = qc->tp.initial_max_data;
>      qc->streams.recv_window = qc->streams.recv_max_data;
> diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
> --- a/src/event/quic/ngx_event_quic.h
> +++ b/src/event/quic/ngx_event_quic.h
> @@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
>      uint64_t                       id;
>      uint64_t                       acked;
>      uint64_t                       send_max_data;
> +    uint64_t                       send_offset;
> +    uint64_t                       send_final_size;
>      uint64_t                       recv_max_data;
>      uint64_t                       recv_offset;
>      uint64_t                       recv_window;
>      uint64_t                       recv_last;
>      uint64_t                       recv_size;
> -    uint64_t                       final_size;
> +    uint64_t                       recv_final_size;
>      ngx_chain_t                   *in;
>      ngx_chain_t                   *out;
>      ngx_uint_t                     cancelable;  /* unsigned  cancelable:1; */
> diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
> --- a/src/event/quic/ngx_event_quic_connection.h
> +++ b/src/event/quic/ngx_event_quic_connection.h
> @@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
>  typedef struct {
>      ngx_rbtree_t                      tree;
>      ngx_rbtree_node_t                 sentinel;
> +
>      ngx_queue_t                       uninitialized;
> +    ngx_queue_t                       free;
>
>      uint64_t                          sent;
>      uint64_t                          recv_offset;
>      uint64_t                          recv_window;
>      uint64_t                          recv_last;
>      uint64_t                          recv_max_data;
> +    uint64_t                          send_offset;
>      uint64_t                          send_max_data;
>
>      uint64_t                          server_max_streams_uni;
> diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c
> --- a/src/event/quic/ngx_event_quic_frames.c
> +++ b/src/event/quic/ngx_event_quic_frames.c
> @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c
>          return NGX_ERROR;
>      }
>
> +    if (f->type == NGX_QUIC_FT_STREAM) {
> +        f->u.stream.fin = 0;
> +    }
> +
>      ngx_queue_insert_after(&f->queue, &nf->queue);
>
>      return NGX_OK;
> diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
> --- a/src/event/quic/ngx_event_quic_streams.c
> +++ b/src/event/quic/ngx_event_quic_streams.c
> @@ -13,6 +13,8 @@
>  #define NGX_QUIC_STREAM_GONE     (void *) -1
>
>
> +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
> +    ngx_uint_t err);
>  static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
>  static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
>  static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
> @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_
>      size_t size);
>  static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
>      ngx_chain_t *in, off_t limit);
> -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
> +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
>  static void ngx_quic_stream_cleanup_handler(void *data);
> -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
> -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
> -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
> +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
> +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
> +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
> +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
>  static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
>  static void ngx_quic_set_event(ngx_event_t *ev);
>
> @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t
>      ns = 0;
>  #endif
>
> -    for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> -         node;
> -         node = ngx_rbtree_next(tree, node))
> -    {
> +    node = ngx_rbtree_min(tree->root, tree->sentinel);
> +
> +    while (node) {
>          qs = (ngx_quic_stream_t *) node;
> +        node = ngx_rbtree_next(tree, node);
>
>          qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
>          qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
>
> +        if (qs->connection == NULL) {
> +            ngx_quic_close_stream(qs);
> +            continue;
> +        }
> +
>          ngx_quic_set_event(qs->connection->read);
>          ngx_quic_set_event(qs->connection->write);
>
> @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t
>  ngx_int_t
>  ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
>  {
> +    return ngx_quic_do_reset_stream(c->quic, err);
> +}
> +
> +
> +static ngx_int_t
> +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
> +{
>      ngx_connection_t       *pc;
>      ngx_quic_frame_t       *frame;
> -    ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
> -
>      if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
>          || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
>          || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
> @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *
>      }
>
>      qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> +    qs->send_final_size = qs->send_offset;
>
>      pc = qs->parent;
>      qc = ngx_quic_get_connection(pc);
>
> +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL reset", qs->id);
> +
>      frame = ngx_quic_alloc_frame(pc);
>      if (frame == NULL) {
>          return NGX_ERROR;
> @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *
>      frame->type = NGX_QUIC_FT_RESET_STREAM;
>      frame->u.reset_stream.id = qs->id;
>      frame->u.reset_stream.error_code = err;
> -    frame->u.reset_stream.final_size = c->sent;
> +    frame->u.reset_stream.final_size = qs->send_offset;
>
>      ngx_quic_queue_frame(qc, frame);
>
> +    ngx_quic_free_chain(pc, qs->out);
> +    qs->out = NULL;
> +
>      return NGX_OK;
>  }
>
> @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_
>  static ngx_int_t
>  ngx_quic_shutdown_stream_send(ngx_connection_t *c)
>  {
> -    ngx_connection_t       *pc;
> -    ngx_quic_frame_t       *frame;
> -    ngx_quic_stream_t      *qs;
> -    ngx_quic_connection_t  *qc;
> +    ngx_quic_stream_t  *qs;
>
>      qs = c->quic;
>
> @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec
>          return NGX_OK;
>      }
>
> -    qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> -
> -    pc = qs->parent;
> -    qc = ngx_quic_get_connection(pc);
> +    qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> +    qs->send_final_size = c->sent;
>
> -    frame = ngx_quic_alloc_frame(pc);
> -    if (frame == NULL) {
> -        return NGX_ERROR;
> -    }
> -
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
>                     "quic stream id:0x%xL send shutdown", qs->id);
>
> -    frame->level = ssl_encryption_application;
> -    frame->type = NGX_QUIC_FT_STREAM;
> -    frame->u.stream.off = 1;
> -    frame->u.stream.len = 1;
> -    frame->u.stream.fin = 1;
> -
> -    frame->u.stream.stream_id = qs->id;
> -    frame->u.stream.offset = c->sent;
> -    frame->u.stream.length = 0;
> -
> -    ngx_quic_queue_frame(qc, frame);
> -
> -    return NGX_OK;
> +    return ngx_quic_stream_flush(qs);
>  }
>
>
> @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec
>          return NGX_ERROR;
>      }
>
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
>                     "quic stream id:0x%xL recv shutdown", qs->id);
>
>      frame->level = ssl_encryption_application;
> @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t
>  {
>      ngx_log_t              *log;
>      ngx_pool_t             *pool;
> +    ngx_queue_t            *q;
>      ngx_connection_t       *sc;
>      ngx_quic_stream_t      *qs;
>      ngx_pool_cleanup_t     *cln;
> @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t
>
>      qc = ngx_quic_get_connection(c);
>
> -    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> -    if (pool == NULL) {
> -        return NULL;
> +    if (!ngx_queue_empty(&qc->streams.free)) {
> +        q = ngx_queue_head(&qc->streams.free);
> +        qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
> +        ngx_queue_remove(&qs->queue);
> +
> +    } else {
> +        /*
> +         * the number of streams is limited by transport
> +         * parameters and application requirements
> +         */
> +
> +        qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
> +        if (qs == NULL) {
> +            return NULL;
> +        }
>      }
>
> -    qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
> -    if (qs == NULL) {
> -        ngx_destroy_pool(pool);
> -        return NULL;
> -    }
> +    ngx_memzero(qs, sizeof(ngx_quic_stream_t));
>
>      qs->node.key = id;
>      qs->parent = c;
>      qs->id = id;
> -    qs->final_size = (uint64_t) -1;
> +    qs->send_final_size = (uint64_t) -1;
> +    qs->recv_final_size = (uint64_t) -1;
> +
> +    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> +    if (pool == NULL) {
> +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> +        return NULL;
> +    }
>
>      log = ngx_palloc(pool, sizeof(ngx_log_t));
>      if (log == NULL) {
>          ngx_destroy_pool(pool);
> +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
>          return NULL;
>      }
>
> @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t
>      sc = ngx_get_connection(c->fd, log);
>      if (sc == NULL) {
>          ngx_destroy_pool(pool);
> +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
>          return NULL;
>      }
>
> @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t
>      if (cln == NULL) {
>          ngx_close_connection(sc);
>          ngx_destroy_pool(pool);
> +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
>          return NULL;
>      }
>
> @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
>          return NGX_ERROR;
>      }
>
> -    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
>                     "quic stream id:0x%xL recv buf:%uz", qs->id, size);
>
>      if (size == 0) {
> @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
>          rev->ready = 0;
>
>          if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
> -            && qs->recv_offset == qs->final_size)
> +            && qs->recv_offset == qs->recv_final_size)
>          {
>              qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
>          }
> @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
>      ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
>                     "quic stream id:0x%xL recv len:%z", qs->id, len);
>
> -    if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
> +    if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
>          return NGX_ERROR;
>      }
>
> @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio
>      off_t                   flow;
>      size_t                  n;
>      ngx_event_t            *wev;
> -    ngx_chain_t            *out;
>      ngx_connection_t       *pc;
> -    ngx_quic_frame_t       *frame;
>      ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio
>
>      qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
>
> -    flow = ngx_quic_max_stream_flow(c);
> +    flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
> +
>      if (flow == 0) {
>          wev->ready = 0;
>          return in;
> @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio
>          limit = flow;
>      }
>
> -    in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
> +    in = ngx_quic_write_chain(pc, &qs->out, in, limit,
> +                              c->sent - qs->send_offset, &n);
>      if (in == NGX_CHAIN_ERROR) {
>          return NGX_CHAIN_ERROR;
>      }
>
> -    out = ngx_quic_read_chain(pc, &qs->out, n);
> -    if (out == NGX_CHAIN_ERROR) {
> -        return NGX_CHAIN_ERROR;
> -    }
> -
> -    frame = ngx_quic_alloc_frame(pc);
> -    if (frame == NULL) {
> -        return NGX_CHAIN_ERROR;
> -    }
> -
> -    frame->level = ssl_encryption_application;
> -    frame->type = NGX_QUIC_FT_STREAM;
> -    frame->data = out;
> -    frame->u.stream.off = 1;
> -    frame->u.stream.len = 1;
> -    frame->u.stream.fin = 0;
> -
> -    frame->u.stream.stream_id = qs->id;
> -    frame->u.stream.offset = c->sent;
> -    frame->u.stream.length = n;
> -
>      c->sent += n;
>      qc->streams.sent += n;
>
> -    ngx_quic_queue_frame(qc, frame);
> -
>      if (in) {
>          wev->ready = 0;
>      }
> @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio
>      ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
>                     "quic send_chain sent:%uz", n);
>
> +    if (ngx_quic_stream_flush(qs) != NGX_OK) {
> +        return NGX_CHAIN_ERROR;
> +    }
> +
>      return in;
>  }
>
>
> -static size_t
> -ngx_quic_max_stream_flow(ngx_connection_t *c)
> +static ngx_int_t
> +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
>  {
> -    size_t                  size;
> -    uint64_t                sent, unacked;
> -    ngx_quic_stream_t      *qs;
> +    off_t                   limit;
> +    size_t                  len;
> +    ngx_uint_t              last;
> +    ngx_chain_t            *out, *cl;
> +    ngx_quic_frame_t       *frame;
> +    ngx_connection_t       *pc;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
> -    qc = ngx_quic_get_connection(qs->parent);
> +    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> +        return NGX_OK;
> +    }
>
> -    size = qc->conf->stream_buffer_size;
> -    sent = c->sent;
> -    unacked = sent - qs->acked;
> +    pc = qs->parent;
> +    qc = ngx_quic_get_connection(pc);
>
>      if (qc->streams.send_max_data == 0) {
>          qc->streams.send_max_data = qc->ctp.initial_max_data;
>      }
>
> -    if (unacked >= size) {
> -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                       "quic send flow hit buffer size");
> -        return 0;
> +    limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
> +                    qs->send_max_data - qs->send_offset);
> +
> +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL flush limit:%O", qs->id, limit);
> +
> +    out = ngx_quic_read_chain(pc, &qs->out, limit);
> +    if (out == NGX_CHAIN_ERROR) {
> +        return NGX_ERROR;
>      }
>
> -    size -= unacked;
> +    len = 0;
> +    last = 0;
> +
> +    for (cl = out; cl; cl = cl->next) {
> +        len += cl->buf->last - cl->buf->pos;
> +    }
>
> -    if (qc->streams.sent >= qc->streams.send_max_data) {
> -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                       "quic send flow hit MAX_DATA");
> -        return 0;
> +    if (qs->send_final_size != (uint64_t) -1
> +        && qs->send_final_size == qs->send_offset + len)
> +    {
> +        qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> +        last = 1;
> +    }
> +
> +    if (len == 0 && !last) {
> +        return NGX_OK;
>      }
>
> -    if (qc->streams.sent + size > qc->streams.send_max_data) {
> -        size = qc->streams.send_max_data - qc->streams.sent;
> +    frame = ngx_quic_alloc_frame(pc);
> +    if (frame == NULL) {
> +        return NGX_ERROR;
>      }
>
> -    if (sent >= qs->send_max_data) {
> -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                       "quic send flow hit MAX_STREAM_DATA");
> -        return 0;
> +    frame->level = ssl_encryption_application;
> +    frame->type = NGX_QUIC_FT_STREAM;
> +    frame->data = out;
> +
> +    frame->u.stream.off = 1;
> +    frame->u.stream.len = 1;
> +    frame->u.stream.fin = last;
> +
> +    frame->u.stream.stream_id = qs->id;
> +    frame->u.stream.offset = qs->send_offset;
> +    frame->u.stream.length = len;
> +
> +    ngx_quic_queue_frame(qc, frame);
> +
> +    qs->send_offset += len;
> +    qc->streams.send_offset += len;
> +
> +    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL flush len:%uz last:%ui",
> +                   qs->id, len, last);
> +
> +    if (qs->connection == NULL) {
> +        return ngx_quic_close_stream(qs);
>      }
>
> -    if (sent + size > qs->send_max_data) {
> -        size = qs->send_max_data - sent;
> -    }
> -
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                   "quic send flow:%uz", size);
> -
> -    return size;
> +    return NGX_OK;
>  }
>
>
> @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da
>  {
>      ngx_connection_t *c = data;
>
> +    ngx_quic_stream_t  *qs;
> +
> +    qs = c->quic;
> +
> +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> +                   "quic stream id:0x%xL cleanup", qs->id);
> +
> +    if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
> +        ngx_quic_close_connection(c, NGX_ERROR);
> +        return;
> +    }
> +
> +    qs->connection = NULL;
> +
> +    if (ngx_quic_close_stream(qs) != NGX_OK) {
> +        ngx_quic_close_connection(c, NGX_ERROR);
> +        return;
> +    }
> +}
> +
> +
> +static ngx_int_t
> +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> +{
>      ngx_connection_t       *pc;
>      ngx_quic_frame_t       *frame;
> -    ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
>      pc = qs->parent;
>      qc = ngx_quic_get_connection(pc);
>
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                   "quic stream id:0x%xL cleanup", qs->id);
> +    if (!qc->closing) {
> +        /* make sure everything is sent and final size is received */
> +
> +        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> +            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> +            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> +        {
> +            return NGX_OK;
> +        }
> +    }
> +
> +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL close", qs->id);
> +
> +    ngx_quic_free_chain(pc, qs->in);
> +    ngx_quic_free_chain(pc, qs->out);
>
>      ngx_rbtree_delete(&qc->streams.tree, &qs->node);
> -    ngx_quic_free_chain(pc, qs->in);
> -    ngx_quic_free_chain(pc, qs->out);
> +    ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
>
>      if (qc->closing) {
>          /* schedule handler call to continue ngx_quic_close_connection() */
>          ngx_post_event(pc->read, &ngx_posted_events);
> -        return;
> +        return NGX_OK;
>      }
>
> -    if (qc->error) {
> -        goto done;
> -    }
> -
> -    (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
> -
> -    (void) ngx_quic_update_flow(c, qs->recv_last);
> -
>      if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
>          frame = ngx_quic_alloc_frame(pc);
>          if (frame == NULL) {
> -            goto done;
> +            return NGX_ERROR;
>          }
>
>          frame->level = ssl_encryption_application;
> @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da
>          ngx_quic_queue_frame(qc, frame);
>      }
>
> -done:
> -
> -    (void) ngx_quic_output(pc);
> -
>      if (qc->shutdown) {
>          ngx_post_event(pc->read, &ngx_posted_events);
>      }
> +
> +    return NGX_OK;
>  }
>
>
> @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect
>  {
>      size_t                    size;
>      uint64_t                  last;
> -    ngx_connection_t         *sc;
>      ngx_quic_stream_t        *qs;
>      ngx_quic_connection_t    *qc;
>      ngx_quic_stream_frame_t  *f;
> @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect
>          return NGX_OK;
>      }
>
> -    sc = qs->connection;
> -
>      if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
>          && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
>      {
>          return NGX_OK;
>      }
>
> -    if (ngx_quic_control_flow(sc, last) != NGX_OK) {
> +    if (ngx_quic_control_flow(qs, last) != NGX_OK) {
>          return NGX_ERROR;
>      }
>
> -    if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
> +    if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
>          qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
>          return NGX_ERROR;
>      }
> @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect
>      }
>
>      if (f->fin) {
> -        if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
> +        if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
> +        {
>              qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
>              return NGX_ERROR;
>          }
> @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect
>              return NGX_ERROR;
>          }
>
> -        qs->final_size = last;
> +        qs->recv_final_size = last;
>          qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
>      }
>
> @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect
>      qs->recv_size += size;
>
>      if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
> -        && qs->recv_size == qs->final_size)
> +        && qs->recv_size == qs->recv_final_size)
>      {
>          qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
>      }
>
> +    if (qs->connection == NULL) {
> +        return ngx_quic_close_stream(qs);
> +    }
> +
>      if (f->offset == qs->recv_offset) {
> -        ngx_quic_set_event(sc->read);
> +        ngx_quic_set_event(qs->connection->read);
>      }
>
>      return NGX_OK;
> @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne
>          return NGX_OK;
>      }
>
> -    if (tree->root != tree->sentinel
> -        && qc->streams.sent >= qc->streams.send_max_data)
> +    if (tree->root == tree->sentinel
> +        || qc->streams.send_offset < qc->streams.send_max_data)
>      {
> -
> -        for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> -             node;
> -             node = ngx_rbtree_next(tree, node))
> -        {
> -            qs = (ngx_quic_stream_t *) node;
> -            ngx_quic_set_event(qs->connection->write);
> -        }
> +        /* not blocked on MAX_DATA */
> +        qc->streams.send_max_data = f->max_data;
> +        return NGX_OK;
>      }
>
>      qc->streams.send_max_data = f->max_data;
> +    node = ngx_rbtree_min(tree->root, tree->sentinel);
> +
> +    while (node && qc->streams.send_offset < qc->streams.send_max_data) {
> +
> +        qs = (ngx_quic_stream_t *) node;
> +        node = ngx_rbtree_next(tree, node);
> +
> +        if (ngx_quic_stream_flush(qs) != NGX_OK) {
> +            return NGX_ERROR;
> +        }
> +    }
>
>      return NGX_OK;
>  }
> @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram
>          return NGX_OK;
>      }
>
> -    return ngx_quic_update_max_stream_data(qs->connection);
> +    return ngx_quic_update_max_stream_data(qs);
>  }
>
>
> @@ -1197,7 +1258,6 @@ ngx_int_t
>  ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
>      ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
>  {
> -    uint64_t                sent;
>      ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng
>          return NGX_OK;
>      }
>
> -    sent = qs->connection->sent;
> -
> -    if (sent >= qs->send_max_data) {
> -        ngx_quic_set_event(qs->connection->write);
> +    if (qs->send_offset < qs->send_max_data) {
> +        /* not blocked on MAX_STREAM_DATA */
> +        qs->send_max_data = f->limit;
> +        return NGX_OK;
>      }
>
>      qs->send_max_data = f->limit;
>
> -    return NGX_OK;
> +    return ngx_quic_stream_flush(qs);
>  }
>
>
> @@ -1240,7 +1300,6 @@ ngx_int_t
>  ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
>      ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
>  {
> -    ngx_connection_t       *sc;
>      ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
>
>      qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
>
> -    sc = qs->connection;
> -
> -    if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
> +    if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
>          return NGX_ERROR;
>      }
>
> -    if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
> +    if (qs->recv_final_size != (uint64_t) -1
> +        && qs->recv_final_size != f->final_size)
> +    {
>          qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
>          return NGX_ERROR;
>      }
> @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c
>          return NGX_ERROR;
>      }
>
> -    qs->final_size = f->final_size;
> +    qs->recv_final_size = f->final_size;
>
> -    if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
> +    if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
>          return NGX_ERROR;
>      }
>
> +    if (qs->connection == NULL) {
> +        return ngx_quic_close_stream(qs);
> +    }
> +
>      ngx_quic_set_event(qs->connection->read);
>
>      return NGX_OK;
> @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
>          return NGX_OK;
>      }
>
> -    if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
> +    if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
>          return NGX_ERROR;
>      }
>
> +    if (qs->connection == NULL) {
> +        return ngx_quic_close_stream(qs);
> +    }
> +
>      ngx_quic_set_event(qs->connection->write);
>
>      return NGX_OK;
> @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio
>          return;
>      }
>
> +    if (qs->connection == NULL) {
> +        qs->acked += f->u.stream.length;
> +        return;
> +    }
> +
>      sent = qs->connection->sent;
>      unacked = sent - qs->acked;
> +    qs->acked += f->u.stream.length;
>
> -    if (unacked >= qc->conf->stream_buffer_size) {
> -        ngx_quic_set_event(qs->connection->write);
> +    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> +                   "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
> +                   qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
> +
> +    if (unacked != qc->conf->stream_buffer_size) {
> +        /* not blocked on buffer size */
> +        return;
>      }
>
> -    qs->acked += f->u.stream.length;
> -
> -    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
> -                   "quic stream ack len:%uL acked:%uL unacked:%uL",
> -                   f->u.stream.length, qs->acked, sent - qs->acked);
> +    ngx_quic_set_event(qs->connection->write);
>  }
>
>
>  static ngx_int_t
> -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
> +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
>  {
>      uint64_t                len;
> -    ngx_quic_stream_t      *qs;
> +    ngx_connection_t       *pc;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
> -    qc = ngx_quic_get_connection(qs->parent);
> +    pc = qs->parent;
> +    qc = ngx_quic_get_connection(pc);
>
>      if (last <= qs->recv_last) {
>          return NGX_OK;
> @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *
>
>      len = last - qs->recv_last;
>
> -    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                   "quic flow control msd:%uL/%uL md:%uL/%uL",
> -                   last, qs->recv_max_data, qc->streams.recv_last + len,
> +    ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
> +                   qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
>                     qc->streams.recv_max_data);
>
>      qs->recv_last += len;
> @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *
>
>
>  static ngx_int_t
> -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
> +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
>  {
>      uint64_t                len;
>      ngx_connection_t       *pc;
> -    ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
>      pc = qs->parent;
>      qc = ngx_quic_get_connection(pc);
>
> @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c
>
>      len = last - qs->recv_offset;
>
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                   "quic flow update %uL", last);
> +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL flow update %uL", qs->id, last);
>
>      qs->recv_offset += len;
>
>      if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
> -        if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
> +        if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
>              return NGX_ERROR;
>          }
>      }
> @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c
>
>
>  static ngx_int_t
> -ngx_quic_update_max_stream_data(ngx_connection_t *c)
> +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
>  {
>      uint64_t                recv_max_data;
>      ngx_connection_t       *pc;
>      ngx_quic_frame_t       *frame;
> -    ngx_quic_stream_t      *qs;
>      ngx_quic_connection_t  *qc;
>
> -    qs = c->quic;
>      pc = qs->parent;
>      qc = ngx_quic_get_connection(pc);
>
> @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn
>
>      qs->recv_max_data = recv_max_data;
>
> -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> -                   "quic flow update msd:%uL", qs->recv_max_data);
> +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> +                   "quic stream id:0x%xL flow update msd:%uL",
> +                   qs->id, qs->recv_max_data);
>
>      frame = ngx_quic_alloc_frame(pc);
>      if (frame == NULL) {
> diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c
> --- a/src/http/v3/ngx_http_v3_uni.c
> +++ b/src/http/v3/ngx_http_v3_uni.c
> @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_
>  }
>
>
> -/* XXX async & buffered stream writes */
> -
>  ngx_connection_t *
>  ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
>  {



