[nginx] Stream: upstream and downstream limit rates.

Roman Arutyunyan arut at nginx.com
Tue Jun 23 17:19:43 UTC 2015


details:   http://hg.nginx.org/nginx/rev/24488e6db782
branches:  
changeset: 6201:24488e6db782
user:      Roman Arutyunyan <arut at nginx.com>
date:      Tue Jun 23 20:17:48 2015 +0300
description:
Stream: upstream and downstream limit rates.

diffstat:

 src/stream/ngx_stream_proxy_module.c |  125 ++++++++++++++++++++++++++++++----
 src/stream/ngx_stream_upstream.h     |    1 +
 2 files changed, 111 insertions(+), 15 deletions(-)

diffs (257 lines):

diff -r abee77018d3a -r 24488e6db782 src/stream/ngx_stream_proxy_module.c
--- a/src/stream/ngx_stream_proxy_module.c	Tue Jun 23 20:17:47 2015 +0300
+++ b/src/stream/ngx_stream_proxy_module.c	Tue Jun 23 20:17:48 2015 +0300
@@ -18,7 +18,9 @@ typedef struct {
     ngx_msec_t                       timeout;
     ngx_msec_t                       next_upstream_timeout;
     size_t                           downstream_buf_size;
+    size_t                           downstream_limit_rate;
     size_t                           upstream_buf_size;
+    size_t                           upstream_limit_rate;
     ngx_uint_t                       next_upstream_tries;
     ngx_flag_t                       next_upstream;
     ngx_flag_t                       proxy_protocol;
@@ -132,6 +134,13 @@ static ngx_command_t  ngx_stream_proxy_c
       offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size),
       NULL },
 
+    { ngx_string("proxy_downstream_limit_rate"),
+      NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_size_slot,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      offsetof(ngx_stream_proxy_srv_conf_t, downstream_limit_rate),
+      NULL },
+
     { ngx_string("proxy_upstream_buffer"),
       NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
       ngx_conf_set_size_slot,
@@ -139,6 +148,13 @@ static ngx_command_t  ngx_stream_proxy_c
       offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size),
       NULL },
 
+    { ngx_string("proxy_upstream_limit_rate"),
+      NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_size_slot,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      offsetof(ngx_stream_proxy_srv_conf_t, upstream_limit_rate),
+      NULL },
+
     { ngx_string("proxy_next_upstream"),
       NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
       ngx_conf_set_flag_slot,
@@ -340,6 +356,7 @@ ngx_stream_proxy_handler(ngx_stream_sess
     }
 
     u->proxy_protocol = pscf->proxy_protocol;
+    u->start_sec = ngx_time();
 
     p = ngx_pnalloc(c->pool, pscf->downstream_buf_size);
     if (p == NULL) {
@@ -831,17 +848,56 @@ ngx_stream_proxy_upstream_handler(ngx_ev
 static void
 ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
 {
-    ngx_connection_t       *c;
-    ngx_stream_session_t   *s;
-    ngx_stream_upstream_t  *u;
+    ngx_connection_t             *c, *pc;
+    ngx_stream_session_t         *s;
+    ngx_stream_upstream_t        *u;
+    ngx_stream_proxy_srv_conf_t  *pscf;
 
     c = ev->data;
     s = c->data;
     u = s->upstream;
 
     if (ev->timedout) {
-        ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
-        ngx_stream_proxy_finalize(s, NGX_DECLINED);
+
+        if (ev->delayed) {
+
+            ev->timedout = 0;
+            ev->delayed = 0;
+
+            if (!ev->ready) {
+                if (ngx_handle_read_event(ev, 0) != NGX_OK) {
+                    ngx_stream_proxy_finalize(s, NGX_ERROR);
+                    return;
+                }
+
+                if (u->upstream_buf.start) {
+                    pc = u->peer.connection;
+
+                    if (!c->read->delayed && !pc->read->delayed) {
+                        pscf = ngx_stream_get_module_srv_conf(s,
+                                                       ngx_stream_proxy_module);
+                        ngx_add_timer(c->write, pscf->timeout);
+                    }
+                }
+
+                return;
+            }
+
+        } else {
+            ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
+            ngx_stream_proxy_finalize(s, NGX_DECLINED);
+            return;
+        }
+
+    } else if (ev->delayed) {
+
+        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                       "stream connection delayed");
+
+        if (ngx_handle_read_event(ev, 0) != NGX_OK) {
+            ngx_stream_proxy_finalize(s, NGX_ERROR);
+        }
+
         return;
     }
 
@@ -930,10 +986,12 @@ static ngx_int_t
 ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
     ngx_uint_t do_write)
 {
-    size_t                        size;
+    off_t                        *received, limit;
+    size_t                        size, limit_rate;
     ssize_t                       n;
     ngx_buf_t                    *b;
     ngx_uint_t                    flags;
+    ngx_msec_t                    delay;
     ngx_connection_t             *c, *pc, *src, *dst;
     ngx_log_handler_pt            handler;
     ngx_stream_upstream_t        *u;
@@ -944,15 +1002,21 @@ ngx_stream_proxy_process(ngx_stream_sess
     c = s->connection;
     pc = u->upstream_buf.start ? u->peer.connection : NULL;
 
+    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
     if (from_upstream) {
         src = pc;
         dst = c;
         b = &u->upstream_buf;
+        limit_rate = pscf->upstream_limit_rate;
+        received = &u->received;
 
     } else {
         src = c;
         dst = pc;
         b = &u->downstream_buf;
+        limit_rate = pscf->downstream_limit_rate;
+        received = &s->received;
     }
 
     for ( ;; ) {
@@ -983,7 +1047,23 @@ ngx_stream_proxy_process(ngx_stream_sess
 
         size = b->end - b->last;
 
-        if (size && src->read->ready) {
+        if (size && src->read->ready && !src->read->delayed) {
+
+            if (limit_rate) {
+                limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1)
+                        - *received;
+
+                if (limit <= 0) {
+                    src->read->delayed = 1;
+                    delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1);
+                    ngx_add_timer(src->read, delay);
+                    break;
+                }
+
+                if (size > (size_t) limit) {
+                    size = limit;
+                }
+            }
 
             n = src->recv(src, b->last, size);
 
@@ -992,15 +1072,19 @@ ngx_stream_proxy_process(ngx_stream_sess
             }
 
             if (n > 0) {
-                if (from_upstream) {
-                    u->received += n;
+                if (limit_rate) {
+                    delay = (ngx_msec_t) (n * 1000 / limit_rate);
 
-                } else {
-                    s->received += n;
+                    if (delay > 0) {
+                        src->read->delayed = 1;
+                        ngx_add_timer(src->read, delay);
+                    }
                 }
 
+                *received += n;
+                b->last += n;
                 do_write = 1;
-                b->last += n;
+
                 continue;
             }
 
@@ -1012,8 +1096,6 @@ ngx_stream_proxy_process(ngx_stream_sess
         break;
     }
 
-    pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
     if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
         handler = c->log->handler;
         c->log->handler = NULL;
@@ -1044,7 +1126,12 @@ ngx_stream_proxy_process(ngx_stream_sess
             return NGX_ERROR;
         }
 
-        ngx_add_timer(c->read, pscf->timeout);
+        if (!c->read->delayed && !pc->read->delayed) {
+            ngx_add_timer(c->write, pscf->timeout);
+
+        } else if (c->write->timer_set) {
+            ngx_del_timer(c->write);
+        }
     }
 
     return NGX_OK;
@@ -1207,7 +1294,9 @@ ngx_stream_proxy_create_srv_conf(ngx_con
     conf->timeout = NGX_CONF_UNSET_MSEC;
     conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC;
     conf->downstream_buf_size = NGX_CONF_UNSET_SIZE;
+    conf->downstream_limit_rate = NGX_CONF_UNSET_SIZE;
     conf->upstream_buf_size = NGX_CONF_UNSET_SIZE;
+    conf->upstream_limit_rate = NGX_CONF_UNSET_SIZE;
     conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
     conf->next_upstream = NGX_CONF_UNSET;
     conf->proxy_protocol = NGX_CONF_UNSET;
@@ -1244,9 +1333,15 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf
     ngx_conf_merge_size_value(conf->downstream_buf_size,
                               prev->downstream_buf_size, 16384);
 
+    ngx_conf_merge_size_value(conf->downstream_limit_rate,
+                              prev->downstream_limit_rate, 0);
+
     ngx_conf_merge_size_value(conf->upstream_buf_size,
                               prev->upstream_buf_size, 16384);
 
+    ngx_conf_merge_size_value(conf->upstream_limit_rate,
+                              prev->upstream_limit_rate, 0);
+
     ngx_conf_merge_uint_value(conf->next_upstream_tries,
                               prev->next_upstream_tries, 0);
 
diff -r abee77018d3a -r 24488e6db782 src/stream/ngx_stream_upstream.h
--- a/src/stream/ngx_stream_upstream.h	Tue Jun 23 20:17:47 2015 +0300
+++ b/src/stream/ngx_stream_upstream.h	Tue Jun 23 20:17:48 2015 +0300
@@ -83,6 +83,7 @@ typedef struct {
     ngx_buf_t                          downstream_buf;
     ngx_buf_t                          upstream_buf;
     off_t                              received;
+    time_t                             start_sec;
 #if (NGX_STREAM_SSL)
     ngx_str_t                          ssl_name;
 #endif



More information about the nginx-devel mailing list