[PATCH] QUIC: stream lingering
Roman Arutyunyan
arut at nginx.com
Mon Feb 7 14:16:17 UTC 2022
Hi,
On Fri, Feb 04, 2022 at 04:56:23PM +0300, Vladimir Homutov wrote:
> On Tue, Feb 01, 2022 at 04:39:59PM +0300, Roman Arutyunyan wrote:
> > # HG changeset patch
> > # User Roman Arutyunyan <arut at nginx.com>
> > # Date 1643722727 -10800
> > # Tue Feb 01 16:38:47 2022 +0300
> > # Branch quic
> > # Node ID db31ae16c1f2050be9c9f6b1f117ab6725b97dd4
> > # Parent 308ac307b3e6952ef0c5ccf10cc82904c59fa4c3
> > QUIC: stream lingering.
> >
> > Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
> > can persist after connection is closed by application. During this period,
> > server is expecting stream final size from client for correct flow control.
> > Also, buffered output is sent to client as more flow control credit is granted.
> >
> [..]
>
> > +static ngx_int_t
> > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > +{
> > + size_t limit, len;
> > + ngx_uint_t last;
> > + ngx_chain_t *out, *cl;
> > + ngx_quic_frame_t *frame;
> > + ngx_connection_t *pc;
> > + ngx_quic_connection_t *qc;
> > +
> > + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
> > + return NGX_OK;
> > + }
> > +
> > + pc = qs->parent;
> > + qc = ngx_quic_get_connection(pc);
> > +
> > + limit = ngx_quic_max_stream_flow(qs);
> > + last = 0;
> > +
> > + out = ngx_quic_read_chain(pc, &qs->out, limit);
> > + if (out == NGX_CHAIN_ERROR) {
> > + return NGX_ERROR;
> > + }
> > +
> > + len = 0;
> > + last = 0;
>
> this assignment looks duplicate.
Thanks, fixed.
> [..]
>
> > +static ngx_int_t
> > +ngx_quic_close_stream(ngx_quic_stream_t *qs)
> > +{
> > ngx_connection_t *pc;
> > ngx_quic_frame_t *frame;
> > - ngx_quic_stream_t *qs;
> > ngx_quic_connection_t *qc;
> >
> > - qs = c->quic;
> > pc = qs->parent;
> > qc = ngx_quic_get_connection(pc);
> >
> > - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
> > - "quic stream id:0x%xL cleanup", qs->id);
> > + if (!qc->closing) {
> > + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
> > + || qs->send_state == NGX_QUIC_STREAM_SEND_READY
> > + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
> > + {
>
> so basically this are the states where we need to wait for FIN?
> and thus avoid closing till we get it.
> I would add a comment here.
On the receiving end we wait either for fin or for reset to have final size.
On the sending end we wait for everything that's buffered to be sent.
Added a comment about that.
> [..]
> > + if (qs->connection == NULL) {
> > + return ngx_quic_close_stream(qs);
> > + }
> > +
> > ngx_quic_set_event(qs->connection->write);
>
> this pattern - check connection, close if NULL and set event seem to
> repeat. Maybe it's worth to try to put this check/action into
> ngx_quic_set_event somehow ? we could instead have
> set_read_event/set_write_event maybe.
I thought about this too, but it's not always that simple. And even if it was,
the new function/macro would have unclear semantics. Let's just remember this
as a possible future optimization.
> > +static ngx_int_t
> > +ngx_quic_stream_flush(ngx_quic_stream_t *qs)
> > +
> [..]
> > + if (len == 0 && !last) {
> > + return NGX_OK;
> > + }
> > +
> > + frame = ngx_quic_alloc_frame(pc);
> > + if (frame == NULL) {
> > + return NGX_ERROR;
> > + }
> > +
> > + frame = ngx_quic_alloc_frame(pc);
> > + if (frame == NULL) {
> > + return NGX_ERROR;
> > + }
>
> one more dup here.
Yes, thanks.
> Overal, it looks good, but the testing revealed another issue: with big
> buffer sizes we run into issue of too long chains in ngx_quic_write_chain().
> As discussed, this certainly needs optimization - probably adding some
> pointer to the end to facilitate appending, or something else.
It's true ngx_quic_write_chain() needs to be optimized. When the buffered
chain is big, it takes too much time to find the write point. I'll address
this in a separate patch. Meanwhile, attached is an updated version of the
current one.
In the new version of the patch I also eliminated the
ngx_quic_max_stream_flow() function and embedded its content in
ngx_quic_stream_flush().
--
Roman Arutyunyan
-------------- next part --------------
# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1644054894 -10800
# Sat Feb 05 12:54:54 2022 +0300
# Branch quic
# Node ID 6e1674c257709341a7508ae4bdab6f7f7d2e9284
# Parent 6c1dfd072859022f830aeea49db7cbe3c9f7fb55
QUIC: stream lingering.
Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
can persist after connection is closed by application. During this period,
server is expecting stream final size from client for correct flow control.
Also, buffered output is sent to client as more flow control credit is granted.
diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
--- a/src/event/quic/ngx_event_quic.c
+++ b/src/event/quic/ngx_event_quic.c
@@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t
ctp->active_connection_id_limit = 2;
ngx_queue_init(&qc->streams.uninitialized);
+ ngx_queue_init(&qc->streams.free);
qc->streams.recv_max_data = qc->tp.initial_max_data;
qc->streams.recv_window = qc->streams.recv_max_data;
diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
uint64_t id;
uint64_t acked;
uint64_t send_max_data;
+ uint64_t send_offset;
+ uint64_t send_final_size;
uint64_t recv_max_data;
uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last;
uint64_t recv_size;
- uint64_t final_size;
+ uint64_t recv_final_size;
ngx_chain_t *in;
ngx_chain_t *out;
ngx_uint_t cancelable; /* unsigned cancelable:1; */
diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
--- a/src/event/quic/ngx_event_quic_connection.h
+++ b/src/event/quic/ngx_event_quic_connection.h
@@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
typedef struct {
ngx_rbtree_t tree;
ngx_rbtree_node_t sentinel;
+
ngx_queue_t uninitialized;
+ ngx_queue_t free;
uint64_t sent;
uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last;
uint64_t recv_max_data;
+ uint64_t send_offset;
uint64_t send_max_data;
uint64_t server_max_streams_uni;
diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c
--- a/src/event/quic/ngx_event_quic_frames.c
+++ b/src/event/quic/ngx_event_quic_frames.c
@@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c
return NGX_ERROR;
}
+ if (f->type == NGX_QUIC_FT_STREAM) {
+ f->u.stream.fin = 0;
+ }
+
ngx_queue_insert_after(&f->queue, &nf->queue);
return NGX_OK;
diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -13,6 +13,8 @@
#define NGX_QUIC_STREAM_GONE (void *) -1
+static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
+ ngx_uint_t err);
static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
@@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_
size_t size);
static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit);
-static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
+static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
static void ngx_quic_stream_cleanup_handler(void *data);
-static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
-static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
-static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
+static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
+static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
+static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
+static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
static void ngx_quic_set_event(ngx_event_t *ev);
@@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t
ns = 0;
#endif
- for (node = ngx_rbtree_min(tree->root, tree->sentinel);
- node;
- node = ngx_rbtree_next(tree, node))
- {
+ node = ngx_rbtree_min(tree->root, tree->sentinel);
+
+ while (node) {
qs = (ngx_quic_stream_t *) node;
+ node = ngx_rbtree_next(tree, node);
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+ if (qs->connection == NULL) {
+ ngx_quic_close_stream(qs);
+ continue;
+ }
+
ngx_quic_set_event(qs->connection->read);
ngx_quic_set_event(qs->connection->write);
@@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t
ngx_int_t
ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
{
+ return ngx_quic_do_reset_stream(c->quic, err);
+}
+
+
+static ngx_int_t
+ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
+{
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
- ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- qs = c->quic;
-
if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
@@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *
}
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+ qs->send_final_size = qs->send_offset;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL reset", qs->id);
+
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
@@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *
frame->type = NGX_QUIC_FT_RESET_STREAM;
frame->u.reset_stream.id = qs->id;
frame->u.reset_stream.error_code = err;
- frame->u.reset_stream.final_size = c->sent;
+ frame->u.reset_stream.final_size = qs->send_offset;
ngx_quic_queue_frame(qc, frame);
+ ngx_quic_free_chain(pc, qs->out);
+ qs->out = NULL;
+
return NGX_OK;
}
@@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_
static ngx_int_t
ngx_quic_shutdown_stream_send(ngx_connection_t *c)
{
- ngx_connection_t *pc;
- ngx_quic_frame_t *frame;
- ngx_quic_stream_t *qs;
- ngx_quic_connection_t *qc;
+ ngx_quic_stream_t *qs;
qs = c->quic;
@@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connec
return NGX_OK;
}
- qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
-
- pc = qs->parent;
- qc = ngx_quic_get_connection(pc);
+ qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
+ qs->send_final_size = c->sent;
- frame = ngx_quic_alloc_frame(pc);
- if (frame == NULL) {
- return NGX_ERROR;
- }
-
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
"quic stream id:0x%xL send shutdown", qs->id);
- frame->level = ssl_encryption_application;
- frame->type = NGX_QUIC_FT_STREAM;
- frame->u.stream.off = 1;
- frame->u.stream.len = 1;
- frame->u.stream.fin = 1;
-
- frame->u.stream.stream_id = qs->id;
- frame->u.stream.offset = c->sent;
- frame->u.stream.length = 0;
-
- ngx_quic_queue_frame(qc, frame);
-
- return NGX_OK;
+ return ngx_quic_stream_flush(qs);
}
@@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connec
return NGX_ERROR;
}
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL recv shutdown", qs->id);
frame->level = ssl_encryption_application;
@@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t
{
ngx_log_t *log;
ngx_pool_t *pool;
+ ngx_queue_t *q;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_pool_cleanup_t *cln;
@@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t
qc = ngx_quic_get_connection(c);
- pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
- if (pool == NULL) {
- return NULL;
+ if (!ngx_queue_empty(&qc->streams.free)) {
+ q = ngx_queue_head(&qc->streams.free);
+ qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
+ ngx_queue_remove(&qs->queue);
+
+ } else {
+ /*
+ * the number of streams is limited by transport
+ * parameters and application requirements
+ */
+
+ qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
+ if (qs == NULL) {
+ return NULL;
+ }
}
- qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
- if (qs == NULL) {
- ngx_destroy_pool(pool);
- return NULL;
- }
+ ngx_memzero(qs, sizeof(ngx_quic_stream_t));
qs->node.key = id;
qs->parent = c;
qs->id = id;
- qs->final_size = (uint64_t) -1;
+ qs->send_final_size = (uint64_t) -1;
+ qs->recv_final_size = (uint64_t) -1;
+
+ pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
+ if (pool == NULL) {
+ ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
+ return NULL;
+ }
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_destroy_pool(pool);
+ ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t
sc = ngx_get_connection(c->fd, log);
if (sc == NULL) {
ngx_destroy_pool(pool);
+ ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t
if (cln == NULL) {
ngx_close_connection(sc);
ngx_destroy_pool(pool);
+ ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
return NGX_ERROR;
}
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL recv buf:%uz", qs->id, size);
if (size == 0) {
@@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
rev->ready = 0;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
- && qs->recv_offset == qs->final_size)
+ && qs->recv_offset == qs->recv_final_size)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
}
@@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv len:%z", qs->id, len);
- if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
+ if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
return NGX_ERROR;
}
@@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connectio
off_t flow;
size_t n;
ngx_event_t *wev;
- ngx_chain_t *out;
ngx_connection_t *pc;
- ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connectio
qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
- flow = ngx_quic_max_stream_flow(c);
+ flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
+
if (flow == 0) {
wev->ready = 0;
return in;
@@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connectio
limit = flow;
}
- in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
+ in = ngx_quic_write_chain(pc, &qs->out, in, limit,
+ c->sent - qs->send_offset, &n);
if (in == NGX_CHAIN_ERROR) {
return NGX_CHAIN_ERROR;
}
- out = ngx_quic_read_chain(pc, &qs->out, n);
- if (out == NGX_CHAIN_ERROR) {
- return NGX_CHAIN_ERROR;
- }
-
- frame = ngx_quic_alloc_frame(pc);
- if (frame == NULL) {
- return NGX_CHAIN_ERROR;
- }
-
- frame->level = ssl_encryption_application;
- frame->type = NGX_QUIC_FT_STREAM;
- frame->data = out;
- frame->u.stream.off = 1;
- frame->u.stream.len = 1;
- frame->u.stream.fin = 0;
-
- frame->u.stream.stream_id = qs->id;
- frame->u.stream.offset = c->sent;
- frame->u.stream.length = n;
-
c->sent += n;
qc->streams.sent += n;
- ngx_quic_queue_frame(qc, frame);
-
if (in) {
wev->ready = 0;
}
@@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connectio
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send_chain sent:%uz", n);
+ if (ngx_quic_stream_flush(qs) != NGX_OK) {
+ return NGX_CHAIN_ERROR;
+ }
+
return in;
}
-static size_t
-ngx_quic_max_stream_flow(ngx_connection_t *c)
+static ngx_int_t
+ngx_quic_stream_flush(ngx_quic_stream_t *qs)
{
- size_t size;
- uint64_t sent, unacked;
- ngx_quic_stream_t *qs;
+ off_t limit;
+ size_t len;
+ ngx_uint_t last;
+ ngx_chain_t *out, *cl;
+ ngx_quic_frame_t *frame;
+ ngx_connection_t *pc;
ngx_quic_connection_t *qc;
- qs = c->quic;
- qc = ngx_quic_get_connection(qs->parent);
+ if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
+ return NGX_OK;
+ }
- size = qc->conf->stream_buffer_size;
- sent = c->sent;
- unacked = sent - qs->acked;
+ pc = qs->parent;
+ qc = ngx_quic_get_connection(pc);
if (qc->streams.send_max_data == 0) {
qc->streams.send_max_data = qc->ctp.initial_max_data;
}
- if (unacked >= size) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic send flow hit buffer size");
- return 0;
+ limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
+ qs->send_max_data - qs->send_offset);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL flush limit:%O", qs->id, limit);
+
+ out = ngx_quic_read_chain(pc, &qs->out, limit);
+ if (out == NGX_CHAIN_ERROR) {
+ return NGX_ERROR;
}
- size -= unacked;
+ len = 0;
+ last = 0;
+
+ for (cl = out; cl; cl = cl->next) {
+ len += cl->buf->last - cl->buf->pos;
+ }
- if (qc->streams.sent >= qc->streams.send_max_data) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic send flow hit MAX_DATA");
- return 0;
+ if (qs->send_final_size != (uint64_t) -1
+ && qs->send_final_size == qs->send_offset + len)
+ {
+ qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
+ last = 1;
+ }
+
+ if (len == 0 && !last) {
+ return NGX_OK;
}
- if (qc->streams.sent + size > qc->streams.send_max_data) {
- size = qc->streams.send_max_data - qc->streams.sent;
+ frame = ngx_quic_alloc_frame(pc);
+ if (frame == NULL) {
+ return NGX_ERROR;
}
- if (sent >= qs->send_max_data) {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic send flow hit MAX_STREAM_DATA");
- return 0;
+ frame->level = ssl_encryption_application;
+ frame->type = NGX_QUIC_FT_STREAM;
+ frame->data = out;
+
+ frame->u.stream.off = 1;
+ frame->u.stream.len = 1;
+ frame->u.stream.fin = last;
+
+ frame->u.stream.stream_id = qs->id;
+ frame->u.stream.offset = qs->send_offset;
+ frame->u.stream.length = len;
+
+ ngx_quic_queue_frame(qc, frame);
+
+ qs->send_offset += len;
+ qc->streams.send_offset += len;
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL flush len:%uz last:%ui",
+ qs->id, len, last);
+
+ if (qs->connection == NULL) {
+ return ngx_quic_close_stream(qs);
}
- if (sent + size > qs->send_max_data) {
- size = qs->send_max_data - sent;
- }
-
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic send flow:%uz", size);
-
- return size;
+ return NGX_OK;
}
@@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *da
{
ngx_connection_t *c = data;
+ ngx_quic_stream_t *qs;
+
+ qs = c->quic;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
+ "quic stream id:0x%xL cleanup", qs->id);
+
+ if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
+ ngx_quic_close_connection(c, NGX_ERROR);
+ return;
+ }
+
+ qs->connection = NULL;
+
+ if (ngx_quic_close_stream(qs) != NGX_OK) {
+ ngx_quic_close_connection(c, NGX_ERROR);
+ return;
+ }
+}
+
+
+static ngx_int_t
+ngx_quic_close_stream(ngx_quic_stream_t *qs)
+{
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
- ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic stream id:0x%xL cleanup", qs->id);
+ if (!qc->closing) {
+ /* make sure everything is sent and final size is received */
+
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
+ || qs->send_state == NGX_QUIC_STREAM_SEND_READY
+ || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
+ {
+ return NGX_OK;
+ }
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL close", qs->id);
+
+ ngx_quic_free_chain(pc, qs->in);
+ ngx_quic_free_chain(pc, qs->out);
ngx_rbtree_delete(&qc->streams.tree, &qs->node);
- ngx_quic_free_chain(pc, qs->in);
- ngx_quic_free_chain(pc, qs->out);
+ ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
if (qc->closing) {
/* schedule handler call to continue ngx_quic_close_connection() */
ngx_post_event(pc->read, &ngx_posted_events);
- return;
+ return NGX_OK;
}
- if (qc->error) {
- goto done;
- }
-
- (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
-
- (void) ngx_quic_update_flow(c, qs->recv_last);
-
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
- goto done;
+ return NGX_ERROR;
}
frame->level = ssl_encryption_application;
@@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *da
ngx_quic_queue_frame(qc, frame);
}
-done:
-
- (void) ngx_quic_output(pc);
-
if (qc->shutdown) {
ngx_post_event(pc->read, &ngx_posted_events);
}
+
+ return NGX_OK;
}
@@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connect
{
size_t size;
uint64_t last;
- ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_frame_t *f;
@@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connect
return NGX_OK;
}
- sc = qs->connection;
-
if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
&& qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
{
return NGX_OK;
}
- if (ngx_quic_control_flow(sc, last) != NGX_OK) {
+ if (ngx_quic_control_flow(qs, last) != NGX_OK) {
return NGX_ERROR;
}
- if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
+ if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connect
}
if (f->fin) {
- if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
+ if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
+ {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connect
return NGX_ERROR;
}
- qs->final_size = last;
+ qs->recv_final_size = last;
qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
}
@@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connect
qs->recv_size += size;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
- && qs->recv_size == qs->final_size)
+ && qs->recv_size == qs->recv_final_size)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
}
+ if (qs->connection == NULL) {
+ return ngx_quic_close_stream(qs);
+ }
+
if (f->offset == qs->recv_offset) {
- ngx_quic_set_event(sc->read);
+ ngx_quic_set_event(qs->connection->read);
}
return NGX_OK;
@@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_conne
return NGX_OK;
}
- if (tree->root != tree->sentinel
- && qc->streams.sent >= qc->streams.send_max_data)
+ if (tree->root == tree->sentinel
+ || qc->streams.send_offset < qc->streams.send_max_data)
{
-
- for (node = ngx_rbtree_min(tree->root, tree->sentinel);
- node;
- node = ngx_rbtree_next(tree, node))
- {
- qs = (ngx_quic_stream_t *) node;
- ngx_quic_set_event(qs->connection->write);
- }
+ /* not blocked on MAX_DATA */
+ qc->streams.send_max_data = f->max_data;
+ return NGX_OK;
}
qc->streams.send_max_data = f->max_data;
+ node = ngx_rbtree_min(tree->root, tree->sentinel);
+
+ while (node && qc->streams.send_offset < qc->streams.send_max_data) {
+
+ qs = (ngx_quic_stream_t *) node;
+ node = ngx_rbtree_next(tree, node);
+
+ if (ngx_quic_stream_flush(qs) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
return NGX_OK;
}
@@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_fram
return NGX_OK;
}
- return ngx_quic_update_max_stream_data(qs->connection);
+ return ngx_quic_update_max_stream_data(qs);
}
@@ -1197,7 +1258,6 @@ ngx_int_t
ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
{
- uint64_t sent;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ng
return NGX_OK;
}
- sent = qs->connection->sent;
-
- if (sent >= qs->send_max_data) {
- ngx_quic_set_event(qs->connection->write);
+ if (qs->send_offset < qs->send_max_data) {
+ /* not blocked on MAX_STREAM_DATA */
+ qs->send_max_data = f->limit;
+ return NGX_OK;
}
qs->send_max_data = f->limit;
- return NGX_OK;
+ return ngx_quic_stream_flush(qs);
}
@@ -1240,7 +1300,6 @@ ngx_int_t
ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
{
- ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_c
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
- sc = qs->connection;
-
- if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+ if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
return NGX_ERROR;
}
- if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
+ if (qs->recv_final_size != (uint64_t) -1
+ && qs->recv_final_size != f->final_size)
+ {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_c
return NGX_ERROR;
}
- qs->final_size = f->final_size;
+ qs->recv_final_size = f->final_size;
- if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+ if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
return NGX_ERROR;
}
+ if (qs->connection == NULL) {
+ return ngx_quic_close_stream(qs);
+ }
+
ngx_quic_set_event(qs->connection->read);
return NGX_OK;
@@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
return NGX_OK;
}
- if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
+ if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
return NGX_ERROR;
}
+ if (qs->connection == NULL) {
+ return ngx_quic_close_stream(qs);
+ }
+
ngx_quic_set_event(qs->connection->write);
return NGX_OK;
@@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connectio
return;
}
+ if (qs->connection == NULL) {
+ qs->acked += f->u.stream.length;
+ return;
+ }
+
sent = qs->connection->sent;
unacked = sent - qs->acked;
+ qs->acked += f->u.stream.length;
- if (unacked >= qc->conf->stream_buffer_size) {
- ngx_quic_set_event(qs->connection->write);
+ ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
+ qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
+
+ if (unacked != qc->conf->stream_buffer_size) {
+ /* not blocked on buffer size */
+ return;
}
- qs->acked += f->u.stream.length;
-
- ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
- "quic stream ack len:%uL acked:%uL unacked:%uL",
- f->u.stream.length, qs->acked, sent - qs->acked);
+ ngx_quic_set_event(qs->connection->write);
}
static ngx_int_t
-ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
+ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
{
uint64_t len;
- ngx_quic_stream_t *qs;
+ ngx_connection_t *pc;
ngx_quic_connection_t *qc;
- qs = c->quic;
- qc = ngx_quic_get_connection(qs->parent);
+ pc = qs->parent;
+ qc = ngx_quic_get_connection(pc);
if (last <= qs->recv_last) {
return NGX_OK;
@@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *
len = last - qs->recv_last;
- ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic flow control msd:%uL/%uL md:%uL/%uL",
- last, qs->recv_max_data, qc->streams.recv_last + len,
+ ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
+ qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
qc->streams.recv_max_data);
qs->recv_last += len;
@@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *
static ngx_int_t
-ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
+ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
{
uint64_t len;
ngx_connection_t *pc;
- ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c
len = last - qs->recv_offset;
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic flow update %uL", last);
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL flow update %uL", qs->id, last);
qs->recv_offset += len;
if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
- if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
+ if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
return NGX_ERROR;
}
}
@@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c
static ngx_int_t
-ngx_quic_update_max_stream_data(ngx_connection_t *c)
+ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
{
uint64_t recv_max_data;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
- ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_conn
qs->recv_max_data = recv_max_data;
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic flow update msd:%uL", qs->recv_max_data);
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic stream id:0x%xL flow update msd:%uL",
+ qs->id, qs->recv_max_data);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c
--- a/src/http/v3/ngx_http_v3_uni.c
+++ b/src/http/v3/ngx_http_v3_uni.c
@@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_
}
-/* XXX async & buffered stream writes */
-
ngx_connection_t *
ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
{
More information about the nginx-devel
mailing list