[PATCH 1 of 2] QUIC: introduced explicit stream states

Roman Arutyunyan arut at nginx.com
Wed Jan 26 09:06:53 UTC 2022


# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1643181129 -10800
#      Wed Jan 26 10:12:09 2022 +0300
# Branch quic
# Node ID 42776baa77bea8da4cc0e3a157f122f0803b1e97
# Parent  6c86685941a8fe90b7dfd41bb150abca1c345087
QUIC: introduced explicit stream states.

This allows eliminating the use of stream connection event flags for tracking
stream state.

diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -28,6 +28,26 @@
 #define NGX_QUIC_STREAM_UNIDIRECTIONAL       0x02
 
 
+typedef enum {
+    NGX_QUIC_STREAM_SEND_READY = 0,     /* initial for sendable streams */
+    NGX_QUIC_STREAM_SEND_SEND,          /* set on entry to send_chain() */
+    NGX_QUIC_STREAM_SEND_DATA_SENT,     /* set by shutdown_stream_send() */
+    NGX_QUIC_STREAM_SEND_DATA_RECVD,    /* initial for client uni streams */
+    NGX_QUIC_STREAM_SEND_RESET_SENT,    /* set by ngx_quic_reset_stream() */
+    NGX_QUIC_STREAM_SEND_RESET_RECVD    /* currently never assigned */
+} ngx_quic_stream_send_state_e;
+
+
+typedef enum {
+    NGX_QUIC_STREAM_RECV_RECV = 0,      /* initial for readable streams */
+    NGX_QUIC_STREAM_RECV_SIZE_KNOWN,    /* qs->final_size has been stored */
+    NGX_QUIC_STREAM_RECV_DATA_RECVD,    /* currently never assigned */
+    NGX_QUIC_STREAM_RECV_DATA_READ,     /* recv() reached final_size */
+    NGX_QUIC_STREAM_RECV_RESET_RECVD,   /* RESET_STREAM frame handled */
+    NGX_QUIC_STREAM_RECV_RESET_READ     /* recv() returned the reset error */
+} ngx_quic_stream_recv_state_e;
+
+
 typedef struct {
     ngx_ssl_t                 *ssl;
 
@@ -66,6 +86,8 @@ struct ngx_quic_stream_s {
     ngx_chain_t               *in;
     ngx_chain_t               *out;
     ngx_uint_t                 cancelable;  /* unsigned  cancelable:1; */
+    ngx_quic_stream_send_state_e  send_state;
+    ngx_quic_stream_recv_state_e  recv_state;
 };
 
 
diff --git a/src/event/quic/ngx_event_quic_ack.c b/src/event/quic/ngx_event_quic_ack.c
--- a/src/event/quic/ngx_event_quic_ack.c
+++ b/src/event/quic/ngx_event_quic_ack.c
@@ -617,10 +617,13 @@ ngx_quic_resend_frames(ngx_connection_t 
         case NGX_QUIC_FT_STREAM:
             qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
 
-            if (qs && qs->connection->write->error) {
-                /* RESET_STREAM was sent */
-                ngx_quic_free_frame(c, f);
-                break;
+            if (qs) {
+                if (qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
+                    || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
+                {
+                    ngx_quic_free_frame(c, f);
+                    break;
+                }
             }
 
             /* fall through */
diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -220,19 +220,22 @@ ngx_quic_close_streams(ngx_connection_t 
 ngx_int_t
 ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
 {
-    ngx_event_t            *wev;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    wev = c->write;
+    qs = c->quic;
 
-    if (wev->error) {
+    if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
+        || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
+        || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
+    {
         return NGX_OK;
     }
 
-    qs = c->quic;
+    qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
@@ -249,9 +252,6 @@ ngx_quic_reset_stream(ngx_connection_t *
 
     ngx_quic_queue_frame(qc, frame);
 
-    wev->error = 1;
-    wev->ready = 1;
-
     return NGX_OK;
 }
 
@@ -259,27 +259,15 @@ ngx_quic_reset_stream(ngx_connection_t *
 ngx_int_t
 ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
 {
-    ngx_quic_stream_t  *qs;
-
-    qs = c->quic;
-
     if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) {
-        if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED)
-            || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
-        {
-            if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
-                return NGX_ERROR;
-            }
+        if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
+            return NGX_ERROR;
         }
     }
 
     if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) {
-        if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
-            || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
-        {
-            if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
-                return NGX_ERROR;
-            }
+        if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
+            return NGX_ERROR;
         }
     }
 
@@ -290,19 +278,21 @@ ngx_quic_shutdown_stream(ngx_connection_
 static ngx_int_t
 ngx_quic_shutdown_stream_send(ngx_connection_t *c)
 {
-    ngx_event_t            *wev;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    wev = c->write;
+    qs = c->quic;
 
-    if (wev->error) {
+    if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
+        && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
+    {
         return NGX_OK;
     }
 
-    qs = c->quic;
+    qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
+
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
@@ -326,8 +316,6 @@ ngx_quic_shutdown_stream_send(ngx_connec
 
     ngx_quic_queue_frame(qc, frame);
 
-    wev->error = 1;
-
     return NGX_OK;
 }
 
@@ -335,19 +323,19 @@ ngx_quic_shutdown_stream_send(ngx_connec
 static ngx_int_t
 ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
 {
-    ngx_event_t            *rev;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    rev = c->read;
+    qs = c->quic;
 
-    if (rev->pending_eof || rev->error) {
+    if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
+        && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
+    {
         return NGX_OK;
     }
 
-    qs = c->quic;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
@@ -370,8 +358,6 @@ ngx_quic_shutdown_stream_recv(ngx_connec
 
     ngx_quic_queue_frame(qc, frame);
 
-    rev->error = 1;
-
     return NGX_OK;
 }
 
@@ -689,9 +675,13 @@ ngx_quic_create_stream(ngx_connection_t 
     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
             qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
+            qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
+            qs->send_state = NGX_QUIC_STREAM_SEND_READY;
 
         } else {
             qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
+            qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
+            qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
         }
 
     } else {
@@ -703,6 +693,9 @@ ngx_quic_create_stream(ngx_connection_t 
             qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
             qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
         }
+
+        qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
+        qs->send_state = NGX_QUIC_STREAM_SEND_READY;
     }
 
     qs->recv_window = qs->recv_max_data;
@@ -743,26 +736,16 @@ ngx_quic_stream_recv(ngx_connection_t *c
     pc = qs->parent;
     rev = c->read;
 
-    if (rev->error) {
+    if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
+        || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
+    {
+        qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ;
+        rev->error = 1;
         return NGX_ERROR;
     }
 
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL recv eof:%d buf:%uz",
-                   qs->id, rev->pending_eof, size);
-
-    if (qs->in == NULL || qs->in->buf->sync) {
-        rev->ready = 0;
-
-        if (qs->recv_offset == qs->final_size) {
-            rev->eof = 1;
-            return 0;
-        }
-
-        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic stream id:0x%xL recv() not ready", qs->id);
-        return NGX_AGAIN;
-    }
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL recv buf:%uz", qs->id, size);
 
     in = ngx_quic_read_chain(pc, &qs->in, size);
     if (in == NGX_CHAIN_ERROR) {
@@ -779,8 +762,23 @@ ngx_quic_stream_recv(ngx_connection_t *c
 
     ngx_quic_free_chain(pc, in);
 
-    if (qs->in == NULL) {
-        rev->ready = rev->pending_eof;
+    if (len == 0) {
+        rev->ready = 0;
+
+        if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
+            && qs->recv_offset == qs->final_size)
+        {
+            qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
+        }
+
+        if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) {
+            rev->eof = 1;
+            return 0;
+        }
+
+        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                       "quic stream id:0x%xL recv() not ready", qs->id);
+        return NGX_AGAIN;
     }
 
     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
@@ -838,10 +836,15 @@ ngx_quic_stream_send_chain(ngx_connectio
     qc = ngx_quic_get_connection(pc);
     wev = c->write;
 
-    if (wev->error) {
+    if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
+        && qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
+    {
+        wev->error = 1;
         return NGX_CHAIN_ERROR;
     }
 
+    qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
+
     flow = ngx_quic_max_stream_flow(c);
     if (flow == 0) {
         wev->ready = 0;
@@ -1050,9 +1053,9 @@ ngx_quic_handle_stream_frame(ngx_connect
 
     sc = qs->connection;
 
-    rev = sc->read;
-
-    if (rev->error) {
+    if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
+        && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
+    {
         return NGX_OK;
     }
 
@@ -1085,8 +1088,8 @@ ngx_quic_handle_stream_frame(ngx_connect
             return NGX_ERROR;
         }
 
-        rev->pending_eof = 1;
         qs->final_size = last;
+        qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
     }
 
     if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length,
@@ -1097,6 +1100,7 @@ ngx_quic_handle_stream_frame(ngx_connect
     }
 
     if (f->offset == qs->recv_offset) {
+        rev = sc->read;
         rev->ready = 1;
 
         if (rev->active) {
@@ -1272,11 +1276,15 @@ ngx_quic_handle_reset_stream_frame(ngx_c
         return NGX_OK;
     }
 
-    sc = qs->connection;
+    if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
+        || qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
+    {
+        return NGX_OK;
+    }
 
-    rev = sc->read;
-    rev->error = 1;
-    rev->ready = 1;
+    qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
+
+    sc = qs->connection;
 
     if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
         return NGX_ERROR;
@@ -1298,6 +1306,9 @@ ngx_quic_handle_reset_stream_frame(ngx_c
         return NGX_ERROR;
     }
 
+    rev = sc->read;
+    rev->ready = 1;
+
     if (rev->active) {
         ngx_post_event(rev, &ngx_posted_events);
     }
@@ -1340,6 +1351,7 @@ ngx_quic_handle_stop_sending_frame(ngx_c
     wev = qs->connection->write;
 
     if (wev->active) {
+        wev->ready = 1;
         ngx_post_event(wev, &ngx_posted_events);
     }
 
@@ -1412,11 +1424,9 @@ static ngx_int_t
 ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
 {
     uint64_t                len;
-    ngx_event_t            *rev;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    rev = c->read;
     qs = c->quic;
     qc = ngx_quic_get_connection(qs->parent);
 
@@ -1433,7 +1443,7 @@ ngx_quic_control_flow(ngx_connection_t *
 
     qs->recv_last += len;
 
-    if (!rev->error && qs->recv_last > qs->recv_max_data) {
+    if (qs->recv_last > qs->recv_max_data) {
         qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
         return NGX_ERROR;
     }
@@ -1453,12 +1463,10 @@ static ngx_int_t
 ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
 {
     uint64_t                len;
-    ngx_event_t            *rev;
     ngx_connection_t       *pc;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    rev = c->read;
     qs = c->quic;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
@@ -1474,9 +1482,7 @@ ngx_quic_update_flow(ngx_connection_t *c
 
     qs->recv_offset += len;
 
-    if (!rev->pending_eof && !rev->error
-        && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
-    {
+    if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
         if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
             return NGX_ERROR;
         }
@@ -1509,6 +1515,10 @@ ngx_quic_update_max_stream_data(ngx_conn
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
+    if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) {
+        return NGX_OK;
+    }
+
     recv_max_data = qs->recv_offset + qs->recv_window;
 
     if (qs->recv_max_data == recv_max_data) {



More information about the nginx-devel mailing list