[PATCH] QUIC: stream lingering
Roman Arutyunyan
arut at nginx.com
Tue Feb 8 12:18:13 UTC 2022
On Tue, Feb 08, 2022 at 02:45:19PM +0300, Vladimir Homutov wrote:
> On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote:
> > Hi,
> >
> > On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> > > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > > > # HG changeset patch
> > > > # User Roman Arutyunyan <arut at nginx.com>
> > > > # Date 1643722727 -10800
> > > > # Tue Feb 01 16:38:47 2022 +0300
> > > > # Branch quic
> > > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > > > # Parent 308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > > > QUIC: stream lingering.
> > > >
> > > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > > > can persist after connection is closed by application. During this period,
> > > > server is expecting stream final size from client for correct flow control.
> > > > Also, buffered output is sent to client as more flow control credit is granted.
> > > >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +{
> > > > + size_t limit, len;
> > > > + ngx_uint_t last;
> > > > + ngx_chain_t *out, *cl;
> > > > + ngx_quic_frame_t *frame;
> > > > + ngx_connection_t *pc;
> > > > + ngx_quic_connection_t *qc;
> > > > +
> > > > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > > > + return NGX_OK;
> > > > + }
> > > > +
> > > > + pc = qs->parent;
> > > > + qc = ngx_quic_get_connection(pc);
> > > > +
> > > > + limit = ngx_quic_max_stream_flow(qs);
> > > > + last = 0;
> > > > +
> > > > + out = ngx_quic_read_chain(pc, &qs->out, limit);
> > > > + if (out == NGX_CHAIN_ERROR) {
> > > > + return NGX_ERROR;
> > > > + }
> > > > +
> > > > + len = 0;
> > > > + last = 0;
> > >
> > > this assignment looks duplicate.
> >
> > Thanks, fixed.
> >
> > > [..]
> > >
> > > > +static ngx_int_t
> > > > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > > > +{
> > > > ngx_connection_t *pc;
> > > > ngx_quic_frame_t *frame;
> > > > - ngx_quic_stream_t *qs;
> > > > ngx_quic_connection_t *qc;
> > > >
> > > > - qs = c->quic;
> > > > pc = qs->parent;
> > > > qc = ngx_quic_get_connection(pc);
> > > >
> > > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > > > - "quic stream id:0x%xL cleanup", qs->id);
> > > > + if (!qc->closing) {
> > > > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > > > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > > > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > > > + {
> > >
> > > so basically this are the states where we need to wait for FIN?
> > > and thus avoid closing till we get it.
> > > I would add a comment here.
> >
> > On the receiving end we wait either for fin or for reset to have final size.
> > On the sending end we wait for everything that's buffered to be sent.
> > Added a comment about that.
> >
> > > [..]
> > > > + if (qs->connection == NULL) {
> > > > + return ngx_quic_close_stream(qs);
> > > > + }
> > > > +
> > > > ngx_quic_set_event(qs->connection->write);
> > >
> > > this pattern - check connection, close if NULL and set event seem to
> > > repeat. Maybe it's worth to try to put this check/action into
> > > ngx_quic_set_event somehow ? we could instead have
> > > set_read_event/set_write_event maybe.
> >
> > I thought about this too, but it's not always that simple. And even if it was,
> > the new function/macro would have unclear semantics. Let's just remember this
> > as a possible future optimiation.
> >
> > > > +static ngx_int_t
> > > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > > +
> > > [..]
> > > > + if (len == 0 && !last) {
> > > > + return NGX_OK;
> > > > + }
> > > > +
> > > > + frame = ngx_quic_alloc_frame(pc);
> > > > + if (frame == NULL) {
> > > > + return NGX_ERROR;
> > > > + }
> > > > +
> > > > + frame = ngx_quic_alloc_frame(pc);
> > > > + if (frame == NULL) {
> > > > + return NGX_ERROR;
> > > > + }
> > >
> > > one more dup here.
> >
> > Yes, thanks.
> >
> > > Overal, it looks good, but the testing revealed another issue: with big
> > > buffer sizes we run into issue of too long chains in ngx_quic_write_chain().
> > > As discussed, this certainly needs optimization - probably adding some
> > > pointer to the end to facilitate appending, or something else.
> >
> > It's true ngx_quic_write_chain() needs to be optimized. When the buffered
> > chain is big, it takes too much time to find the write point. I'll address
> > this is a separate patch. Meanwhile, attached is an updated version of the
> > current one.
> >
> > In the new version of the patch I also eliminated the
> > ngx_quic_max_stream_flow() function and embedded its content in
> > ngx_quic_stream_flush().
>
> yes, this looks correct - flow limit should not consider buffer as it
> was before.
>
> I think we should check for limit == 0 before doing read_chain and this
> is good place for debug logging about 'hit MAX_DATA/MAX_STREAM_DATA' that
> was removed by update.
I don't know how much do we really need those messages. What really needs to
be added here is sending DATA_BLOCKED/STREAM_DATA_BLOCKED, for which I
already have a separate patch. That patch also adds some logging.
Once we finish with optimization, I'll send it out.
Apart from logging, checking limit == 0 does not seem to make sense, because
even if the limit is zero, we should still proceed, since we are still able to
send fin.
> > --
> > Roman Arutyunyan
>
> > # HG changeset patch
> > # User Roman Arutyunyan <arut at nginx.com>
> > # Date 1644054894 -10800
> > # Sat Feb 05 12:54:54 2022 +0300
> > # Branch quic
> > # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
> > # Parent 6c1dfd072859022f830aeea49db7cbe3c9f7fb55
> > QUIC: stream lingering.
> >
> > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > can persist after connection is closed by application. During this period,
> > server is expecting stream final size from client for correct flow control.
> > Also, buffered output is sent to client as more flow control credit is granted.
> >
> > diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
> > --- a/src/event/quic/ngx_event_quic.c
> > +++ b/src/event/quic/ngx_event_quic.c
> > @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
> > ctp->active_connection_id_limit = 2;
> >
> > ngx_queue_init(&qc->streams.uninitialized);
> > + ngx_queue_init(&qc->streams.free);
> >
> > qc->streams.recv_max_data = qc->tp.initial_max_data;
> > qc->streams.recv_window = qc->streams.recv_max_data;
> > diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
> > --- a/src/event/quic/ngx_event_quic.h
> > +++ b/src/event/quic/ngx_event_quic.h
> > @@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
> > uint64_t id;
> > uint64_t acked;
> > uint64_t send_max_data;
> > + uint64_t send_offset;
> > + uint64_t send_final_size;
> > uint64_t recv_max_data;
> > uint64_t recv_offset;
> > uint64_t recv_window;
> > uint64_t recv_last;
> > uint64_t recv_size;
> > - uint64_t final_size;
> > + uint64_t recv_final_size;
> > ngx_chain_t *in;
> > ngx_chain_t *out;
> > ngx_uint_t cancelable; /* unsigned cancelable:1; */
> > diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
> > --- a/src/event/quic/ngx_event_quic_connection.h
> > +++ b/src/event/quic/ngx_event_quic_connection.h
> > @@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
> > typedef struct {
> > ngx_rbtree_t tree;
> > ngx_rbtree_node_t sentinel;
> > +
> > ngx_queue_t uninitialized;
> > + ngx_queue_t free;
> >
> > uint64_t sent;
> > uint64_t recv_offset;
> > uint64_t recv_window;
> > uint64_t recv_last;
> > uint64_t recv_max_data;
> > + uint64_t send_offset;
> > uint64_t send_max_data;
> >
> > uint64_t server_max_streams_uni;
> > diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c
> > --- a/src/event/quic/ngx_event_quic_frames.c
> > +++ b/src/event/quic/ngx_event_quic_frames.c
> > @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c
> > return NGX_ERROR;
> > }
> >
> > + if (f->type == NGX_QUIC_FT_STREAM) {
> > + f->u.stream.fin = 0;
> > + }
> > +
> > ngx_queue_insert_after(&f->queue, &nf->queue);
> >
> > return NGX_OK;
> > diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
> > --- a/src/event/quic/ngx_event_quic_streams.c
> > +++ b/src/event/quic/ngx_event_quic_streams.c
> > @@ -13,6 +13,8 @@
> > #define NGX_QUIC_STREAM_GONE (void *) -1
> >
> >
> > +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
> > + ngx_uint_t err);
> > static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
> > static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
> > static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
> > @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_
> > size_t size);
> > static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
> > ngx_chain_t *in, off_t limit);
> > -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
> > +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
> > static void ngx_quic_stream_cleanup_handler(void *data);
> > -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
> > -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
> > -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
> > +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
> > +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
> > +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
> > +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
> > static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
> > static void ngx_quic_set_event(ngx_event_t *ev);
> >
> > @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t
> > ns = 0;
> > #endif
> >
> > - for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> > - node;
> > - node = ngx_rbtree_next(tree, node))
> > - {
> > + node = ngx_rbtree_min(tree->root, tree->sentinel);
> > +
> > + while (node) {
> > qs = (ngx_quic_stream_t *) node;
> > + node = ngx_rbtree_next(tree, node);
> >
> > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
> > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> >
> > + if (qs->connection == NULL) {
> > + ngx_quic_close_stream(qs);
> > + continue;
> > + }
> > +
> > ngx_quic_set_event(qs->connection->read);
> > ngx_quic_set_event(qs->connection->write);
> >
> > @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t
> > ngx_int_t
> > ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
> > {
> > + return ngx_quic_do_reset_stream(c->quic, err);
> > +}
> > +
> > +
> > +static ngx_int_t
> > +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
> > +{
> > ngx_connection_t *pc;
> > ngx_quic_frame_t *frame;
> > - ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > -
> > if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
> > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
> > || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
> > @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *
> > }
> >
> > qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> > + qs->send_final_size = qs->send_offset;
> >
> > pc = qs->parent;
> > qc = ngx_quic_get_connection(pc);
> >
> > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL reset", qs->id);
> > +
> > frame = ngx_quic_alloc_frame(pc);
> > if (frame == NULL) {
> > return NGX_ERROR;
> > @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *
> > frame->type = NGX_QUIC_FT_RESET_STREAM;
> > frame->u.reset_stream.id = qs->id;
> > frame->u.reset_stream.error_code = err;
> > - frame->u.reset_stream.final_size = c->sent;
> > + frame->u.reset_stream.final_size = qs->send_offset;
> >
> > ngx_quic_queue_frame(qc, frame);
> >
> > + ngx_quic_free_chain(pc, qs->out);
> > + qs->out = NULL;
> > +
> > return NGX_OK;
> > }
> >
> > @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_
> > static ngx_int_t
> > ngx_quic_shutdown_stream_send(ngx_connection_t *c)
> > {
> > - ngx_connection_t *pc;
> > - ngx_quic_frame_t *frame;
> > - ngx_quic_stream_t *qs;
> > - ngx_quic_connection_t *qc;
> > + ngx_quic_stream_t *qs;
> >
> > qs = c->quic;
> >
> > @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec
> > return NGX_OK;
> > }
> >
> > - qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> > -
> > - pc = qs->parent;
> > - qc = ngx_quic_get_connection(pc);
> > + qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> > + qs->send_final_size = c->sent;
> >
> > - frame = ngx_quic_alloc_frame(pc);
> > - if (frame == NULL) {
> > - return NGX_ERROR;
> > - }
> > -
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> > "quic stream id:0x%xL send shutdown", qs->id);
> >
> > - frame->level = ssl_encryption_application;
> > - frame->type = NGX_QUIC_FT_STREAM;
> > - frame->u.stream.off = 1;
> > - frame->u.stream.len = 1;
> > - frame->u.stream.fin = 1;
> > -
> > - frame->u.stream.stream_id = qs->id;
> > - frame->u.stream.offset = c->sent;
> > - frame->u.stream.length = 0;
> > -
> > - ngx_quic_queue_frame(qc, frame);
> > -
> > - return NGX_OK;
> > + return ngx_quic_stream_flush(qs);
> > }
> >
> >
> > @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec
> > return NGX_ERROR;
> > }
> >
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > "quic stream id:0x%xL recv shutdown", qs->id);
> >
> > frame->level = ssl_encryption_application;
> > @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t
> > {
> > ngx_log_t *log;
> > ngx_pool_t *pool;
> > + ngx_queue_t *q;
> > ngx_connection_t *sc;
> > ngx_quic_stream_t *qs;
> > ngx_pool_cleanup_t *cln;
> > @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t
> >
> > qc = ngx_quic_get_connection(c);
> >
> > - pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> > - if (pool == NULL) {
> > - return NULL;
> > + if (!ngx_queue_empty(&qc->streams.free)) {
> > + q = ngx_queue_head(&qc->streams.free);
> > + qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
> > + ngx_queue_remove(&qs->queue);
> > +
> > + } else {
> > + /*
> > + * the number of streams is limited by transport
> > + * parameters and application requirements
> > + */
> > +
> > + qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
> > + if (qs == NULL) {
> > + return NULL;
> > + }
> > }
> >
> > - qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
> > - if (qs == NULL) {
> > - ngx_destroy_pool(pool);
> > - return NULL;
> > - }
> > + ngx_memzero(qs, sizeof(ngx_quic_stream_t));
> >
> > qs->node.key = id;
> > qs->parent = c;
> > qs->id = id;
> > - qs->final_size = (uint64_t) -1;
> > + qs->send_final_size = (uint64_t) -1;
> > + qs->recv_final_size = (uint64_t) -1;
> > +
> > + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> > + if (pool == NULL) {
> > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> > + return NULL;
> > + }
> >
> > log = ngx_palloc(pool, sizeof(ngx_log_t));
> > if (log == NULL) {
> > ngx_destroy_pool(pool);
> > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> > return NULL;
> > }
> >
> > @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t
> > sc = ngx_get_connection(c->fd, log);
> > if (sc == NULL) {
> > ngx_destroy_pool(pool);
> > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> > return NULL;
> > }
> >
> > @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t
> > if (cln == NULL) {
> > ngx_close_connection(sc);
> > ngx_destroy_pool(pool);
> > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> > return NULL;
> > }
> >
> > @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> > return NGX_ERROR;
> > }
> >
> > - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > "quic stream id:0x%xL recv buf:%uz", qs->id, size);
> >
> > if (size == 0) {
> > @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> > rev->ready = 0;
> >
> > if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
> > - && qs->recv_offset == qs->final_size)
> > + && qs->recv_offset == qs->recv_final_size)
> > {
> > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
> > }
> > @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> > ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > "quic stream id:0x%xL recv len:%z", qs->id, len);
> >
> > - if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
> > + if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
> > return NGX_ERROR;
> > }
> >
> > @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio
> > off_t flow;
> > size_t n;
> > ngx_event_t *wev;
> > - ngx_chain_t *out;
> > ngx_connection_t *pc;
> > - ngx_quic_frame_t *frame;
> > ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio
> >
> > qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> >
> > - flow = ngx_quic_max_stream_flow(c);
> > + flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
> > +
> > if (flow == 0) {
> > wev->ready = 0;
> > return in;
> > @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio
> > limit = flow;
> > }
> >
> > - in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
> > + in = ngx_quic_write_chain(pc, &qs->out, in, limit,
> > + c->sent - qs->send_offset, &n);
> > if (in == NGX_CHAIN_ERROR) {
> > return NGX_CHAIN_ERROR;
> > }
> >
> > - out = ngx_quic_read_chain(pc, &qs->out, n);
> > - if (out == NGX_CHAIN_ERROR) {
> > - return NGX_CHAIN_ERROR;
> > - }
> > -
> > - frame = ngx_quic_alloc_frame(pc);
> > - if (frame == NULL) {
> > - return NGX_CHAIN_ERROR;
> > - }
> > -
> > - frame->level = ssl_encryption_application;
> > - frame->type = NGX_QUIC_FT_STREAM;
> > - frame->data = out;
> > - frame->u.stream.off = 1;
> > - frame->u.stream.len = 1;
> > - frame->u.stream.fin = 0;
> > -
> > - frame->u.stream.stream_id = qs->id;
> > - frame->u.stream.offset = c->sent;
> > - frame->u.stream.length = n;
> > -
> > c->sent += n;
> > qc->streams.sent += n;
> >
> > - ngx_quic_queue_frame(qc, frame);
> > -
> > if (in) {
> > wev->ready = 0;
> > }
> > @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio
> > ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > "quic send_chain sent:%uz", n);
> >
> > + if (ngx_quic_stream_flush(qs) != NGX_OK) {
> > + return NGX_CHAIN_ERROR;
> > + }
> > +
> > return in;
> > }
> >
> >
> > -static size_t
> > -ngx_quic_max_stream_flow(ngx_connection_t *c)
> > +static ngx_int_t
> > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > {
> > - size_t size;
> > - uint64_t sent, unacked;
> > - ngx_quic_stream_t *qs;
> > + off_t limit;
> > + size_t len;
> > + ngx_uint_t last;
> > + ngx_chain_t *out, *cl;
> > + ngx_quic_frame_t *frame;
> > + ngx_connection_t *pc;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > - qc = ngx_quic_get_connection(qs->parent);
> > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > + return NGX_OK;
> > + }
> >
> > - size = qc->conf->stream_buffer_size;
> > - sent = c->sent;
> > - unacked = sent - qs->acked;
> > + pc = qs->parent;
> > + qc = ngx_quic_get_connection(pc);
> >
> > if (qc->streams.send_max_data == 0) {
> > qc->streams.send_max_data = qc->ctp.initial_max_data;
> > }
> >
> > - if (unacked >= size) {
> > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic send flow hit buffer size");
> > - return 0;
> > + limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
> > + qs->send_max_data - qs->send_offset);
> > +
> > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL flush limit:%O", qs->id, limit);
> > +
> > + out = ngx_quic_read_chain(pc, &qs->out, limit);
> > + if (out == NGX_CHAIN_ERROR) {
> > + return NGX_ERROR;
> > }
> >
> > - size -= unacked;
> > + len = 0;
> > + last = 0;
> > +
> > + for (cl = out; cl; cl = cl->next) {
> > + len += cl->buf->last - cl->buf->pos;
> > + }
> >
> > - if (qc->streams.sent >= qc->streams.send_max_data) {
> > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic send flow hit MAX_DATA");
> > - return 0;
> > + if (qs->send_final_size != (uint64_t) -1
> > + && qs->send_final_size == qs->send_offset + len)
> > + {
> > + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> > + last = 1;
> > + }
> > +
> > + if (len == 0 && !last) {
> > + return NGX_OK;
> > }
> >
> > - if (qc->streams.sent + size > qc->streams.send_max_data) {
> > - size = qc->streams.send_max_data - qc->streams.sent;
> > + frame = ngx_quic_alloc_frame(pc);
> > + if (frame == NULL) {
> > + return NGX_ERROR;
> > }
> >
> > - if (sent >= qs->send_max_data) {
> > - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic send flow hit MAX_STREAM_DATA");
> > - return 0;
> > + frame->level = ssl_encryption_application;
> > + frame->type = NGX_QUIC_FT_STREAM;
> > + frame->data = out;
> > +
> > + frame->u.stream.off = 1;
> > + frame->u.stream.len = 1;
> > + frame->u.stream.fin = last;
> > +
> > + frame->u.stream.stream_id = qs->id;
> > + frame->u.stream.offset = qs->send_offset;
> > + frame->u.stream.length = len;
> > +
> > + ngx_quic_queue_frame(qc, frame);
> > +
> > + qs->send_offset += len;
> > + qc->streams.send_offset += len;
> > +
> > + ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL flush len:%uz last:%ui",
> > + qs->id, len, last);
> > +
> > + if (qs->connection == NULL) {
> > + return ngx_quic_close_stream(qs);
> > }
> >
> > - if (sent + size > qs->send_max_data) {
> > - size = qs->send_max_data - sent;
> > - }
> > -
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic send flow:%uz", size);
> > -
> > - return size;
> > + return NGX_OK;
> > }
> >
> >
> > @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da
> > {
> > ngx_connection_t *c = data;
> >
> > + ngx_quic_stream_t *qs;
> > +
> > + qs = c->quic;
> > +
> > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> > + "quic stream id:0x%xL cleanup", qs->id);
> > +
> > + if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
> > + ngx_quic_close_connection(c, NGX_ERROR);
> > + return;
> > + }
> > +
> > + qs->connection = NULL;
> > +
> > + if (ngx_quic_close_stream(qs) != NGX_OK) {
> > + ngx_quic_close_connection(c, NGX_ERROR);
> > + return;
> > + }
> > +}
> > +
> > +
> > +static ngx_int_t
> > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > +{
> > ngx_connection_t *pc;
> > ngx_quic_frame_t *frame;
> > - ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > pc = qs->parent;
> > qc = ngx_quic_get_connection(pc);
> >
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic stream id:0x%xL cleanup", qs->id);
> > + if (!qc->closing) {
> > + /* make sure everything is sent and final size is received */
> > +
> > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > + {
> > + return NGX_OK;
> > + }
> > + }
> > +
> > + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL close", qs->id);
> > +
> > + ngx_quic_free_chain(pc, qs->in);
> > + ngx_quic_free_chain(pc, qs->out);
> >
> > ngx_rbtree_delete(&qc->streams.tree, &qs->node);
> > - ngx_quic_free_chain(pc, qs->in);
> > - ngx_quic_free_chain(pc, qs->out);
> > + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> >
> > if (qc->closing) {
> > /* schedule handler call to continue ngx_quic_close_connection() */
> > ngx_post_event(pc->read, &ngx_posted_events);
> > - return;
> > + return NGX_OK;
> > }
> >
> > - if (qc->error) {
> > - goto done;
> > - }
> > -
> > - (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
> > -
> > - (void) ngx_quic_update_flow(c, qs->recv_last);
> > -
> > if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
> > frame = ngx_quic_alloc_frame(pc);
> > if (frame == NULL) {
> > - goto done;
> > + return NGX_ERROR;
> > }
> >
> > frame->level = ssl_encryption_application;
> > @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da
> > ngx_quic_queue_frame(qc, frame);
> > }
> >
> > -done:
> > -
> > - (void) ngx_quic_output(pc);
> > -
> > if (qc->shutdown) {
> > ngx_post_event(pc->read, &ngx_posted_events);
> > }
> > +
> > + return NGX_OK;
> > }
> >
> >
> > @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect
> > {
> > size_t size;
> > uint64_t last;
> > - ngx_connection_t *sc;
> > ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> > ngx_quic_stream_frame_t *f;
> > @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> > return NGX_OK;
> > }
> >
> > - sc = qs->connection;
> > -
> > if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
> > && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
> > {
> > return NGX_OK;
> > }
> >
> > - if (ngx_quic_control_flow(sc, last) != NGX_OK) {
> > + if (ngx_quic_control_flow(qs, last) != NGX_OK) {
> > return NGX_ERROR;
> > }
> >
> > - if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
> > + if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
> > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> > return NGX_ERROR;
> > }
> > @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect
> > }
> >
> > if (f->fin) {
> > - if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
> > + if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
> > + {
> > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> > return NGX_ERROR;
> > }
> > @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect
> > return NGX_ERROR;
> > }
> >
> > - qs->final_size = last;
> > + qs->recv_final_size = last;
> > qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
> > }
> >
> > @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> > qs->recv_size += size;
> >
> > if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
> > - && qs->recv_size == qs->final_size)
> > + && qs->recv_size == qs->recv_final_size)
> > {
> > qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
> > }
> >
> > + if (qs->connection == NULL) {
> > + return ngx_quic_close_stream(qs);
> > + }
> > +
> > if (f->offset == qs->recv_offset) {
> > - ngx_quic_set_event(sc->read);
> > + ngx_quic_set_event(qs->connection->read);
> > }
> >
> > return NGX_OK;
> > @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne
> > return NGX_OK;
> > }
> >
> > - if (tree->root != tree->sentinel
> > - && qc->streams.sent >= qc->streams.send_max_data)
> > + if (tree->root == tree->sentinel
> > + || qc->streams.send_offset < qc->streams.send_max_data)
> > {
> > -
> > - for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> > - node;
> > - node = ngx_rbtree_next(tree, node))
> > - {
> > - qs = (ngx_quic_stream_t *) node;
> > - ngx_quic_set_event(qs->connection->write);
> > - }
> > + /* not blocked on MAX_DATA */
> > + qc->streams.send_max_data = f->max_data;
> > + return NGX_OK;
> > }
> >
> > qc->streams.send_max_data = f->max_data;
> > + node = ngx_rbtree_min(tree->root, tree->sentinel);
> > +
> > + while (node && qc->streams.send_offset < qc->streams.send_max_data) {
> > +
> > + qs = (ngx_quic_stream_t *) node;
> > + node = ngx_rbtree_next(tree, node);
> > +
> > + if (ngx_quic_stream_flush(qs) != NGX_OK) {
> > + return NGX_ERROR;
> > + }
> > + }
> >
> > return NGX_OK;
> > }
> > @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram
> > return NGX_OK;
> > }
> >
> > - return ngx_quic_update_max_stream_data(qs->connection);
> > + return ngx_quic_update_max_stream_data(qs);
> > }
> >
> >
> > @@ -1197,7 +1258,6 @@ ngx_int_t
> > ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
> > ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
> > {
> > - uint64_t sent;
> > ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng
> > return NGX_OK;
> > }
> >
> > - sent = qs->connection->sent;
> > -
> > - if (sent >= qs->send_max_data) {
> > - ngx_quic_set_event(qs->connection->write);
> > + if (qs->send_offset < qs->send_max_data) {
> > + /* not blocked on MAX_STREAM_DATA */
> > + qs->send_max_data = f->limit;
> > + return NGX_OK;
> > }
> >
> > qs->send_max_data = f->limit;
> >
> > - return NGX_OK;
> > + return ngx_quic_stream_flush(qs);
> > }
> >
> >
> > @@ -1240,7 +1300,6 @@ ngx_int_t
> > ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
> > ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
> > {
> > - ngx_connection_t *sc;
> > ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
> >
> > qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
> >
> > - sc = qs->connection;
> > -
> > - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
> > + if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
> > return NGX_ERROR;
> > }
> >
> > - if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
> > + if (qs->recv_final_size != (uint64_t) -1
> > + && qs->recv_final_size != f->final_size)
> > + {
> > qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> > return NGX_ERROR;
> > }
> > @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c
> > return NGX_ERROR;
> > }
> >
> > - qs->final_size = f->final_size;
> > + qs->recv_final_size = f->final_size;
> >
> > - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
> > + if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
> > return NGX_ERROR;
> > }
> >
> > + if (qs->connection == NULL) {
> > + return ngx_quic_close_stream(qs);
> > + }
> > +
> > ngx_quic_set_event(qs->connection->read);
> >
> > return NGX_OK;
> > @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
> > return NGX_OK;
> > }
> >
> > - if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
> > + if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
> > return NGX_ERROR;
> > }
> >
> > + if (qs->connection == NULL) {
> > + return ngx_quic_close_stream(qs);
> > + }
> > +
> > ngx_quic_set_event(qs->connection->write);
> >
> > return NGX_OK;
> > @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio
> > return;
> > }
> >
> > + if (qs->connection == NULL) {
> > + qs->acked += f->u.stream.length;
> > + return;
> > + }
> > +
> > sent = qs->connection->sent;
> > unacked = sent - qs->acked;
> > + qs->acked += f->u.stream.length;
> >
> > - if (unacked >= qc->conf->stream_buffer_size) {
> > - ngx_quic_set_event(qs->connection->write);
> > + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > + "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
> > + qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
> > +
> > + if (unacked != qc->conf->stream_buffer_size) {
> > + /* not blocked on buffer size */
> > + return;
> > }
> >
> > - qs->acked += f->u.stream.length;
> > -
> > - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
> > - "quic stream ack len:%uL acked:%uL unacked:%uL",
> > - f->u.stream.length, qs->acked, sent - qs->acked);
> > + ngx_quic_set_event(qs->connection->write);
> > }
> >
> >
> > static ngx_int_t
> > -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
> > +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
> > {
> > uint64_t len;
> > - ngx_quic_stream_t *qs;
> > + ngx_connection_t *pc;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > - qc = ngx_quic_get_connection(qs->parent);
> > + pc = qs->parent;
> > + qc = ngx_quic_get_connection(pc);
> >
> > if (last <= qs->recv_last) {
> > return NGX_OK;
> > @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *
> >
> > len = last - qs->recv_last;
> >
> > - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic flow control msd:%uL/%uL md:%uL/%uL",
> > - last, qs->recv_max_data, qc->streams.recv_last + len,
> > + ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
> > + qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
> > qc->streams.recv_max_data);
> >
> > qs->recv_last += len;
> > @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *
> >
> >
> > static ngx_int_t
> > -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
> > +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
> > {
> > uint64_t len;
> > ngx_connection_t *pc;
> > - ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > pc = qs->parent;
> > qc = ngx_quic_get_connection(pc);
> >
> > @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c
> >
> > len = last - qs->recv_offset;
> >
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic flow update %uL", last);
> > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL flow update %uL", qs->id, last);
> >
> > qs->recv_offset += len;
> >
> > if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
> > - if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
> > + if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
> > return NGX_ERROR;
> > }
> > }
> > @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c
> >
> >
> > static ngx_int_t
> > -ngx_quic_update_max_stream_data(ngx_connection_t *c)
> > +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
> > {
> > uint64_t recv_max_data;
> > ngx_connection_t *pc;
> > ngx_quic_frame_t *frame;
> > - ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > pc = qs->parent;
> > qc = ngx_quic_get_connection(pc);
> >
> > @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn
> >
> > qs->recv_max_data = recv_max_data;
> >
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic flow update msd:%uL", qs->recv_max_data);
> > + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> > + "quic stream id:0x%xL flow update msd:%uL",
> > + qs->id, qs->recv_max_data);
> >
> > frame = ngx_quic_alloc_frame(pc);
> > if (frame == NULL) {
> > diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c
> > --- a/src/http/v3/ngx_http_v3_uni.c
> > +++ b/src/http/v3/ngx_http_v3_uni.c
> > @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_
> > }
> >
> >
> > -/* XXX async & buffered stream writes */
> > -
> > ngx_connection_t *
> > ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
> > {
>
> > _______________________________________________
> > nginx-devel mailing list -- nginx-devel at nginx.org
> > To unsubscribe send an email to nginx-devel-leave at nginx.org
>
> _______________________________________________
> nginx-devel mailing list -- nginx-devel at nginx.org
> To unsubscribe send an email to nginx-devel-leave at nginx.org
--
Roman Arutyunyan
More information about the nginx-devel
mailing list