[PATCH 2 of 3] QUIC: simplified stream initialization

Roman Arutyunyan arut at nginx.com
Thu Nov 25 14:20:50 UTC 2021


# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1637693300 -10800
#      Tue Nov 23 21:48:20 2021 +0300
# Branch quic
# Node ID 3d2354bfa1a2a257b9f73772ad0836585be85a6c
# Parent  5b03ffd757804542daec73188a509b02e6b2c596
QUIC: simplified stream initialization.

After creation, a client stream is added to the qc->streams.uninitialized queue.
After initialization it's removed from the queue.  If a stream is never
initialized, it is freed in ngx_quic_close_streams().  Stream initializer
is now set as read event handler in stream connection.

Previously qc->streams.uninitialized was used for delayed stream
initialization.

The change makes it possible not to handle separately the case of a new stream
in stream-related frame handlers.  It makes these handlers simpler since new
streams and existing streams are now handled by the same code.

diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -13,10 +13,9 @@
 #define NGX_QUIC_STREAM_GONE     (void *) -1
 
 
-static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
-    uint64_t id);
+static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
 static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id);
-static ngx_int_t ngx_quic_init_stream(ngx_quic_stream_t *qs);
+static void ngx_quic_init_stream_handler(ngx_event_t *ev);
 static void ngx_quic_init_streams_handler(ngx_connection_t *c);
 static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
     uint64_t id);
@@ -306,21 +305,28 @@ ngx_quic_shutdown_stream(ngx_connection_
 
 
 static ngx_quic_stream_t *
-ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
+ngx_quic_get_stream(ngx_connection_t *c, uint64_t id)
 {
     uint64_t                min_id;
+    ngx_event_t            *rev;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL is new", id);
+    qc = ngx_quic_get_connection(c);
+
+    qs = ngx_quic_find_stream(&qc->streams.tree, id);
 
-    qc = ngx_quic_get_connection(c);
+    if (qs) {
+        return qs;
+    }
 
     if (qc->shutdown || qc->closing) {
         return NGX_QUIC_STREAM_GONE;
     }
 
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL is missing", id);
+
     if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
 
         if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
@@ -377,7 +383,11 @@ ngx_quic_create_client_stream(ngx_connec
      * streams of that type with lower-numbered stream IDs also being opened.
      */
 
-    for ( /* void */ ; min_id < id; min_id += 0x04) {
+#if (NGX_SUPPRESS_WARN)
+    qs = NULL;
+#endif
+
+    for ( /* void */ ; min_id <= id; min_id += 0x04) {
 
         qs = ngx_quic_create_stream(c, min_id);
 
@@ -389,22 +399,17 @@ ngx_quic_create_client_stream(ngx_connec
             continue;
         }
 
-        if (ngx_quic_init_stream(qs) != NGX_OK) {
-            return NULL;
-        }
+        ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
 
-        if (qc->shutdown || qc->closing) {
-            return NGX_QUIC_STREAM_GONE;
+        rev = qs->connection->read;
+        rev->handler = ngx_quic_init_stream_handler;
+
+        if (qc->streams.initialized) {
+            ngx_post_event(rev, &ngx_posted_events);
         }
     }
 
-    qs = ngx_quic_create_stream(c, id);
-
     if (qs == NULL) {
-        if (ngx_quic_reject_stream(c, id) != NGX_OK) {
-            return NULL;
-        }
-
         return NGX_QUIC_STREAM_GONE;
     }
 
@@ -461,29 +466,20 @@ ngx_quic_reject_stream(ngx_connection_t 
 }
 
 
-static ngx_int_t
-ngx_quic_init_stream(ngx_quic_stream_t *qs)
+static void
+ngx_quic_init_stream_handler(ngx_event_t *ev)
 {
-    ngx_connection_t       *c;
-    ngx_quic_connection_t  *qc;
-
-    qc = ngx_quic_get_connection(qs->parent);
+    ngx_connection_t   *c;
+    ngx_quic_stream_t  *qs;
 
-    c = qs->connection;
-
-    if (!qc->streams.initialized) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic postpone stream init");
-
-        ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
-        return NGX_OK;
-    }
+    c = ev->data;
+    qs = c->quic;
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream");
 
-    c->listening->handler(c);
+    ngx_queue_remove(&qs->queue);
 
-    return NGX_OK;
+    c->listening->handler(c);
 }
 
 
@@ -527,16 +523,12 @@ ngx_quic_init_streams_handler(ngx_connec
 
     qc = ngx_quic_get_connection(c);
 
-    while (!ngx_queue_empty(&qc->streams.uninitialized)) {
-        q = ngx_queue_head(&qc->streams.uninitialized);
-        ngx_queue_remove(q);
-
+    for (q = ngx_queue_head(&qc->streams.uninitialized);
+         q != ngx_queue_sentinel(&qc->streams.uninitialized);
+         q = ngx_queue_next(q))
+    {
         qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
-
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
-                       "quic init postponed stream");
-
-        qs->connection->listening->handler(qs->connection);
+        ngx_post_event(qs->connection->read, &ngx_posted_events);
     }
 
     qc->streams.initialized = 1;
@@ -1015,7 +1007,6 @@ ngx_quic_handle_stream_frame(ngx_connect
     ngx_quic_frame_t *frame)
 {
     uint64_t                  last;
-    ngx_pool_t               *pool;
     ngx_event_t              *rev;
     ngx_connection_t         *sc;
     ngx_quic_stream_t        *qs;
@@ -1035,39 +1026,14 @@ ngx_quic_handle_stream_frame(ngx_connect
     /* no overflow since both values are 62-bit */
     last = f->offset + f->length;
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+    qs = ngx_quic_get_stream(c, f->stream_id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->stream_id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = qs->connection;
+        return NGX_ERROR;
+    }
 
-        if (ngx_quic_control_flow(sc, last) != NGX_OK) {
-            goto cleanup;
-        }
-
-        if (f->fin) {
-            sc->read->pending_eof = 1;
-            qs->final_size = last;
-        }
-
-        if (f->offset == 0) {
-            sc->read->ready = 1;
-        }
-
-        if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
-            goto cleanup;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     sc = qs->connection;
@@ -1127,15 +1093,6 @@ ngx_quic_handle_stream_frame(ngx_connect
     }
 
     return NGX_OK;
-
-cleanup:
-
-    pool = sc->pool;
-
-    ngx_close_connection(sc);
-    ngx_destroy_pool(pool);
-
-    return NGX_ERROR;
 }
 
 
@@ -1212,20 +1169,14 @@ ngx_quic_handle_stream_data_blocked_fram
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
+        return NGX_ERROR;
+    }
 
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     return ngx_quic_update_max_stream_data(qs->connection);
@@ -1250,24 +1201,14 @@ ngx_quic_handle_max_stream_data_frame(ng
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
+        return NGX_ERROR;
+    }
 
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        if (f->limit > qs->send_max_data) {
-            qs->send_max_data = f->limit;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     if (f->limit <= qs->send_max_data) {
@@ -1295,7 +1236,6 @@ ngx_int_t
 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
 {
-    ngx_pool_t             *pool;
     ngx_event_t            *rev;
     ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
@@ -1310,36 +1250,14 @@ ngx_quic_handle_reset_stream_frame(ngx_c
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
-
-        sc = qs->connection;
+        return NGX_ERROR;
+    }
 
-        rev = sc->read;
-        rev->error = 1;
-        rev->ready = 1;
-
-        if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
-            goto cleanup;
-        }
-
-        qs->final_size = f->final_size;
-
-        if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
-            goto cleanup;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     sc = qs->connection;
@@ -1373,15 +1291,6 @@ ngx_quic_handle_reset_stream_frame(ngx_c
     }
 
     return NGX_OK;
-
-cleanup:
-
-    pool = sc->pool;
-
-    ngx_close_connection(sc);
-    ngx_destroy_pool(pool);
-
-    return NGX_ERROR;
 }
 
 
@@ -1389,9 +1298,7 @@ ngx_int_t
 ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
 {
-    ngx_pool_t             *pool;
     ngx_event_t            *wev;
-    ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
@@ -1404,31 +1311,14 @@ ngx_quic_handle_stop_sending_frame(ngx_c
         return NGX_ERROR;
     }
 
-    qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
+    qs = ngx_quic_get_stream(c, f->id);
 
     if (qs == NULL) {
-        qs = ngx_quic_create_client_stream(c, f->id);
-
-        if (qs == NULL) {
-            return NGX_ERROR;
-        }
-
-        if (qs == NGX_QUIC_STREAM_GONE) {
-            return NGX_OK;
-        }
+        return NGX_ERROR;
+    }
 
-        sc = qs->connection;
-
-        if (ngx_quic_reset_stream(sc, f->error_code) != NGX_OK) {
-            pool = sc->pool;
-
-            ngx_close_connection(sc);
-            ngx_destroy_pool(pool);
-
-            return NGX_ERROR;
-        }
-
-        return ngx_quic_init_stream(qs);
+    if (qs == NGX_QUIC_STREAM_GONE) {
+        return NGX_OK;
     }
 
     if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {


More information about the nginx-devel mailing list