[PATCH] QUIC: stream lingering

Roman Arutyunyan arut at nginx.com
Tue Feb 8 12:18:13 UTC 2022


On Tue, Feb 08, 2022 at 02:45:19PM +0300, Vladimir Homutov wrote:
> On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote:
> > Hi,
> >
> > On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> > > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > > > # HG changeset patch
> > > > # User Roman Arutyunyan <arut at nginx.com>
> > > > # Date 1643722727 -10800
> > > > #      Tue Feb 01 16:38:47 2022 +0300
> > > > # Branch quic
> > > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > > > # Parent  308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > > > QUIC: stream lingering.
> > > >
> > > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > > > can persist after connection is closed by application.  During this period,
> > > > server is expecting stream final size from client for correct flow control.
> > > > Also, buffered output is sent to client as more flow control credit is granted.
> > > >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +{
> > > > +    size_t                  limit, len;
> > > > +    ngx_uint_t              last;
> > > > +    ngx_chain_t            *out, *cl;
> > > > +    ngx_quic_frame_t       *frame;
> > > > +    ngx_connection_t       *pc;
> > > > +    ngx_quic_connection_t  *qc;
> > > > +
> > > > +    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > > > +        return NGX_OK;
> > > > +    }
> > > > +
> > > > +    pc = qs->parent;
> > > > +    qc = ngx_quic_get_connection(pc);
> > > > +
> > > > +    limit = ngx_quic_max_stream_flow(qs);
> > > > +    last = 0;
> > > > +
> > > > +    out = ngx_quic_read_chain(pc, &qs->out, limit);
> > > > +    if (out == NGX_CHAIN_ERROR) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > > > +
> > > > +    len = 0;
> > > > +    last = 0;
> > >
> > > this assignment looks duplicate.
> >
> > Thanks, fixed.
> >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > > > +{
> > > >      ngx_connection_t       *pc;
> > > >      ngx_quic_frame_t       *frame;
> > > > -    ngx_quic_stream_t      *qs;
> > > >      ngx_quic_connection_t  *qc;
> > > >
> > > > -    qs = c->quic;
> > > >      pc = qs->parent;
> > > >      qc = ngx_quic_get_connection(pc);
> > > >
> > > > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > > > -                   "quic stream id:0x%xL cleanup", qs->id);
> > > > +    if (!qc->closing) {
> > > > +        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > > > +            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > > > +        {
> > >
> > > so basically this are the states where we need to wait for FIN?
> > > and thus avoid closing till we get it.
> > > I would add a comment here.
> >
> > On the receiving end we wait either for fin or for reset to have final size.
> > On the sending end we wait for everything that's buffered to be sent.
> > Added a comment about that.
> >
> > > [..]
> > > > +    if (qs->connection == NULL) {
> > > > +        return ngx_quic_close_stream(qs);
> > > > +    }
> > > > +
> > > >      ngx_quic_set_event(qs->connection->write);
> > >
> > > this pattern - check connection, close if NULL and set event seem to
> > > repeat. Maybe it's worth to try to put this check/action into
> > > ngx_quic_set_event somehow ? we could instead have
> > > set_read_event/set_write_event maybe.
> >
> > I thought about this too, but it's not always that simple.  And even if it was,
> > the new function/macro would have unclear semantics.  Let's just remember this
> > as a possible future optimiation.
> >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +
> > > [..]
> > > > +    if (len == 0 && !last) {
> > > > +        return NGX_OK;
> > > > +    }
> > > > +
> > > > +    frame = ngx_quic_alloc_frame(pc);
> > > > +    if (frame == NULL) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > > > +
> > > > +    frame = ngx_quic_alloc_frame(pc);
> > > > +    if (frame == NULL) {
> > > > +        return NGX_ERROR;
> > > > +    }
> > >
> > > one more dup here.
> >
> > Yes, thanks.
> >
> > > Overal, it looks good, but the testing revealed another issue: with big
> > > buffer sizes we run into issue of too long chains in ngx_quic_write_chain().
> > > As discussed, this certainly needs optimization - probably adding some
> > > pointer to the end to facilitate appending, or something else.
> >
> > It's true ngx_quic_write_chain() needs to be optimized.  When the buffered
> > chain is big, it takes too much time to find the write point.  I'll address
> > this is a separate patch.  Meanwhile, attached is an updated version of the
> > current one.
> >
> > In the new version of the patch I also eliminated the
> > ngx_quic_max_stream_flow() function and embedded its content in
> > ngx_quic_stream_flush().
> 
> yes, this looks correct - flow limit should not consider buffer as it
> was before.
> 
> I think we should check for limit == 0 before doing read_chain and this
> is good place for debug logging about 'hit MAX_DATA/MAX_STREAM_DATA' that
> was removed by update.

I don't know how much do we really need those messages.  What really needs to
be added here is sending DATA_BLOCKED/STREAM_DATA_BLOCKED, for which I
already have a separate patch.  That patch also adds some logging.
Once we finish with optimization, I'll send it out.

Apart from logging, checking limit == 0 does not seem to make sense, because
even if the limit is zero, we should still proceed, since we are still able to
send fin.

> > --
> > Roman Arutyunyan
> 
> > # HG changeset patch
> > # User Roman Arutyunyan <arut at nginx.com>
> > # Date 1644054894 -10800
> > #      Sat Feb 05 12:54:54 2022 +0300
> > # Branch quic
> > # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
> > # Parent  6c1dfd072859022f830aeea49db7cbe3c9f7fb55
> > QUIC: stream lingering.
> >
> > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > can persist after connection is closed by application.  During this period,
> > server is expecting stream final size from client for correct flow control.
> > Also, buffered output is sent to client as more flow control credit is granted.
> >
> > diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
> > --- a/src/event/quic/ngx_event_quic.c
> > +++ b/src/event/quic/ngx_event_quic.c
> > @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
> >      ctp->active_connection_id_limit = 2;
> >
> >      ngx_queue_init(&qc->streams.uninitialized);
> > +    ngx_queue_init(&qc->streams.free);
> >
> >      qc->streams.recv_max_data = qc->tp.initial_max_data;
> >      qc->streams.recv_window = qc->streams.recv_max_data;
> > diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
> > --- a/src/event/quic/ngx_event_quic.h
> > +++ b/src/event/quic/ngx_event_quic.h
> > @@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
> >      uint64_t                       id;
> >      uint64_t                       acked;
> >      uint64_t                       send_max_data;
> > +    uint64_t                       send_offset;
> > +    uint64_t                       send_final_size;
> >      uint64_t                       recv_max_data;
> >      uint64_t                       recv_offset;
> >      uint64_t                       recv_window;
> >      uint64_t                       recv_last;
> >      uint64_t                       recv_size;
> > -    uint64_t                       final_size;
> > +    uint64_t                       recv_final_size;
> >      ngx_chain_t                   *in;
> >      ngx_chain_t                   *out;
> >      ngx_uint_t                     cancelable;  /* unsigned  cancelable:1; */
> > diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
> > --- a/src/event/quic/ngx_event_quic_connection.h
> > +++ b/src/event/quic/ngx_event_quic_connection.h
> > @@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
> >  typedef struct {
> >      ngx_rbtree_t                      tree;
> >      ngx_rbtree_node_t                 sentinel;
> > +
> >      ngx_queue_t                       uninitialized;
> > +    ngx_queue_t                       free;
> >
> >      uint64_t                          sent;
> >      uint64_t                          recv_offset;
> >      uint64_t                          recv_window;
> >      uint64_t                          recv_last;
> >      uint64_t                          recv_max_data;
> > +    uint64_t                          send_offset;
> >      uint64_t                          send_max_data;
> >
> >      uint64_t                          server_max_streams_uni;
> > diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c
> > --- a/src/event/quic/ngx_event_quic_frames.c
> > +++ b/src/event/quic/ngx_event_quic_frames.c
> > @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c
> >          return NGX_ERROR;
> >      }
> >
> > +    if (f->type == NGX_QUIC_FT_STREAM) {
> > +        f->u.stream.fin = 0;
> > +    }
> > +
> >      ngx_queue_insert_after(&f->queue, &nf->queue);
> >
> >      return NGX_OK;
> > diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
> > --- a/src/event/quic/ngx_event_quic_streams.c
> > +++ b/src/event/quic/ngx_event_quic_streams.c
> > @@ -13,6 +13,8 @@
> >  #define NGX_QUIC_STREAM_GONE     (void *) -1
> >
> >
> > +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
> > +    ngx_uint_t err);
> >  static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
> >  static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
> >  static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
> > @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_
> >      size_t size);
> >  static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
> >      ngx_chain_t *in, off_t limit);
> > -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
> > +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
> >  static void ngx_quic_stream_cleanup_handler(void *data);
> > -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
> > -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
> > -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
> > +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
> > +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
> > +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
> > +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
> >  static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
> >  static void ngx_quic_set_event(ngx_event_t *ev);
> >
> > @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t
> >      ns = 0;
> >  #endif
> >
> > -    for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> > -         node;
> > -         node = ngx_rbtree_next(tree, node))
> > -    {
> > +    node = ngx_rbtree_min(tree->root, tree->sentinel);
> > +
> > +    while (node) {
> >          qs = (ngx_quic_stream_t *) node;
> > +        node = ngx_rbtree_next(tree, node);
> >
> >          qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
> >          qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> >
> > +        if (qs->connection == NULL) {
> > +            ngx_quic_close_stream(qs);
> > +            continue;
> > +        }
> > +
> >          ngx_quic_set_event(qs->connection->read);
> >          ngx_quic_set_event(qs->connection->write);
> >
> > @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t
> >  ngx_int_t
> >  ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
> >  {
> > +    return ngx_quic_do_reset_stream(c->quic, err);
> > +}
> > +
> > +
> > +static ngx_int_t
> > +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
> > +{
> >      ngx_connection_t       *pc;
> >      ngx_quic_frame_t       *frame;
> > -    ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> > -
> >      if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
> >          || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
> >          || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
> > @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *
> >      }
> >
> >      qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> > +    qs->send_final_size = qs->send_offset;
> >
> >      pc = qs->parent;
> >      qc = ngx_quic_get_connection(pc);
> >
> > +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL reset", qs->id);
> > +
> >      frame = ngx_quic_alloc_frame(pc);
> >      if (frame == NULL) {
> >          return NGX_ERROR;
> > @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *
> >      frame->type = NGX_QUIC_FT_RESET_STREAM;
> >      frame->u.reset_stream.id = qs->id;
> >      frame->u.reset_stream.error_code = err;
> > -    frame->u.reset_stream.final_size = c->sent;
> > +    frame->u.reset_stream.final_size = qs->send_offset;
> >
> >      ngx_quic_queue_frame(qc, frame);
> >
> > +    ngx_quic_free_chain(pc, qs->out);
> > +    qs->out = NULL;
> > +
> >      return NGX_OK;
> >  }
> >
> > @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_
> >  static ngx_int_t
> >  ngx_quic_shutdown_stream_send(ngx_connection_t *c)
> >  {
> > -    ngx_connection_t       *pc;
> > -    ngx_quic_frame_t       *frame;
> > -    ngx_quic_stream_t      *qs;
> > -    ngx_quic_connection_t  *qc;
> > +    ngx_quic_stream_t  *qs;
> >
> >      qs = c->quic;
> >
> > @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec
> >          return NGX_OK;
> >      }
> >
> > -    qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> > -
> > -    pc = qs->parent;
> > -    qc = ngx_quic_get_connection(pc);
> > +    qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> > +    qs->send_final_size = c->sent;
> >
> > -    frame = ngx_quic_alloc_frame(pc);
> > -    if (frame == NULL) {
> > -        return NGX_ERROR;
> > -    }
> > -
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> >                     "quic stream id:0x%xL send shutdown", qs->id);
> >
> > -    frame->level = ssl_encryption_application;
> > -    frame->type = NGX_QUIC_FT_STREAM;
> > -    frame->u.stream.off = 1;
> > -    frame->u.stream.len = 1;
> > -    frame->u.stream.fin = 1;
> > -
> > -    frame->u.stream.stream_id = qs->id;
> > -    frame->u.stream.offset = c->sent;
> > -    frame->u.stream.length = 0;
> > -
> > -    ngx_quic_queue_frame(qc, frame);
> > -
> > -    return NGX_OK;
> > +    return ngx_quic_stream_flush(qs);
> >  }
> >
> >
> > @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec
> >          return NGX_ERROR;
> >      }
> >
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> >                     "quic stream id:0x%xL recv shutdown", qs->id);
> >
> >      frame->level = ssl_encryption_application;
> > @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t
> >  {
> >      ngx_log_t              *log;
> >      ngx_pool_t             *pool;
> > +    ngx_queue_t            *q;
> >      ngx_connection_t       *sc;
> >      ngx_quic_stream_t      *qs;
> >      ngx_pool_cleanup_t     *cln;
> > @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t
> >
> >      qc = ngx_quic_get_connection(c);
> >
> > -    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> > -    if (pool == NULL) {
> > -        return NULL;
> > +    if (!ngx_queue_empty(&qc->streams.free)) {
> > +        q = ngx_queue_head(&qc->streams.free);
> > +        qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
> > +        ngx_queue_remove(&qs->queue);
> > +
> > +    } else {
> > +        /*
> > +         * the number of streams is limited by transport
> > +         * parameters and application requirements
> > +         */
> > +
> > +        qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
> > +        if (qs == NULL) {
> > +            return NULL;
> > +        }
> >      }
> >
> > -    qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
> > -    if (qs == NULL) {
> > -        ngx_destroy_pool(pool);
> > -        return NULL;
> > -    }
> > +    ngx_memzero(qs, sizeof(ngx_quic_stream_t));
> >
> >      qs->node.key = id;
> >      qs->parent = c;
> >      qs->id = id;
> > -    qs->final_size = (uint64_t) -1;
> > +    qs->send_final_size = (uint64_t) -1;
> > +    qs->recv_final_size = (uint64_t) -1;
> > +
> > +    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> > +    if (pool == NULL) {
> > +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> > +        return NULL;
> > +    }
> >
> >      log = ngx_palloc(pool, sizeof(ngx_log_t));
> >      if (log == NULL) {
> >          ngx_destroy_pool(pool);
> > +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> >          return NULL;
> >      }
> >
> > @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t
> >      sc = ngx_get_connection(c->fd, log);
> >      if (sc == NULL) {
> >          ngx_destroy_pool(pool);
> > +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> >          return NULL;
> >      }
> >
> > @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t
> >      if (cln == NULL) {
> >          ngx_close_connection(sc);
> >          ngx_destroy_pool(pool);
> > +        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> >          return NULL;
> >      }
> >
> > @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> >          return NGX_ERROR;
> >      }
> >
> > -    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> >                     "quic stream id:0x%xL recv buf:%uz", qs->id, size);
> >
> >      if (size == 0) {
> > @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> >          rev->ready = 0;
> >
> >          if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
> > -            && qs->recv_offset == qs->final_size)
> > +            && qs->recv_offset == qs->recv_final_size)
> >          {
> >              qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
> >          }
> > @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> >      ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> >                     "quic stream id:0x%xL recv len:%z", qs->id, len);
> >
> > -    if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
> > +    if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
> >          return NGX_ERROR;
> >      }
> >
> > @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio
> >      off_t                   flow;
> >      size_t                  n;
> >      ngx_event_t            *wev;
> > -    ngx_chain_t            *out;
> >      ngx_connection_t       *pc;
> > -    ngx_quic_frame_t       *frame;
> >      ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio
> >
> >      qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> >
> > -    flow = ngx_quic_max_stream_flow(c);
> > +    flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
> > +
> >      if (flow == 0) {
> >          wev->ready = 0;
> >          return in;
> > @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio
> >          limit = flow;
> >      }
> >
> > -    in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
> > +    in = ngx_quic_write_chain(pc, &qs->out, in, limit,
> > +                              c->sent - qs->send_offset, &n);
> >      if (in == NGX_CHAIN_ERROR) {
> >          return NGX_CHAIN_ERROR;
> >      }
> >
> > -    out = ngx_quic_read_chain(pc, &qs->out, n);
> > -    if (out == NGX_CHAIN_ERROR) {
> > -        return NGX_CHAIN_ERROR;
> > -    }
> > -
> > -    frame = ngx_quic_alloc_frame(pc);
> > -    if (frame == NULL) {
> > -        return NGX_CHAIN_ERROR;
> > -    }
> > -
> > -    frame->level = ssl_encryption_application;
> > -    frame->type = NGX_QUIC_FT_STREAM;
> > -    frame->data = out;
> > -    frame->u.stream.off = 1;
> > -    frame->u.stream.len = 1;
> > -    frame->u.stream.fin = 0;
> > -
> > -    frame->u.stream.stream_id = qs->id;
> > -    frame->u.stream.offset = c->sent;
> > -    frame->u.stream.length = n;
> > -
> >      c->sent += n;
> >      qc->streams.sent += n;
> >
> > -    ngx_quic_queue_frame(qc, frame);
> > -
> >      if (in) {
> >          wev->ready = 0;
> >      }
> > @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio
> >      ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> >                     "quic send_chain sent:%uz", n);
> >
> > +    if (ngx_quic_stream_flush(qs) != NGX_OK) {
> > +        return NGX_CHAIN_ERROR;
> > +    }
> > +
> >      return in;
> >  }
> >
> >
> > -static size_t
> > -ngx_quic_max_stream_flow(ngx_connection_t *c)
> > +static ngx_int_t
> > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> >  {
> > -    size_t                  size;
> > -    uint64_t                sent, unacked;
> > -    ngx_quic_stream_t      *qs;
> > +    off_t                   limit;
> > +    size_t                  len;
> > +    ngx_uint_t              last;
> > +    ngx_chain_t            *out, *cl;
> > +    ngx_quic_frame_t       *frame;
> > +    ngx_connection_t       *pc;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> > -    qc = ngx_quic_get_connection(qs->parent);
> > +    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > +        return NGX_OK;
> > +    }
> >
> > -    size = qc->conf->stream_buffer_size;
> > -    sent = c->sent;
> > -    unacked = sent - qs->acked;
> > +    pc = qs->parent;
> > +    qc = ngx_quic_get_connection(pc);
> >
> >      if (qc->streams.send_max_data == 0) {
> >          qc->streams.send_max_data = qc->ctp.initial_max_data;
> >      }
> >
> > -    if (unacked >= size) {
> > -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                       "quic send flow hit buffer size");
> > -        return 0;
> > +    limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
> > +                    qs->send_max_data - qs->send_offset);
> > +
> > +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL flush limit:%O", qs->id, limit);
> > +
> > +    out = ngx_quic_read_chain(pc, &qs->out, limit);
> > +    if (out == NGX_CHAIN_ERROR) {
> > +        return NGX_ERROR;
> >      }
> >
> > -    size -= unacked;
> > +    len = 0;
> > +    last = 0;
> > +
> > +    for (cl = out; cl; cl = cl->next) {
> > +        len += cl->buf->last - cl->buf->pos;
> > +    }
> >
> > -    if (qc->streams.sent >= qc->streams.send_max_data) {
> > -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                       "quic send flow hit MAX_DATA");
> > -        return 0;
> > +    if (qs->send_final_size != (uint64_t) -1
> > +        && qs->send_final_size == qs->send_offset + len)
> > +    {
> > +        qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> > +        last = 1;
> > +    }
> > +
> > +    if (len == 0 && !last) {
> > +        return NGX_OK;
> >      }
> >
> > -    if (qc->streams.sent + size > qc->streams.send_max_data) {
> > -        size = qc->streams.send_max_data - qc->streams.sent;
> > +    frame = ngx_quic_alloc_frame(pc);
> > +    if (frame == NULL) {
> > +        return NGX_ERROR;
> >      }
> >
> > -    if (sent >= qs->send_max_data) {
> > -        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                       "quic send flow hit MAX_STREAM_DATA");
> > -        return 0;
> > +    frame->level = ssl_encryption_application;
> > +    frame->type = NGX_QUIC_FT_STREAM;
> > +    frame->data = out;
> > +
> > +    frame->u.stream.off = 1;
> > +    frame->u.stream.len = 1;
> > +    frame->u.stream.fin = last;
> > +
> > +    frame->u.stream.stream_id = qs->id;
> > +    frame->u.stream.offset = qs->send_offset;
> > +    frame->u.stream.length = len;
> > +
> > +    ngx_quic_queue_frame(qc, frame);
> > +
> > +    qs->send_offset += len;
> > +    qc->streams.send_offset += len;
> > +
> > +    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL flush len:%uz last:%ui",
> > +                   qs->id, len, last);
> > +
> > +    if (qs->connection == NULL) {
> > +        return ngx_quic_close_stream(qs);
> >      }
> >
> > -    if (sent + size > qs->send_max_data) {
> > -        size = qs->send_max_data - sent;
> > -    }
> > -
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                   "quic send flow:%uz", size);
> > -
> > -    return size;
> > +    return NGX_OK;
> >  }
> >
> >
> > @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da
> >  {
> >      ngx_connection_t *c = data;
> >
> > +    ngx_quic_stream_t  *qs;
> > +
> > +    qs = c->quic;
> > +
> > +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> > +                   "quic stream id:0x%xL cleanup", qs->id);
> > +
> > +    if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
> > +        ngx_quic_close_connection(c, NGX_ERROR);
> > +        return;
> > +    }
> > +
> > +    qs->connection = NULL;
> > +
> > +    if (ngx_quic_close_stream(qs) != NGX_OK) {
> > +        ngx_quic_close_connection(c, NGX_ERROR);
> > +        return;
> > +    }
> > +}
> > +
> > +
> > +static ngx_int_t
> > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > +{
> >      ngx_connection_t       *pc;
> >      ngx_quic_frame_t       *frame;
> > -    ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> >      pc = qs->parent;
> >      qc = ngx_quic_get_connection(pc);
> >
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                   "quic stream id:0x%xL cleanup", qs->id);
> > +    if (!qc->closing) {
> > +        /* make sure everything is sent and final size is received */
> > +
> > +        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > +            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > +            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > +        {
> > +            return NGX_OK;
> > +        }
> > +    }
> > +
> > +    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL close", qs->id);
> > +
> > +    ngx_quic_free_chain(pc, qs->in);
> > +    ngx_quic_free_chain(pc, qs->out);
> >
> >      ngx_rbtree_delete(&qc->streams.tree, &qs->node);
> > -    ngx_quic_free_chain(pc, qs->in);
> > -    ngx_quic_free_chain(pc, qs->out);
> > +    ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> >
> >      if (qc->closing) {
> >          /* schedule handler call to continue ngx_quic_close_connection() */
> >          ngx_post_event(pc->read, &ngx_posted_events);
> > -        return;
> > +        return NGX_OK;
> >      }
> >
> > -    if (qc->error) {
> > -        goto done;
> > -    }
> > -
> > -    (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
> > -
> > -    (void) ngx_quic_update_flow(c, qs->recv_last);
> > -
> >      if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
> >          frame = ngx_quic_alloc_frame(pc);
> >          if (frame == NULL) {
> > -            goto done;
> > +            return NGX_ERROR;
> >          }
> >
> >          frame->level = ssl_encryption_application;
> > @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da
> >          ngx_quic_queue_frame(qc, frame);
> >      }
> >
> > -done:
> > -
> > -    (void) ngx_quic_output(pc);
> > -
> >      if (qc->shutdown) {
> >          ngx_post_event(pc->read, &ngx_posted_events);
> >      }
> > +
> > +    return NGX_OK;
> >  }
> >
> >
> > @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect
> >  {
> >      size_t                    size;
> >      uint64_t                  last;
> > -    ngx_connection_t         *sc;
> >      ngx_quic_stream_t        *qs;
> >      ngx_quic_connection_t    *qc;
> >      ngx_quic_stream_frame_t  *f;
> > @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> >          return NGX_OK;
> >      }
> >
> > -    sc = qs->connection;
> > -
> >      if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
> >          && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
> >      {
> >          return NGX_OK;
> >      }
> >
> > -    if (ngx_quic_control_flow(sc, last) != NGX_OK) {
> > +    if (ngx_quic_control_flow(qs, last) != NGX_OK) {
> >          return NGX_ERROR;
> >      }
> >
> > -    if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
> > +    if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
> >          qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> >          return NGX_ERROR;
> >      }
> > @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect
> >      }
> >
> >      if (f->fin) {
> > -        if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
> > +        if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
> > +        {
> >              qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> >              return NGX_ERROR;
> >          }
> > @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect
> >              return NGX_ERROR;
> >          }
> >
> > -        qs->final_size = last;
> > +        qs->recv_final_size = last;
> >          qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
> >      }
> >
> > @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> >      qs->recv_size += size;
> >
> >      if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
> > -        && qs->recv_size == qs->final_size)
> > +        && qs->recv_size == qs->recv_final_size)
> >      {
> >          qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
> >      }
> >
> > +    if (qs->connection == NULL) {
> > +        return ngx_quic_close_stream(qs);
> > +    }
> > +
> >      if (f->offset == qs->recv_offset) {
> > -        ngx_quic_set_event(sc->read);
> > +        ngx_quic_set_event(qs->connection->read);
> >      }
> >
> >      return NGX_OK;
> > @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne
> >          return NGX_OK;
> >      }
> >
> > -    if (tree->root != tree->sentinel
> > -        && qc->streams.sent >= qc->streams.send_max_data)
> > +    if (tree->root == tree->sentinel
> > +        || qc->streams.send_offset < qc->streams.send_max_data)
> >      {
> > -
> > -        for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> > -             node;
> > -             node = ngx_rbtree_next(tree, node))
> > -        {
> > -            qs = (ngx_quic_stream_t *) node;
> > -            ngx_quic_set_event(qs->connection->write);
> > -        }
> > +        /* not blocked on MAX_DATA */
> > +        qc->streams.send_max_data = f->max_data;
> > +        return NGX_OK;
> >      }
> >
> >      qc->streams.send_max_data = f->max_data;
> > +    node = ngx_rbtree_min(tree->root, tree->sentinel);
> > +
> > +    while (node && qc->streams.send_offset < qc->streams.send_max_data) {
> > +
> > +        qs = (ngx_quic_stream_t *) node;
> > +        node = ngx_rbtree_next(tree, node);
> > +
> > +        if (ngx_quic_stream_flush(qs) != NGX_OK) {
> > +            return NGX_ERROR;
> > +        }
> > +    }
> >
> >      return NGX_OK;
> >  }
> > @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram
> >          return NGX_OK;
> >      }
> >
> > -    return ngx_quic_update_max_stream_data(qs->connection);
> > +    return ngx_quic_update_max_stream_data(qs);
> >  }
> >
> >
> > @@ -1197,7 +1258,6 @@ ngx_int_t
> >  ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
> >      ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
> >  {
> > -    uint64_t                sent;
> >      ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng
> >          return NGX_OK;
> >      }
> >
> > -    sent = qs->connection->sent;
> > -
> > -    if (sent >= qs->send_max_data) {
> > -        ngx_quic_set_event(qs->connection->write);
> > +    if (qs->send_offset < qs->send_max_data) {
> > +        /* not blocked on MAX_STREAM_DATA */
> > +        qs->send_max_data = f->limit;
> > +        return NGX_OK;
> >      }
> >
> >      qs->send_max_data = f->limit;
> >
> > -    return NGX_OK;
> > +    return ngx_quic_stream_flush(qs);
> >  }
> >
> >
> > @@ -1240,7 +1300,6 @@ ngx_int_t
> >  ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
> >      ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
> >  {
> > -    ngx_connection_t       *sc;
> >      ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
> >
> >      qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
> >
> > -    sc = qs->connection;
> > -
> > -    if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
> > +    if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
> >          return NGX_ERROR;
> >      }
> >
> > -    if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
> > +    if (qs->recv_final_size != (uint64_t) -1
> > +        && qs->recv_final_size != f->final_size)
> > +    {
> >          qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> >          return NGX_ERROR;
> >      }
> > @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c
> >          return NGX_ERROR;
> >      }
> >
> > -    qs->final_size = f->final_size;
> > +    qs->recv_final_size = f->final_size;
> >
> > -    if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
> > +    if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
> >          return NGX_ERROR;
> >      }
> >
> > +    if (qs->connection == NULL) {
> > +        return ngx_quic_close_stream(qs);
> > +    }
> > +
> >      ngx_quic_set_event(qs->connection->read);
> >
> >      return NGX_OK;
> > @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
> >          return NGX_OK;
> >      }
> >
> > -    if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
> > +    if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
> >          return NGX_ERROR;
> >      }
> >
> > +    if (qs->connection == NULL) {
> > +        return ngx_quic_close_stream(qs);
> > +    }
> > +
> >      ngx_quic_set_event(qs->connection->write);
> >
> >      return NGX_OK;
> > @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio
> >          return;
> >      }
> >
> > +    if (qs->connection == NULL) {
> > +        qs->acked += f->u.stream.length;
> > +        return;
> > +    }
> > +
> >      sent = qs->connection->sent;
> >      unacked = sent - qs->acked;
> > +    qs->acked += f->u.stream.length;
> >
> > -    if (unacked >= qc->conf->stream_buffer_size) {
> > -        ngx_quic_set_event(qs->connection->write);
> > +    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > +                   "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
> > +                   qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
> > +
> > +    if (unacked != qc->conf->stream_buffer_size) {
> > +        /* not blocked on buffer size */
> > +        return;
> >      }
> >
> > -    qs->acked += f->u.stream.length;
> > -
> > -    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
> > -                   "quic stream ack len:%uL acked:%uL unacked:%uL",
> > -                   f->u.stream.length, qs->acked, sent - qs->acked);
> > +    ngx_quic_set_event(qs->connection->write);
> >  }
> >
> >
> >  static ngx_int_t
> > -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
> > +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
> >  {
> >      uint64_t                len;
> > -    ngx_quic_stream_t      *qs;
> > +    ngx_connection_t       *pc;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> > -    qc = ngx_quic_get_connection(qs->parent);
> > +    pc = qs->parent;
> > +    qc = ngx_quic_get_connection(pc);
> >
> >      if (last <= qs->recv_last) {
> >          return NGX_OK;
> > @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *
> >
> >      len = last - qs->recv_last;
> >
> > -    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                   "quic flow control msd:%uL/%uL md:%uL/%uL",
> > -                   last, qs->recv_max_data, qc->streams.recv_last + len,
> > +    ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
> > +                   qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
> >                     qc->streams.recv_max_data);
> >
> >      qs->recv_last += len;
> > @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *
> >
> >
> >  static ngx_int_t
> > -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
> > +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
> >  {
> >      uint64_t                len;
> >      ngx_connection_t       *pc;
> > -    ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> >      pc = qs->parent;
> >      qc = ngx_quic_get_connection(pc);
> >
> > @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c
> >
> >      len = last - qs->recv_offset;
> >
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                   "quic flow update %uL", last);
> > +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL flow update %uL", qs->id, last);
> >
> >      qs->recv_offset += len;
> >
> >      if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
> > -        if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
> > +        if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
> >              return NGX_ERROR;
> >          }
> >      }
> > @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c
> >
> >
> >  static ngx_int_t
> > -ngx_quic_update_max_stream_data(ngx_connection_t *c)
> > +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
> >  {
> >      uint64_t                recv_max_data;
> >      ngx_connection_t       *pc;
> >      ngx_quic_frame_t       *frame;
> > -    ngx_quic_stream_t      *qs;
> >      ngx_quic_connection_t  *qc;
> >
> > -    qs = c->quic;
> >      pc = qs->parent;
> >      qc = ngx_quic_get_connection(pc);
> >
> > @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn
> >
> >      qs->recv_max_data = recv_max_data;
> >
> > -    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > -                   "quic flow update msd:%uL", qs->recv_max_data);
> > +    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > +                   "quic stream id:0x%xL flow update msd:%uL",
> > +                   qs->id, qs->recv_max_data);
> >
> >      frame = ngx_quic_alloc_frame(pc);
> >      if (frame == NULL) {
> > diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c
> > --- a/src/http/v3/ngx_http_v3_uni.c
> > +++ b/src/http/v3/ngx_http_v3_uni.c
> > @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_
> >  }
> >
> >
> > -/* XXX async & buffered stream writes */
> > -
> >  ngx_connection_t *
> >  ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
> >  {
> 
> > _______________________________________________
> > nginx-devel mailing list -- nginx-devel at nginx.org
> > To unsubscribe send an email to nginx-devel-leave at nginx.org
> 
> _______________________________________________
> nginx-devel mailing list -- nginx-devel at nginx.org
> To unsubscribe send an email to nginx-devel-leave at nginx.org

-- 
Roman Arutyunyan



More information about the nginx-devel mailing list