[PATCH 1 of 3] QUIC: introduced explicit stream states
Roman Arutyunyan
arut at nginx.com
Mon Jan 31 07:34:06 UTC 2022
# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1643611562 -10800
# Mon Jan 31 09:46:02 2022 +0300
# Branch quic
# Node ID 8dcb9908989401d750b14fe5dccf444a5485c23d
# Parent 81a3429db8b00ec9fc476d3687d1cd18088f3365
QUIC: introduced explicit stream states.
This allows eliminating the use of stream connection event flags for tracking
stream state.
diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -28,6 +28,26 @@
#define NGX_QUIC_STREAM_UNIDIRECTIONAL 0x02
+typedef enum {
+ NGX_QUIC_STREAM_SEND_READY = 0,
+ NGX_QUIC_STREAM_SEND_SEND,
+ NGX_QUIC_STREAM_SEND_DATA_SENT,
+ NGX_QUIC_STREAM_SEND_DATA_RECVD,
+ NGX_QUIC_STREAM_SEND_RESET_SENT,
+ NGX_QUIC_STREAM_SEND_RESET_RECVD
+} ngx_quic_stream_send_state_e;
+
+
+typedef enum {
+ NGX_QUIC_STREAM_RECV_RECV = 0,
+ NGX_QUIC_STREAM_RECV_SIZE_KNOWN,
+ NGX_QUIC_STREAM_RECV_DATA_RECVD,
+ NGX_QUIC_STREAM_RECV_DATA_READ,
+ NGX_QUIC_STREAM_RECV_RESET_RECVD,
+ NGX_QUIC_STREAM_RECV_RESET_READ
+} ngx_quic_stream_recv_state_e;
+
+
typedef struct {
ngx_ssl_t *ssl;
@@ -66,6 +86,8 @@ struct ngx_quic_stream_s {
ngx_chain_t *in;
ngx_chain_t *out;
ngx_uint_t cancelable; /* unsigned cancelable:1; */
+ ngx_quic_stream_send_state_e send_state;
+ ngx_quic_stream_recv_state_e recv_state;
};
diff --git a/src/event/quic/ngx_event_quic_ack.c b/src/event/quic/ngx_event_quic_ack.c
--- a/src/event/quic/ngx_event_quic_ack.c
+++ b/src/event/quic/ngx_event_quic_ack.c
@@ -617,10 +617,13 @@ ngx_quic_resend_frames(ngx_connection_t
case NGX_QUIC_FT_STREAM:
qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
- if (qs && qs->connection->write->error) {
- /* RESET_STREAM was sent */
- ngx_quic_free_frame(c, f);
- break;
+ if (qs) {
+ if (qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
+ || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
+ {
+ ngx_quic_free_frame(c, f);
+ break;
+ }
}
/* fall through */
diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -192,12 +192,13 @@ ngx_quic_close_streams(ngx_connection_t
{
qs = (ngx_quic_stream_t *) node;
+ qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
+ qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+
rev = qs->connection->read;
- rev->error = 1;
rev->ready = 1;
wev = qs->connection->write;
- wev->error = 1;
wev->ready = 1;
ngx_post_event(rev, &ngx_posted_events);
@@ -221,19 +222,22 @@ ngx_quic_close_streams(ngx_connection_t
ngx_int_t
ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
{
- ngx_event_t *wev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- wev = c->write;
+ qs = c->quic;
- if (wev->error) {
+ if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
+ || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
+ || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
+ {
return NGX_OK;
}
- qs = c->quic;
+ qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -250,9 +254,6 @@ ngx_quic_reset_stream(ngx_connection_t *
ngx_quic_queue_frame(qc, frame);
- wev->error = 1;
- wev->ready = 1;
-
return NGX_OK;
}
@@ -260,27 +261,15 @@ ngx_quic_reset_stream(ngx_connection_t *
ngx_int_t
ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
{
- ngx_quic_stream_t *qs;
-
- qs = c->quic;
-
if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) {
- if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED)
- || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
- {
- if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
- return NGX_ERROR;
- }
+ if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
+ return NGX_ERROR;
}
}
if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) {
- if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
- || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
- {
- if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
- return NGX_ERROR;
- }
+ if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
+ return NGX_ERROR;
}
}
@@ -291,19 +280,21 @@ ngx_quic_shutdown_stream(ngx_connection_
static ngx_int_t
ngx_quic_shutdown_stream_send(ngx_connection_t *c)
{
- ngx_event_t *wev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- wev = c->write;
+ qs = c->quic;
- if (wev->error) {
+ if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
+ && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
+ {
return NGX_OK;
}
- qs = c->quic;
+ qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
+
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -327,8 +318,6 @@ ngx_quic_shutdown_stream_send(ngx_connec
ngx_quic_queue_frame(qc, frame);
- wev->error = 1;
-
return NGX_OK;
}
@@ -336,19 +325,19 @@ ngx_quic_shutdown_stream_send(ngx_connec
static ngx_int_t
ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
{
- ngx_event_t *rev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- rev = c->read;
+ qs = c->quic;
- if (rev->pending_eof || rev->error) {
+ if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
+ && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
+ {
return NGX_OK;
}
- qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -371,8 +360,6 @@ ngx_quic_shutdown_stream_recv(ngx_connec
ngx_quic_queue_frame(qc, frame);
- rev->error = 1;
-
return NGX_OK;
}
@@ -690,9 +677,13 @@ ngx_quic_create_stream(ngx_connection_t
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
+ qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
+ qs->send_state = NGX_QUIC_STREAM_SEND_READY;
} else {
qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
+ qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
+ qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
}
} else {
@@ -704,6 +695,9 @@ ngx_quic_create_stream(ngx_connection_t
qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
}
+
+ qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
+ qs->send_state = NGX_QUIC_STREAM_SEND_READY;
}
qs->recv_window = qs->recv_max_data;
@@ -744,26 +738,16 @@ ngx_quic_stream_recv(ngx_connection_t *c
pc = qs->parent;
rev = c->read;
- if (rev->error) {
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
+ || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
+ {
+ qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ;
+ rev->error = 1;
return NGX_ERROR;
}
- ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic stream id:0x%xL recv eof:%d buf:%uz",
- qs->id, rev->pending_eof, size);
-
- if (qs->in == NULL || qs->in->buf->sync) {
- rev->ready = 0;
-
- if (qs->recv_offset == qs->final_size) {
- rev->eof = 1;
- return 0;
- }
-
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic stream id:0x%xL recv() not ready", qs->id);
- return NGX_AGAIN;
- }
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic stream id:0x%xL recv buf:%uz", qs->id, size);
in = ngx_quic_read_chain(pc, &qs->in, size);
if (in == NGX_CHAIN_ERROR) {
@@ -780,8 +764,23 @@ ngx_quic_stream_recv(ngx_connection_t *c
ngx_quic_free_chain(pc, in);
- if (qs->in == NULL) {
- rev->ready = rev->pending_eof;
+ if (len == 0) {
+ rev->ready = 0;
+
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
+ && qs->recv_offset == qs->final_size)
+ {
+ qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
+ }
+
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) {
+ rev->eof = 1;
+ return 0;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic stream id:0x%xL recv() not ready", qs->id);
+ return NGX_AGAIN;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
@@ -839,10 +838,15 @@ ngx_quic_stream_send_chain(ngx_connectio
qc = ngx_quic_get_connection(pc);
wev = c->write;
- if (wev->error) {
+ if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
+ && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
+ {
+ wev->error = 1;
return NGX_CHAIN_ERROR;
}
+ qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
+
flow = ngx_quic_max_stream_flow(c);
if (flow == 0) {
wev->ready = 0;
@@ -1051,9 +1055,9 @@ ngx_quic_handle_stream_frame(ngx_connect
sc = qs->connection;
- rev = sc->read;
-
- if (rev->error) {
+ if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
+ && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
+ {
return NGX_OK;
}
@@ -1086,8 +1090,8 @@ ngx_quic_handle_stream_frame(ngx_connect
return NGX_ERROR;
}
- rev->pending_eof = 1;
qs->final_size = last;
+ qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
}
if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length,
@@ -1098,6 +1102,7 @@ ngx_quic_handle_stream_frame(ngx_connect
}
if (f->offset == qs->recv_offset) {
+ rev = sc->read;
rev->ready = 1;
if (rev->active) {
@@ -1273,11 +1278,15 @@ ngx_quic_handle_reset_stream_frame(ngx_c
return NGX_OK;
}
- sc = qs->connection;
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
+ || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
+ {
+ return NGX_OK;
+ }
- rev = sc->read;
- rev->error = 1;
- rev->ready = 1;
+ qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
+
+ sc = qs->connection;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
return NGX_ERROR;
@@ -1299,6 +1308,9 @@ ngx_quic_handle_reset_stream_frame(ngx_c
return NGX_ERROR;
}
+ rev = sc->read;
+ rev->ready = 1;
+
if (rev->active) {
ngx_post_event(rev, &ngx_posted_events);
}
@@ -1341,6 +1353,7 @@ ngx_quic_handle_stop_sending_frame(ngx_c
wev = qs->connection->write;
if (wev->active) {
+ wev->ready = 1;
ngx_post_event(wev, &ngx_posted_events);
}
@@ -1413,11 +1426,9 @@ static ngx_int_t
ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
- ngx_event_t *rev;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- rev = c->read;
qs = c->quic;
qc = ngx_quic_get_connection(qs->parent);
@@ -1434,7 +1445,9 @@ ngx_quic_control_flow(ngx_connection_t *
qs->recv_last += len;
- if (!rev->error && qs->recv_last > qs->recv_max_data) {
+ if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
+ && qs->recv_last > qs->recv_max_data)
+ {
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
return NGX_ERROR;
}
@@ -1454,12 +1467,10 @@ static ngx_int_t
ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
- ngx_event_t *rev;
ngx_connection_t *pc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
- rev = c->read;
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@@ -1475,9 +1486,7 @@ ngx_quic_update_flow(ngx_connection_t *c
qs->recv_offset += len;
- if (!rev->pending_eof && !rev->error
- && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
- {
+ if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
return NGX_ERROR;
}
@@ -1510,6 +1519,10 @@ ngx_quic_update_max_stream_data(ngx_conn
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
+ if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) {
+ return NGX_OK;
+ }
+
recv_max_data = qs->recv_offset + qs->recv_window;
if (qs->recv_max_data == recv_max_data) {
More information about the nginx-devel
mailing list