[PATCH] QUIC: stream lingering
Vladimir Homutov
vl at nginx.com
Tue Feb 8 11:45:19 UTC 2022
On Mon, Feb 07, 2022 at 05:16:17PM +0300, Roman Arutyunyan wrote:
> Hi,
>
> On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> > On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > > # HG changeset patch
> > > # User Roman Arutyunyan <arut at nginx.com>
> > > # Date 1643722727 -10800
> > > # Tue Feb 01 16:38:47 2022 +0300
> > > # Branch quic
> > > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > > # Parent 308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > > QUIC: stream lingering.
> > >
> > > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > > can persist after connection is closed by application. During this period,
> > > server is expecting stream final size from client for correct flow control.
> > > Also, buffered output is sent to client as more flow control credit is granted.
> > >
> > [..]
> >
> > > +static ngx_int_t
> > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > +{
> > > + size_t limit, len;
> > > + ngx_uint_t last;
> > > + ngx_chain_t *out, *cl;
> > > + ngx_quic_frame_t *frame;
> > > + ngx_connection_t *pc;
> > > + ngx_quic_connection_t *qc;
> > > +
> > > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > > + return NGX_OK;
> > > + }
> > > +
> > > + pc = qs->parent;
> > > + qc = ngx_quic_get_connection(pc);
> > > +
> > > + limit = ngx_quic_max_stream_flow(qs);
> > > + last = 0;
> > > +
> > > + out = ngx_quic_read_chain(pc, &qs->out, limit);
> > > + if (out == NGX_CHAIN_ERROR) {
> > > + return NGX_ERROR;
> > > + }
> > > +
> > > + len = 0;
> > > + last = 0;
> >
> > this assignment looks duplicate.
>
> Thanks, fixed.
>
> > [..]
> >
> > > +static ngx_int_t
> > > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > > +{
> > > ngx_connection_t *pc;
> > > ngx_quic_frame_t *frame;
> > > - ngx_quic_stream_t *qs;
> > > ngx_quic_connection_t *qc;
> > >
> > > - qs = c->quic;
> > > pc = qs->parent;
> > > qc = ngx_quic_get_connection(pc);
> > >
> > > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > > - "quic stream id:0x%xL cleanup", qs->id);
> > > + if (!qc->closing) {
> > > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > > + {
> >
> > so basically this are the states where we need to wait for FIN?
> > and thus avoid closing till we get it.
> > I would add a comment here.
>
> On the receiving end we wait either for fin or for reset to have final size.
> On the sending end we wait for everything that's buffered to be sent.
> Added a comment about that.
>
> > [..]
> > > + if (qs->connection == NULL) {
> > > + return ngx_quic_close_stream(qs);
> > > + }
> > > +
> > > ngx_quic_set_event(qs->connection->write);
> >
> > this pattern - check connection, close if NULL and set event seem to
> > repeat. Maybe it's worth to try to put this check/action into
> > ngx_quic_set_event somehow ? we could instead have
> > set_read_event/set_write_event maybe.
>
> I thought about this too, but it's not always that simple. And even if it was,
> the new function/macro would have unclear semantics. Let's just remember this
> as a possible future optimiation.
>
> > > +static ngx_int_t
> > > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > > +
> > [..]
> > > + if (len == 0 && !last) {
> > > + return NGX_OK;
> > > + }
> > > +
> > > + frame = ngx_quic_alloc_frame(pc);
> > > + if (frame == NULL) {
> > > + return NGX_ERROR;
> > > + }
> > > +
> > > + frame = ngx_quic_alloc_frame(pc);
> > > + if (frame == NULL) {
> > > + return NGX_ERROR;
> > > + }
> >
> > one more dup here.
>
> Yes, thanks.
>
> > Overal, it looks good, but the testing revealed another issue: with big
> > buffer sizes we run into issue of too long chains in ngx_quic_write_chain().
> > As discussed, this certainly needs optimization - probably adding some
> > pointer to the end to facilitate appending, or something else.
>
> It's true ngx_quic_write_chain() needs to be optimized. When the buffered
> chain is big, it takes too much time to find the write point. I'll address
> this is a separate patch. Meanwhile, attached is an updated version of the
> current one.
>
> In the new version of the patch I also eliminated the
> ngx_quic_max_stream_flow() function and embedded its content in
> ngx_quic_stream_flush().
yes, this looks correct - flow limit should not consider buffer as it
was before.
I think we should check for limit == 0 before doing read_chain and this
is good place for debug logging about 'hit MAX_DATA/MAX_STREAM_DATA' that
was removed by update.
>
> --
> Roman Arutyunyan
> # HG changeset patch
> # User Roman Arutyunyan <arut at nginx.com>
> # Date 1644054894 -10800
> # Sat Feb 05 12:54:54 2022 +0300
> # Branch quic
> # Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
> # Parent 6c1dfd072859022f830aeea49db7cbe3c9f7fb55
> QUIC: stream lingering.
>
> Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> can persist after connection is closed by application. During this period,
> server is expecting stream final size from client for correct flow control.
> Also, buffered output is sent to client as more flow control credit is granted.
>
> diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
> --- a/src/event/quic/ngx_event_quic.c
> +++ b/src/event/quic/ngx_event_quic.c
> @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
> ctp->active_connection_id_limit = 2;
>
> ngx_queue_init(&qc->streams.uninitialized);
> + ngx_queue_init(&qc->streams.free);
>
> qc->streams.recv_max_data = qc->tp.initial_max_data;
> qc->streams.recv_window = qc->streams.recv_max_data;
> diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
> --- a/src/event/quic/ngx_event_quic.h
> +++ b/src/event/quic/ngx_event_quic.h
> @@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
> uint64_t id;
> uint64_t acked;
> uint64_t send_max_data;
> + uint64_t send_offset;
> + uint64_t send_final_size;
> uint64_t recv_max_data;
> uint64_t recv_offset;
> uint64_t recv_window;
> uint64_t recv_last;
> uint64_t recv_size;
> - uint64_t final_size;
> + uint64_t recv_final_size;
> ngx_chain_t *in;
> ngx_chain_t *out;
> ngx_uint_t cancelable; /* unsigned cancelable:1; */
> diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
> --- a/src/event/quic/ngx_event_quic_connection.h
> +++ b/src/event/quic/ngx_event_quic_connection.h
> @@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
> typedef struct {
> ngx_rbtree_t tree;
> ngx_rbtree_node_t sentinel;
> +
> ngx_queue_t uninitialized;
> + ngx_queue_t free;
>
> uint64_t sent;
> uint64_t recv_offset;
> uint64_t recv_window;
> uint64_t recv_last;
> uint64_t recv_max_data;
> + uint64_t send_offset;
> uint64_t send_max_data;
>
> uint64_t server_max_streams_uni;
> diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c
> --- a/src/event/quic/ngx_event_quic_frames.c
> +++ b/src/event/quic/ngx_event_quic_frames.c
> @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c
> return NGX_ERROR;
> }
>
> + if (f->type == NGX_QUIC_FT_STREAM) {
> + f->u.stream.fin = 0;
> + }
> +
> ngx_queue_insert_after(&f->queue, &nf->queue);
>
> return NGX_OK;
> diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
> --- a/src/event/quic/ngx_event_quic_streams.c
> +++ b/src/event/quic/ngx_event_quic_streams.c
> @@ -13,6 +13,8 @@
> #define NGX_QUIC_STREAM_GONE (void *) -1
>
>
> +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
> + ngx_uint_t err);
> static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
> static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
> static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
> @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_
> size_t size);
> static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
> ngx_chain_t *in, off_t limit);
> -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
> +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
> static void ngx_quic_stream_cleanup_handler(void *data);
> -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
> -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
> -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
> +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
> +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
> +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
> +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
> static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
> static void ngx_quic_set_event(ngx_event_t *ev);
>
> @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t
> ns = 0;
> #endif
>
> - for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> - node;
> - node = ngx_rbtree_next(tree, node))
> - {
> + node = ngx_rbtree_min(tree->root, tree->sentinel);
> +
> + while (node) {
> qs = (ngx_quic_stream_t *) node;
> + node = ngx_rbtree_next(tree, node);
>
> qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
> qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
>
> + if (qs->connection == NULL) {
> + ngx_quic_close_stream(qs);
> + continue;
> + }
> +
> ngx_quic_set_event(qs->connection->read);
> ngx_quic_set_event(qs->connection->write);
>
> @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t
> ngx_int_t
> ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
> {
> + return ngx_quic_do_reset_stream(c->quic, err);
> +}
> +
> +
> +static ngx_int_t
> +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
> +{
> ngx_connection_t *pc;
> ngx_quic_frame_t *frame;
> - ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> -
> if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
> || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
> || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
> @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *
> }
>
> qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
> + qs->send_final_size = qs->send_offset;
>
> pc = qs->parent;
> qc = ngx_quic_get_connection(pc);
>
> + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL reset", qs->id);
> +
> frame = ngx_quic_alloc_frame(pc);
> if (frame == NULL) {
> return NGX_ERROR;
> @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *
> frame->type = NGX_QUIC_FT_RESET_STREAM;
> frame->u.reset_stream.id = qs->id;
> frame->u.reset_stream.error_code = err;
> - frame->u.reset_stream.final_size = c->sent;
> + frame->u.reset_stream.final_size = qs->send_offset;
>
> ngx_quic_queue_frame(qc, frame);
>
> + ngx_quic_free_chain(pc, qs->out);
> + qs->out = NULL;
> +
> return NGX_OK;
> }
>
> @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_
> static ngx_int_t
> ngx_quic_shutdown_stream_send(ngx_connection_t *c)
> {
> - ngx_connection_t *pc;
> - ngx_quic_frame_t *frame;
> - ngx_quic_stream_t *qs;
> - ngx_quic_connection_t *qc;
> + ngx_quic_stream_t *qs;
>
> qs = c->quic;
>
> @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec
> return NGX_OK;
> }
>
> - qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> -
> - pc = qs->parent;
> - qc = ngx_quic_get_connection(pc);
> + qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
> + qs->send_final_size = c->sent;
>
> - frame = ngx_quic_alloc_frame(pc);
> - if (frame == NULL) {
> - return NGX_ERROR;
> - }
> -
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> "quic stream id:0x%xL send shutdown", qs->id);
>
> - frame->level = ssl_encryption_application;
> - frame->type = NGX_QUIC_FT_STREAM;
> - frame->u.stream.off = 1;
> - frame->u.stream.len = 1;
> - frame->u.stream.fin = 1;
> -
> - frame->u.stream.stream_id = qs->id;
> - frame->u.stream.offset = c->sent;
> - frame->u.stream.length = 0;
> -
> - ngx_quic_queue_frame(qc, frame);
> -
> - return NGX_OK;
> + return ngx_quic_stream_flush(qs);
> }
>
>
> @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec
> return NGX_ERROR;
> }
>
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> "quic stream id:0x%xL recv shutdown", qs->id);
>
> frame->level = ssl_encryption_application;
> @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t
> {
> ngx_log_t *log;
> ngx_pool_t *pool;
> + ngx_queue_t *q;
> ngx_connection_t *sc;
> ngx_quic_stream_t *qs;
> ngx_pool_cleanup_t *cln;
> @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t
>
> qc = ngx_quic_get_connection(c);
>
> - pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> - if (pool == NULL) {
> - return NULL;
> + if (!ngx_queue_empty(&qc->streams.free)) {
> + q = ngx_queue_head(&qc->streams.free);
> + qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
> + ngx_queue_remove(&qs->queue);
> +
> + } else {
> + /*
> + * the number of streams is limited by transport
> + * parameters and application requirements
> + */
> +
> + qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
> + if (qs == NULL) {
> + return NULL;
> + }
> }
>
> - qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
> - if (qs == NULL) {
> - ngx_destroy_pool(pool);
> - return NULL;
> - }
> + ngx_memzero(qs, sizeof(ngx_quic_stream_t));
>
> qs->node.key = id;
> qs->parent = c;
> qs->id = id;
> - qs->final_size = (uint64_t) -1;
> + qs->send_final_size = (uint64_t) -1;
> + qs->recv_final_size = (uint64_t) -1;
> +
> + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
> + if (pool == NULL) {
> + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> + return NULL;
> + }
>
> log = ngx_palloc(pool, sizeof(ngx_log_t));
> if (log == NULL) {
> ngx_destroy_pool(pool);
> + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> return NULL;
> }
>
> @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t
> sc = ngx_get_connection(c->fd, log);
> if (sc == NULL) {
> ngx_destroy_pool(pool);
> + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> return NULL;
> }
>
> @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t
> if (cln == NULL) {
> ngx_close_connection(sc);
> ngx_destroy_pool(pool);
> + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
> return NULL;
> }
>
> @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> return NGX_ERROR;
> }
>
> - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> "quic stream id:0x%xL recv buf:%uz", qs->id, size);
>
> if (size == 0) {
> @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> rev->ready = 0;
>
> if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
> - && qs->recv_offset == qs->final_size)
> + && qs->recv_offset == qs->recv_final_size)
> {
> qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
> }
> @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
> ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
> "quic stream id:0x%xL recv len:%z", qs->id, len);
>
> - if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
> + if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
> return NGX_ERROR;
> }
>
> @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio
> off_t flow;
> size_t n;
> ngx_event_t *wev;
> - ngx_chain_t *out;
> ngx_connection_t *pc;
> - ngx_quic_frame_t *frame;
> ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio
>
> qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
>
> - flow = ngx_quic_max_stream_flow(c);
> + flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
> +
> if (flow == 0) {
> wev->ready = 0;
> return in;
> @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio
> limit = flow;
> }
>
> - in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
> + in = ngx_quic_write_chain(pc, &qs->out, in, limit,
> + c->sent - qs->send_offset, &n);
> if (in == NGX_CHAIN_ERROR) {
> return NGX_CHAIN_ERROR;
> }
>
> - out = ngx_quic_read_chain(pc, &qs->out, n);
> - if (out == NGX_CHAIN_ERROR) {
> - return NGX_CHAIN_ERROR;
> - }
> -
> - frame = ngx_quic_alloc_frame(pc);
> - if (frame == NULL) {
> - return NGX_CHAIN_ERROR;
> - }
> -
> - frame->level = ssl_encryption_application;
> - frame->type = NGX_QUIC_FT_STREAM;
> - frame->data = out;
> - frame->u.stream.off = 1;
> - frame->u.stream.len = 1;
> - frame->u.stream.fin = 0;
> -
> - frame->u.stream.stream_id = qs->id;
> - frame->u.stream.offset = c->sent;
> - frame->u.stream.length = n;
> -
> c->sent += n;
> qc->streams.sent += n;
>
> - ngx_quic_queue_frame(qc, frame);
> -
> if (in) {
> wev->ready = 0;
> }
> @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio
> ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> "quic send_chain sent:%uz", n);
>
> + if (ngx_quic_stream_flush(qs) != NGX_OK) {
> + return NGX_CHAIN_ERROR;
> + }
> +
> return in;
> }
>
>
> -static size_t
> -ngx_quic_max_stream_flow(ngx_connection_t *c)
> +static ngx_int_t
> +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> {
> - size_t size;
> - uint64_t sent, unacked;
> - ngx_quic_stream_t *qs;
> + off_t limit;
> + size_t len;
> + ngx_uint_t last;
> + ngx_chain_t *out, *cl;
> + ngx_quic_frame_t *frame;
> + ngx_connection_t *pc;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> - qc = ngx_quic_get_connection(qs->parent);
> + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> + return NGX_OK;
> + }
>
> - size = qc->conf->stream_buffer_size;
> - sent = c->sent;
> - unacked = sent - qs->acked;
> + pc = qs->parent;
> + qc = ngx_quic_get_connection(pc);
>
> if (qc->streams.send_max_data == 0) {
> qc->streams.send_max_data = qc->ctp.initial_max_data;
> }
>
> - if (unacked >= size) {
> - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic send flow hit buffer size");
> - return 0;
> + limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
> + qs->send_max_data - qs->send_offset);
> +
> + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL flush limit:%O", qs->id, limit);
> +
> + out = ngx_quic_read_chain(pc, &qs->out, limit);
> + if (out == NGX_CHAIN_ERROR) {
> + return NGX_ERROR;
> }
>
> - size -= unacked;
> + len = 0;
> + last = 0;
> +
> + for (cl = out; cl; cl = cl->next) {
> + len += cl->buf->last - cl->buf->pos;
> + }
>
> - if (qc->streams.sent >= qc->streams.send_max_data) {
> - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic send flow hit MAX_DATA");
> - return 0;
> + if (qs->send_final_size != (uint64_t) -1
> + && qs->send_final_size == qs->send_offset + len)
> + {
> + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
> + last = 1;
> + }
> +
> + if (len == 0 && !last) {
> + return NGX_OK;
> }
>
> - if (qc->streams.sent + size > qc->streams.send_max_data) {
> - size = qc->streams.send_max_data - qc->streams.sent;
> + frame = ngx_quic_alloc_frame(pc);
> + if (frame == NULL) {
> + return NGX_ERROR;
> }
>
> - if (sent >= qs->send_max_data) {
> - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic send flow hit MAX_STREAM_DATA");
> - return 0;
> + frame->level = ssl_encryption_application;
> + frame->type = NGX_QUIC_FT_STREAM;
> + frame->data = out;
> +
> + frame->u.stream.off = 1;
> + frame->u.stream.len = 1;
> + frame->u.stream.fin = last;
> +
> + frame->u.stream.stream_id = qs->id;
> + frame->u.stream.offset = qs->send_offset;
> + frame->u.stream.length = len;
> +
> + ngx_quic_queue_frame(qc, frame);
> +
> + qs->send_offset += len;
> + qc->streams.send_offset += len;
> +
> + ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL flush len:%uz last:%ui",
> + qs->id, len, last);
> +
> + if (qs->connection == NULL) {
> + return ngx_quic_close_stream(qs);
> }
>
> - if (sent + size > qs->send_max_data) {
> - size = qs->send_max_data - sent;
> - }
> -
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic send flow:%uz", size);
> -
> - return size;
> + return NGX_OK;
> }
>
>
> @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da
> {
> ngx_connection_t *c = data;
>
> + ngx_quic_stream_t *qs;
> +
> + qs = c->quic;
> +
> + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
> + "quic stream id:0x%xL cleanup", qs->id);
> +
> + if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
> + ngx_quic_close_connection(c, NGX_ERROR);
> + return;
> + }
> +
> + qs->connection = NULL;
> +
> + if (ngx_quic_close_stream(qs) != NGX_OK) {
> + ngx_quic_close_connection(c, NGX_ERROR);
> + return;
> + }
> +}
> +
> +
> +static ngx_int_t
> +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> +{
> ngx_connection_t *pc;
> ngx_quic_frame_t *frame;
> - ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> pc = qs->parent;
> qc = ngx_quic_get_connection(pc);
>
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic stream id:0x%xL cleanup", qs->id);
> + if (!qc->closing) {
> + /* make sure everything is sent and final size is received */
> +
> + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> + || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> + {
> + return NGX_OK;
> + }
> + }
> +
> + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL close", qs->id);
> +
> + ngx_quic_free_chain(pc, qs->in);
> + ngx_quic_free_chain(pc, qs->out);
>
> ngx_rbtree_delete(&qc->streams.tree, &qs->node);
> - ngx_quic_free_chain(pc, qs->in);
> - ngx_quic_free_chain(pc, qs->out);
> + ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
>
> if (qc->closing) {
> /* schedule handler call to continue ngx_quic_close_connection() */
> ngx_post_event(pc->read, &ngx_posted_events);
> - return;
> + return NGX_OK;
> }
>
> - if (qc->error) {
> - goto done;
> - }
> -
> - (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
> -
> - (void) ngx_quic_update_flow(c, qs->recv_last);
> -
> if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
> frame = ngx_quic_alloc_frame(pc);
> if (frame == NULL) {
> - goto done;
> + return NGX_ERROR;
> }
>
> frame->level = ssl_encryption_application;
> @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da
> ngx_quic_queue_frame(qc, frame);
> }
>
> -done:
> -
> - (void) ngx_quic_output(pc);
> -
> if (qc->shutdown) {
> ngx_post_event(pc->read, &ngx_posted_events);
> }
> +
> + return NGX_OK;
> }
>
>
> @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect
> {
> size_t size;
> uint64_t last;
> - ngx_connection_t *sc;
> ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
> ngx_quic_stream_frame_t *f;
> @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> return NGX_OK;
> }
>
> - sc = qs->connection;
> -
> if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
> && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
> {
> return NGX_OK;
> }
>
> - if (ngx_quic_control_flow(sc, last) != NGX_OK) {
> + if (ngx_quic_control_flow(qs, last) != NGX_OK) {
> return NGX_ERROR;
> }
>
> - if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
> + if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
> qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> return NGX_ERROR;
> }
> @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect
> }
>
> if (f->fin) {
> - if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
> + if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
> + {
> qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> return NGX_ERROR;
> }
> @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect
> return NGX_ERROR;
> }
>
> - qs->final_size = last;
> + qs->recv_final_size = last;
> qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
> }
>
> @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect
> qs->recv_size += size;
>
> if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
> - && qs->recv_size == qs->final_size)
> + && qs->recv_size == qs->recv_final_size)
> {
> qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
> }
>
> + if (qs->connection == NULL) {
> + return ngx_quic_close_stream(qs);
> + }
> +
> if (f->offset == qs->recv_offset) {
> - ngx_quic_set_event(sc->read);
> + ngx_quic_set_event(qs->connection->read);
> }
>
> return NGX_OK;
> @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne
> return NGX_OK;
> }
>
> - if (tree->root != tree->sentinel
> - && qc->streams.sent >= qc->streams.send_max_data)
> + if (tree->root == tree->sentinel
> + || qc->streams.send_offset < qc->streams.send_max_data)
> {
> -
> - for (node = ngx_rbtree_min(tree->root, tree->sentinel);
> - node;
> - node = ngx_rbtree_next(tree, node))
> - {
> - qs = (ngx_quic_stream_t *) node;
> - ngx_quic_set_event(qs->connection->write);
> - }
> + /* not blocked on MAX_DATA */
> + qc->streams.send_max_data = f->max_data;
> + return NGX_OK;
> }
>
> qc->streams.send_max_data = f->max_data;
> + node = ngx_rbtree_min(tree->root, tree->sentinel);
> +
> + while (node && qc->streams.send_offset < qc->streams.send_max_data) {
> +
> + qs = (ngx_quic_stream_t *) node;
> + node = ngx_rbtree_next(tree, node);
> +
> + if (ngx_quic_stream_flush(qs) != NGX_OK) {
> + return NGX_ERROR;
> + }
> + }
>
> return NGX_OK;
> }
> @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram
> return NGX_OK;
> }
>
> - return ngx_quic_update_max_stream_data(qs->connection);
> + return ngx_quic_update_max_stream_data(qs);
> }
>
>
> @@ -1197,7 +1258,6 @@ ngx_int_t
> ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
> ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
> {
> - uint64_t sent;
> ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng
> return NGX_OK;
> }
>
> - sent = qs->connection->sent;
> -
> - if (sent >= qs->send_max_data) {
> - ngx_quic_set_event(qs->connection->write);
> + if (qs->send_offset < qs->send_max_data) {
> + /* not blocked on MAX_STREAM_DATA */
> + qs->send_max_data = f->limit;
> + return NGX_OK;
> }
>
> qs->send_max_data = f->limit;
>
> - return NGX_OK;
> + return ngx_quic_stream_flush(qs);
> }
>
>
> @@ -1240,7 +1300,6 @@ ngx_int_t
> ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
> ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
> {
> - ngx_connection_t *sc;
> ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
>
> qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
>
> - sc = qs->connection;
> -
> - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
> + if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
> return NGX_ERROR;
> }
>
> - if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
> + if (qs->recv_final_size != (uint64_t) -1
> + && qs->recv_final_size != f->final_size)
> + {
> qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
> return NGX_ERROR;
> }
> @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c
> return NGX_ERROR;
> }
>
> - qs->final_size = f->final_size;
> + qs->recv_final_size = f->final_size;
>
> - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
> + if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
> return NGX_ERROR;
> }
>
> + if (qs->connection == NULL) {
> + return ngx_quic_close_stream(qs);
> + }
> +
> ngx_quic_set_event(qs->connection->read);
>
> return NGX_OK;
> @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
> return NGX_OK;
> }
>
> - if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
> + if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
> return NGX_ERROR;
> }
>
> + if (qs->connection == NULL) {
> + return ngx_quic_close_stream(qs);
> + }
> +
> ngx_quic_set_event(qs->connection->write);
>
> return NGX_OK;
> @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio
> return;
> }
>
> + if (qs->connection == NULL) {
> + qs->acked += f->u.stream.length;
> + return;
> + }
> +
> sent = qs->connection->sent;
> unacked = sent - qs->acked;
> + qs->acked += f->u.stream.length;
>
> - if (unacked >= qc->conf->stream_buffer_size) {
> - ngx_quic_set_event(qs->connection->write);
> + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> + "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
> + qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
> +
> + if (unacked != qc->conf->stream_buffer_size) {
> + /* not blocked on buffer size */
> + return;
> }
>
> - qs->acked += f->u.stream.length;
> -
> - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
> - "quic stream ack len:%uL acked:%uL unacked:%uL",
> - f->u.stream.length, qs->acked, sent - qs->acked);
> + ngx_quic_set_event(qs->connection->write);
> }
>
>
> static ngx_int_t
> -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
> +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
> {
> uint64_t len;
> - ngx_quic_stream_t *qs;
> + ngx_connection_t *pc;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> - qc = ngx_quic_get_connection(qs->parent);
> + pc = qs->parent;
> + qc = ngx_quic_get_connection(pc);
>
> if (last <= qs->recv_last) {
> return NGX_OK;
> @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *
>
> len = last - qs->recv_last;
>
> - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic flow control msd:%uL/%uL md:%uL/%uL",
> - last, qs->recv_max_data, qc->streams.recv_last + len,
> + ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
> + qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
> qc->streams.recv_max_data);
>
> qs->recv_last += len;
> @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *
>
>
> static ngx_int_t
> -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
> +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
> {
> uint64_t len;
> ngx_connection_t *pc;
> - ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> pc = qs->parent;
> qc = ngx_quic_get_connection(pc);
>
> @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c
>
> len = last - qs->recv_offset;
>
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic flow update %uL", last);
> + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL flow update %uL", qs->id, last);
>
> qs->recv_offset += len;
>
> if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
> - if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
> + if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
> return NGX_ERROR;
> }
> }
> @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c
>
>
> static ngx_int_t
> -ngx_quic_update_max_stream_data(ngx_connection_t *c)
> +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
> {
> uint64_t recv_max_data;
> ngx_connection_t *pc;
> ngx_quic_frame_t *frame;
> - ngx_quic_stream_t *qs;
> ngx_quic_connection_t *qc;
>
> - qs = c->quic;
> pc = qs->parent;
> qc = ngx_quic_get_connection(pc);
>
> @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn
>
> qs->recv_max_data = recv_max_data;
>
> - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> - "quic flow update msd:%uL", qs->recv_max_data);
> + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
> + "quic stream id:0x%xL flow update msd:%uL",
> + qs->id, qs->recv_max_data);
>
> frame = ngx_quic_alloc_frame(pc);
> if (frame == NULL) {
> diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c
> --- a/src/http/v3/ngx_http_v3_uni.c
> +++ b/src/http/v3/ngx_http_v3_uni.c
> @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_
> }
>
>
> -/* XXX async & buffered stream writes */
> -
> ngx_connection_t *
> ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
> {
> _______________________________________________
> nginx-devel mailing list -- nginx-devel at nginx.org
> To unsubscribe send an email to nginx-devel-leave at nginx.org
More information about the nginx-devel
mailing list