upstream keepalive: call for testing

Maxim Dounin mdounin at mdounin.ru
Tue Jul 26 11:57:26 UTC 2011


Hello!

Attached patch (against 1.0.5) introduces upstream keepalive 
support for memcached, fastcgi and http.  Note the patch is 
experimental and may have problems (though it passes basic smoke 
tests here).  Testing is appreciated.

Major changes include:

1. Content length now parsed into u->headers_in.content_length_n
   instead of directly to r->headers_out.content_length_n.
   Note this may break 3rd party protocol modules.

2. Use off_t for r->upstream->length.
   Note this may break 3rd party protocol modules.

3. In buffered mode u->pipe->length was introduced to indicate minimal amount
   of data which must be passed to input filter.  This allows to not rely on
   connection close to indicate response end while still effectiently using
   buffers in most cases.  Defaults to -1 (that is, wait for
   connection close) if not set by protocol-specific handlers.

4. In buffered mode u->input_filter_init() now called if it's set.
   This is used to set initial value of u->pipe->length (see above).

5. Proxy module now able to talk HTTP/1.1, in particular it understands
   chunked encoding in responses.  Requests are sent using HTTP/1.1
   if proxy_http_version directive is set to 1.1.

6. Introduced u->keepalive flag to indicate connection to upstream is in
   correct state and may be kept alive.  Memcached, fastcgi and proxy
   modules are updated to set it.

Patch is expected to be used with upstream keepalive module[1] compiled with
NGX_UPSTREAM_KEEPALIVE_PATCHED defined, i.e use something like this:

    ./configure --with-cc-opt="-D NGX_UPSTREAM_KEEPALIVE_PATCHED" \
        --add-module=/path/to/ngx_http_upstream_keepalive

And use something like this in config:

    upstream memcached_backend {
        server 127.0.0.1:11211;
        keepalive 1;
    }

    upstream http_backend {
        server 127.0.0.1:8080;
        keepalive 1;
    }

    upstream fastcgi_backend {
        server 127.0.0.1:9000;
        keepalive 1;
    }

    server {
        ...

        location /memcached/ {
            set $memcached_key $uri;
            memcached_pass memcached_backend;
        }

        location /fastcgi/ {
            fastcgi_pass fastcgi_backend;
            ...
        }

        location /http/ {
            proxy_pass http://http_backend;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            ...
        }
    }

[1] http://mdounin.ru/hg/ngx_http_upstream_keepalive/

Maxim Dounin
-------------- next part --------------
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -392,8 +392,30 @@ ngx_event_pipe_read_upstream(ngx_event_p
                        cl->buf->file_last - cl->buf->file_pos);
     }
 
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                   "pipe length: %O", p->length);
+
 #endif
 
+    if (p->free_raw_bufs && p->length != -1) {
+        cl = p->free_raw_bufs;
+
+        if (cl->buf->last - cl->buf->pos >= p->length) {
+
+            /* STUB */ cl->buf->num = p->num++;
+
+            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
+                 return NGX_ABORT;
+            }
+
+            p->free_raw_bufs = cl->next;
+        }
+    }
+
+    if (p->length == 0) {
+        p->upstream_done = 1;
+    }
+
     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
 
         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
@@ -848,6 +870,12 @@ ngx_event_pipe_copy_input_filter(ngx_eve
     }
     p->last_in = &cl->next;
 
+    if (p->length == -1) {
+        return NGX_OK;
+    }
+
+    p->length -= b->last - b->pos;
+
     return NGX_OK;
 }
 
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -65,6 +65,7 @@ struct ngx_event_pipe_s {
     ssize_t            busy_size;
 
     off_t              read_length;
+    off_t              length;
 
     off_t              max_temp_file_size;
     ssize_t            temp_file_write_size;
diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c
--- a/src/http/modules/ngx_http_fastcgi_module.c
+++ b/src/http/modules/ngx_http_fastcgi_module.c
@@ -77,6 +77,8 @@ typedef struct {
 
 #define NGX_HTTP_FASTCGI_RESPONDER      1
 
+#define NGX_HTTP_FASTCGI_KEEP_CONN      1
+
 #define NGX_HTTP_FASTCGI_BEGIN_REQUEST  1
 #define NGX_HTTP_FASTCGI_ABORT_REQUEST  2
 #define NGX_HTTP_FASTCGI_END_REQUEST    3
@@ -130,6 +132,7 @@ static ngx_int_t ngx_http_fastcgi_create
 static ngx_int_t ngx_http_fastcgi_create_request(ngx_http_request_t *r);
 static ngx_int_t ngx_http_fastcgi_reinit_request(ngx_http_request_t *r);
 static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_fastcgi_input_filter_init(void *data);
 static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p,
     ngx_buf_t *buf);
 static ngx_int_t ngx_http_fastcgi_process_record(ngx_http_request_t *r,
@@ -484,7 +487,7 @@ static ngx_http_fastcgi_request_start_t 
 
     { 0,                                               /* role_hi */
       NGX_HTTP_FASTCGI_RESPONDER,                      /* role_lo */
-      0, /* NGX_HTTP_FASTCGI_KEEP_CONN */              /* flags */
+      NGX_HTTP_FASTCGI_KEEP_CONN,                      /* flags */
       { 0, 0, 0, 0, 0 } },                             /* reserved[5] */
 
     { 1,                                               /* version */
@@ -600,6 +603,8 @@ ngx_http_fastcgi_handler(ngx_http_reques
     u->pipe->input_filter = ngx_http_fastcgi_input_filter;
     u->pipe->input_ctx = r;
 
+    u->input_filter_init = ngx_http_fastcgi_input_filter_init;
+
     rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
 
     if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
@@ -1566,6 +1571,17 @@ ngx_http_fastcgi_process_header(ngx_http
 
 
 static ngx_int_t
+ngx_http_fastcgi_input_filter_init(void *data)
+{
+    ngx_http_request_t  *r = data;
+
+    r->upstream->pipe->length = sizeof(ngx_http_fastcgi_header_t);
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
 ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
 {
     u_char                  *m, *msg;
@@ -1603,7 +1619,6 @@ ngx_http_fastcgi_input_filter(ngx_event_
 
             if (f->type == NGX_HTTP_FASTCGI_STDOUT && f->length == 0) {
                 f->state = ngx_http_fastcgi_st_version;
-                p->upstream_done = 1;
 
                 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
                                "http fastcgi closed stdout");
@@ -1614,6 +1629,7 @@ ngx_http_fastcgi_input_filter(ngx_event_
             if (f->type == NGX_HTTP_FASTCGI_END_REQUEST) {
                 f->state = ngx_http_fastcgi_st_version;
                 p->upstream_done = 1;
+                r->upstream->keepalive = 1;
 
                 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
                                "http fastcgi sent end request");
@@ -1773,6 +1789,20 @@ ngx_http_fastcgi_input_filter(ngx_event_
 
     }
 
+    /* set p->length, minimal amount of data we want to see */
+
+    if (f->state < ngx_http_fastcgi_st_data) {
+        p->length = 1;
+
+    } else if (f->state == ngx_http_fastcgi_st_padding) {
+        p->length = f->padding;
+
+    } else {
+        /* ngx_http_fastcgi_st_data */
+
+        p->length = f->length;
+    }
+
     if (b) {
         b->shadow = buf;
         b->last_shadow = 1;
diff --git a/src/http/modules/ngx_http_memcached_module.c b/src/http/modules/ngx_http_memcached_module.c
--- a/src/http/modules/ngx_http_memcached_module.c
+++ b/src/http/modules/ngx_http_memcached_module.c
@@ -344,8 +344,8 @@ found:
 
         while (*p && *p++ != CR) { /* void */ }
 
-        r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
-        if (r->headers_out.content_length_n == -1) {
+        u->headers_in.content_length_n = ngx_atoof(len, p - len - 1);
+        if (u->headers_in.content_length_n == -1) {
             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                           "memcached sent invalid length in response \"%V\" "
                           "for key \"%V\"",
@@ -366,6 +366,7 @@ found:
 
         u->headers_in.status_n = 404;
         u->state->status = 404;
+        u->keepalive = 1;
 
         return NGX_OK;
     }
@@ -426,6 +427,10 @@ ngx_http_memcached_filter(void *data, ss
         u->length -= bytes;
         ctx->rest -= bytes;
 
+        if (u->length == 0) {
+            u->keepalive = 1;
+        }
+
         return NGX_OK;
     }
 
@@ -463,6 +468,13 @@ ngx_http_memcached_filter(void *data, ss
     if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
         ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                       "memcached sent invalid trailer");
+
+        b->last = last;
+        cl->buf->last = last;
+        u->length = 0;
+        ctx->rest = 0;
+
+        return NGX_OK;
     }
 
     ctx->rest -= b->last - last;
@@ -470,6 +482,10 @@ ngx_http_memcached_filter(void *data, ss
     cl->buf->last = last;
     u->length = ctx->rest;
 
+    if (u->length == 0) {
+        u->keepalive = 1;
+    }
+
     return NGX_OK;
 }
 
diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c
--- a/src/http/modules/ngx_http_proxy_module.c
+++ b/src/http/modules/ngx_http_proxy_module.c
@@ -71,6 +71,8 @@ typedef struct {
 
     ngx_flag_t                     redirect;
 
+    ngx_uint_t                     http_version;
+
     ngx_uint_t                     headers_hash_max_size;
     ngx_uint_t                     headers_hash_bucket_size;
 } ngx_http_proxy_loc_conf_t;
@@ -80,6 +82,12 @@ typedef struct {
     ngx_http_status_t              status;
     ngx_http_proxy_vars_t          vars;
     size_t                         internal_body_length;
+
+    ngx_uint_t                     state;
+    off_t                          size;
+    off_t                          length;
+
+    ngx_uint_t                     head;  /* unsigned  head:1 */
 } ngx_http_proxy_ctx_t;
 
 
@@ -92,6 +100,15 @@ static ngx_int_t ngx_http_proxy_create_r
 static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r);
 static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r);
 static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_proxy_input_filter_init(void *data);
+static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p,
+    ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p,
+    ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_non_buffered_copy_filter(void *data,
+    ssize_t bytes);
+static ngx_int_t ngx_http_proxy_non_buffered_chunked_filter(void *data,
+    ssize_t bytes);
 static void ngx_http_proxy_abort_request(ngx_http_request_t *r);
 static void ngx_http_proxy_finalize_request(ngx_http_request_t *r,
     ngx_int_t rc);
@@ -157,6 +174,13 @@ static ngx_conf_bitmask_t  ngx_http_prox
 };
 
 
+static ngx_conf_enum_t  ngx_http_proxy_http_version[] = {
+    { ngx_string("1.0"), NGX_HTTP_VERSION_10 },
+    { ngx_string("1.1"), NGX_HTTP_VERSION_11 },
+    { ngx_null_string, 0 }
+};
+
+
 ngx_module_t  ngx_http_proxy_module;
 
 
@@ -432,6 +456,13 @@ static ngx_command_t  ngx_http_proxy_com
       offsetof(ngx_http_proxy_loc_conf_t, upstream.ignore_headers),
       &ngx_http_upstream_ignore_headers_masks },
 
+    { ngx_string("proxy_http_version"),
+      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_enum_slot,
+      NGX_HTTP_LOC_CONF_OFFSET,
+      offsetof(ngx_http_proxy_loc_conf_t, http_version),
+      &ngx_http_proxy_http_version },
+
 #if (NGX_HTTP_SSL)
 
     { ngx_string("proxy_ssl_session_reuse"),
@@ -479,6 +510,7 @@ ngx_module_t  ngx_http_proxy_module = {
 
 
 static char  ngx_http_proxy_version[] = " HTTP/1.0" CRLF;
+static char  ngx_http_proxy_version_11[] = " HTTP/1.1" CRLF;
 
 
 static ngx_keyval_t  ngx_http_proxy_headers[] = {
@@ -486,6 +518,7 @@ static ngx_keyval_t  ngx_http_proxy_head
     { ngx_string("Connection"), ngx_string("close") },
     { ngx_string("Keep-Alive"), ngx_string("") },
     { ngx_string("Expect"), ngx_string("") },
+    { ngx_string("Upgrade"), ngx_string("") },
     { ngx_null_string, ngx_null_string }
 };
 
@@ -610,7 +643,12 @@ ngx_http_proxy_handler(ngx_http_request_
         return NGX_HTTP_INTERNAL_SERVER_ERROR;
     }
 
-    u->pipe->input_filter = ngx_event_pipe_copy_input_filter;
+    u->pipe->input_filter = ngx_http_proxy_copy_filter;
+    u->pipe->input_ctx = r;
+
+    u->input_filter_init = ngx_http_proxy_input_filter_init;
+    u->input_filter = ngx_http_proxy_non_buffered_copy_filter;
+    u->input_filter_ctx = r;
 
     u->accel = 1;
 
@@ -864,14 +902,20 @@ ngx_http_proxy_create_request(ngx_http_r
         method.len++;
     }
 
+    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+    if (method.len == 5
+        && ngx_strncasecmp(method.data, (u_char *) "HEAD ", 5) == 0)
+    {
+        ctx->head = 1;
+    }
+
     len = method.len + sizeof(ngx_http_proxy_version) - 1 + sizeof(CRLF) - 1;
 
     escape = 0;
     loc_len = 0;
     unparsed_uri = 0;
 
-    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
-
     if (plcf->proxy_lengths) {
         uri_len = ctx->vars.uri.len;
 
@@ -1007,8 +1051,14 @@ ngx_http_proxy_create_request(ngx_http_r
 
     u->uri.len = b->last - u->uri.data;
 
-    b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
-                         sizeof(ngx_http_proxy_version) - 1);
+    if (plcf->http_version == NGX_HTTP_VERSION_11) {
+        b->last = ngx_cpymem(b->last, ngx_http_proxy_version_11,
+                             sizeof(ngx_http_proxy_version_11) - 1);
+
+    } else {
+        b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
+                             sizeof(ngx_http_proxy_version) - 1);
+    }
 
     ngx_memzero(&e, sizeof(ngx_http_script_engine_t));
 
@@ -1157,8 +1207,11 @@ ngx_http_proxy_reinit_request(ngx_http_r
     ctx->status.count = 0;
     ctx->status.start = NULL;
     ctx->status.end = NULL;
+    ctx->state = 0;
 
     r->upstream->process_header = ngx_http_proxy_process_status_line;
+    r->upstream->pipe->input_filter = ngx_http_proxy_copy_filter;
+    r->upstream->input_filter = ngx_http_proxy_non_buffered_copy_filter;
     r->state = 0;
 
     return NGX_OK;
@@ -1244,6 +1297,8 @@ ngx_http_proxy_process_header(ngx_http_r
 {
     ngx_int_t                       rc;
     ngx_table_elt_t                *h;
+    ngx_http_upstream_t            *u;
+    ngx_http_proxy_ctx_t           *ctx;
     ngx_http_upstream_header_t     *hh;
     ngx_http_upstream_main_conf_t  *umcf;
 
@@ -1339,6 +1394,23 @@ ngx_http_proxy_process_header(ngx_http_r
                 h->lowcase_key = (u_char *) "date";
             }
 
+            /*
+             * set u->keepalive if response has no body; this allows to keep
+             * connections alive in case of r->header_only or X-Accel-Redirect
+             */
+
+            u = r->upstream;
+            ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+            if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+                || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+                || ctx->head
+                || (!u->headers_in.chunked
+                    && u->headers_in.content_length_n == 0))
+            {
+                u->keepalive = 1;
+            }
+
             return NGX_OK;
         }
 
@@ -1356,6 +1428,690 @@ ngx_http_proxy_process_header(ngx_http_r
 }
 
 
+static ngx_int_t
+ngx_http_proxy_input_filter_init(void *data)
+{
+    ngx_http_request_t    *r = data;
+    ngx_http_upstream_t   *u;
+    ngx_http_proxy_ctx_t  *ctx;
+
+    u = r->upstream;
+    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+    ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http proxy filter init s:%d h:%d c:%d l:%O",
+                   u->headers_in.status_n, ctx->head, u->headers_in.chunked,
+                   u->headers_in.content_length_n);
+
+    /* as per RFC2616, 4.4 Message Length */
+
+    if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+        || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+        || ctx->head)
+    {
+        /* 1xx, 204, and 304 and replies to HEAD requests */
+        /* no 1xx since we don't send Expect and Upgrade */
+
+        u->pipe->length = 0;
+        u->length = 0;
+        u->keepalive = 1;
+
+    } else if (u->headers_in.chunked) {
+        /* chunked */
+
+        u->pipe->input_filter = ngx_http_proxy_chunked_filter;
+        u->pipe->length = 3; /* "0" LF LF */
+
+        u->input_filter = ngx_http_proxy_non_buffered_chunked_filter;
+        u->length = -1;
+
+    } else if (u->headers_in.content_length_n == 0) {
+        /* empty body: special case as filter won't be called */
+
+        u->pipe->length = 0;
+        u->length = 0;
+        u->keepalive = 1;
+
+    } else {
+        /* content length or connection close */
+
+        u->pipe->length = u->headers_in.content_length_n;
+        u->length = u->headers_in.content_length_n;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
+{
+    ngx_buf_t           *b;
+    ngx_chain_t         *cl;
+    ngx_http_request_t  *r;
+
+    if (buf->pos == buf->last) {
+        return NGX_OK;
+    }
+
+    if (p->free) {
+        cl = p->free;
+        b = cl->buf;
+        p->free = cl->next;
+        ngx_free_chain(p->pool, cl);
+
+    } else {
+        b = ngx_alloc_buf(p->pool);
+        if (b == NULL) {
+            return NGX_ERROR;
+        }
+    }
+
+    ngx_memcpy(b, buf, sizeof(ngx_buf_t));
+    b->shadow = buf;
+    b->tag = p->tag;
+    b->last_shadow = 1;
+    b->recycled = 1;
+    buf->shadow = b;
+
+    cl = ngx_alloc_chain_link(p->pool);
+    if (cl == NULL) {
+        return NGX_ERROR;
+    }
+
+    cl->buf = b;
+    cl->next = NULL;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
+
+    if (p->in) {
+        *p->last_in = cl;
+    } else {
+        p->in = cl;
+    }
+    p->last_in = &cl->next;
+
+    if (p->length == -1) {
+        return NGX_OK;
+    }
+
+    p->length -= b->last - b->pos;
+
+    if (p->length == 0) {
+        r = p->input_ctx;
+        p->upstream_done = 1;
+        r->upstream->keepalive = 1;
+
+    } else if (p->length < 0) {
+        r = p->input_ctx;
+        p->upstream_done = 1;
+
+        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+                      "upstream sent too many data");
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_inline ngx_int_t
+ngx_http_proxy_parse_chunked(ngx_http_request_t *r, ngx_buf_t *buf)
+{
+    u_char                *pos, ch, c;
+    ngx_int_t              rc;
+    ngx_http_proxy_ctx_t  *ctx;
+    enum {
+        sw_chunk_start = 0,
+        sw_chunk_size,
+        sw_chunk_extension,
+        sw_chunk_extension_almost_done,
+        sw_chunk_data,
+        sw_after_data,
+        sw_after_data_almost_done,
+        sw_last_chunk_extension,
+        sw_last_chunk_extension_almost_done,
+        sw_trailer,
+        sw_trailer_almost_done,
+        sw_trailer_header,
+        sw_trailer_header_almost_done
+    } state;
+
+    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+    state = ctx->state;
+
+    if (state == sw_chunk_data && ctx->size == 0) {
+        state = sw_after_data;
+    }
+
+    rc = NGX_AGAIN;
+
+    for (pos = buf->pos; pos < buf->last; pos++) {
+
+        ch = *pos;
+
+        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                       "http proxy chunked byte: %02Xd s:%d", ch, state);
+
+        switch (state) {
+
+        case sw_chunk_start:
+            if (ch >= '0' && ch <= '9') {
+                state = sw_chunk_size;
+                ctx->size = ch - '0';
+                break;
+            }
+
+            c = (u_char) (ch | 0x20);
+
+            if (c >= 'a' && c <= 'f') {
+                state = sw_chunk_size;
+                ctx->size = c - 'a' + 10;
+                break;
+            }
+
+            goto invalid;
+
+        case sw_chunk_size:
+            if (ch >= '0' && ch <= '9') {
+                ctx->size = ctx->size * 16 + (ch - '0');
+                break;
+            }
+
+            c = (u_char) (ch | 0x20);
+
+            if (c >= 'a' && c <= 'f') {
+                ctx->size = ctx->size * 16 + (c - 'a' + 10);
+                break;
+            }
+
+            if (ctx->size == 0) {
+
+                switch (ch) {
+                case CR:
+                    state = sw_last_chunk_extension_almost_done;
+                    break;
+                case LF:
+                    state = sw_trailer;
+                    break;
+                case ';':
+                    state = sw_last_chunk_extension;
+                    break;
+                default:
+                    goto invalid;
+                }
+
+                break;
+            }
+
+            switch (ch) {
+            case CR:
+                state = sw_chunk_extension_almost_done;
+                break;
+            case LF:
+                state = sw_chunk_data;
+                break;
+            case ';':
+                state = sw_chunk_extension;
+                break;
+            default:
+                goto invalid;
+            }
+
+            break;
+
+        case sw_chunk_extension:
+            switch (ch) {
+            case CR:
+                state = sw_chunk_extension_almost_done;
+                break;
+            case LF:
+                state = sw_chunk_data;
+            }
+            break;
+
+        case sw_chunk_extension_almost_done:
+            if (ch == LF) {
+                state = sw_chunk_data;
+                break;
+            }
+            goto invalid;
+
+        case sw_chunk_data:
+            rc = NGX_OK;
+            goto data;
+
+        case sw_after_data:
+            switch (ch) {
+            case CR:
+                state = sw_after_data_almost_done;
+                break;
+            case LF:
+                state = sw_chunk_start;
+            }
+            break;
+
+        case sw_after_data_almost_done:
+            if (ch == LF) {
+                state = sw_chunk_start;
+                break;
+            }
+            goto invalid;
+
+        case sw_last_chunk_extension:
+            switch (ch) {
+            case CR:
+                state = sw_last_chunk_extension_almost_done;
+                break;
+            case LF:
+                state = sw_trailer;
+            }
+            break;
+
+        case sw_last_chunk_extension_almost_done:
+            if (ch == LF) {
+                state = sw_trailer;
+                break;
+            }
+            goto invalid;
+
+        case sw_trailer:
+            switch (ch) {
+            case CR:
+                state = sw_trailer_almost_done;
+                break;
+            case LF:
+                goto done;
+            default:
+                state = sw_trailer_header;
+            }
+            break;
+
+        case sw_trailer_almost_done:
+            if (ch == LF) {
+                goto done;
+            }
+            goto invalid;
+
+        case sw_trailer_header:
+            switch (ch) {
+            case CR:
+                state = sw_trailer_header_almost_done;
+                break;
+            case LF:
+                state = sw_trailer;
+            }
+            break;
+
+        case sw_trailer_header_almost_done:
+            if (ch == LF) {
+                state = sw_trailer;
+                break;
+            }
+            goto invalid;
+
+        }
+    }
+
+data:
+
+    ctx->state = state;
+    buf->pos = pos;
+
+    switch (state) {
+
+    case sw_chunk_start:
+        ctx->length = 3 /* "0" LF LF */;
+        break;
+    case sw_chunk_size:
+        ctx->length = 2 /* LF LF */
+                      + (ctx->size ? ctx->size + 4 /* LF "0" LF LF */ : 0);
+        break;
+    case sw_chunk_extension:
+    case sw_chunk_extension_almost_done:
+        ctx->length = 1 /* LF */ + ctx->size + 4 /* LF "0" LF LF */;
+        break;
+    case sw_chunk_data:
+        ctx->length = ctx->size + 4 /* LF "0" LF LF */;
+        break;
+    case sw_after_data:
+    case sw_after_data_almost_done:
+        ctx->length = 4 /* LF "0" LF LF */;
+        break;
+    case sw_last_chunk_extension:
+    case sw_last_chunk_extension_almost_done:
+        ctx->length = 2 /* LF LF */;
+        break;
+    case sw_trailer:
+    case sw_trailer_almost_done:
+        ctx->length = 1 /* LF */;
+        break;
+    case sw_trailer_header:
+    case sw_trailer_header_almost_done:
+        ctx->length = 2 /* LF LF */;
+        break;
+
+    }
+
+    return rc;
+
+done:
+
+    return NGX_DONE;
+
+invalid:
+
+    ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+                  "upstream sent invalid chunked response");
+
+    return NGX_ERROR;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
+{
+    ngx_int_t              rc;
+    ngx_buf_t             *b, **prev;
+    ngx_chain_t           *cl;
+    ngx_http_request_t    *r;
+    ngx_http_proxy_ctx_t  *ctx;
+
+    if (buf->pos == buf->last) {
+        return NGX_OK;
+    }
+
+    r = p->input_ctx;
+    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+    b = NULL;
+    prev = &buf->shadow;
+
+    for ( ;; ) {
+
+        rc = ngx_http_proxy_parse_chunked(r, buf);
+
+        if (rc == NGX_OK) {
+
+            /* a chunk has been parsed successfully */
+
+            if (p->free) {
+                cl = p->free;
+                b = cl->buf;
+                p->free = cl->next;
+                ngx_free_chain(p->pool, cl);
+
+            } else {
+                b = ngx_alloc_buf(p->pool);
+                if (b == NULL) {
+                    return NGX_ERROR;
+                }
+            }
+
+            ngx_memzero(b, sizeof(ngx_buf_t));
+
+            b->pos = buf->pos;
+            b->start = buf->start;
+            b->end = buf->end;
+            b->tag = p->tag;
+            b->temporary = 1;
+            b->recycled = 1;
+
+            *prev = b;
+            prev = &b->shadow;
+
+            cl = ngx_alloc_chain_link(p->pool);
+            if (cl == NULL) {
+                return NGX_ERROR;
+            }
+
+            cl->buf = b;
+            cl->next = NULL;
+
+            if (p->in) {
+                *p->last_in = cl;
+            } else {
+                p->in = cl;
+            }
+            p->last_in = &cl->next;
+
+            /* STUB */ b->num = buf->num;
+
+            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                           "input buf #%d %p", b->num, b->pos);
+
+            if (buf->last - buf->pos >= ctx->size) {
+
+                buf->pos += ctx->size;
+                b->last = buf->pos;
+                ctx->size = 0;
+
+                continue;
+            }
+
+            ctx->size -= buf->last - buf->pos;
+            buf->pos = buf->last;
+            b->last = buf->last;
+
+            continue;
+        }
+
+        if (rc == NGX_DONE) {
+
+            /* a whole response has been parsed successfully */
+
+            p->upstream_done = 1; /* or p->length = 0; ? */
+            r->upstream->keepalive = 1;
+
+            break;
+        }
+
+        if (rc == NGX_AGAIN) {
+
+            /* set p->length, minimal amount of data we want to see */
+
+            p->length = ctx->length;
+
+            break;
+        }
+
+        /* invalid response */
+
+        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+                      "upstream sent invalid chunked response");
+
+        return NGX_ERROR;
+    }
+
+    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http proxy chunked state %d, length %d",
+                   ctx->state, p->length);
+
+    if (b) {
+        b->shadow = buf;
+        b->last_shadow = 1;
+
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                       "input buf %p %z", b->pos, b->last - b->pos);
+
+        return NGX_OK;
+    }
+
+    /* there is no data record in the buf, add it to free chain */
+
+    if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_copy_filter(void *data, ssize_t bytes)
+{
+    ngx_http_request_t   *r = data;
+
+    ngx_buf_t            *b;
+    ngx_chain_t          *cl, **ll;
+    ngx_http_upstream_t  *u;
+
+    u = r->upstream;
+
+    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
+        ll = &cl->next;
+    }
+
+    cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+    if (cl == NULL) {
+        return NGX_ERROR;
+    }
+
+    *ll = cl;
+
+    cl->buf->flush = 1;
+    cl->buf->memory = 1;
+
+    b = &u->buffer;
+
+    cl->buf->pos = b->last;
+    b->last += bytes;
+    cl->buf->last = b->last;
+    cl->buf->tag = u->output.tag;
+
+    if (u->length == -1) {
+        return NGX_OK;
+    }
+
+    u->length -= bytes;
+
+    if (u->length == 0) {
+        u->keepalive = 1;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_chunked_filter(void *data, ssize_t bytes)
+{
+    ngx_http_request_t   *r = data;
+
+    ngx_int_t              rc;
+    ngx_buf_t             *b, *buf;
+    ngx_chain_t           *cl, **ll;
+    ngx_http_upstream_t   *u;
+    ngx_http_proxy_ctx_t  *ctx;
+
+    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+    u = r->upstream;
+    buf = &u->buffer;
+
+    buf->pos = buf->last;
+    buf->last += bytes;
+
+    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
+        ll = &cl->next;
+    }
+
+    for ( ;; ) {
+
+        rc = ngx_http_proxy_parse_chunked(r, buf);
+
+        if (rc == NGX_OK) {
+
+            /* a chunk has been parsed successfully */
+
+            cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+            if (cl == NULL) {
+                return NGX_ERROR;
+            }
+
+            *ll = cl;
+            ll = &cl->next;
+
+            b = cl->buf;
+
+            b->flush = 1;
+            b->memory = 1;
+
+            b->pos = buf->pos;
+            b->tag = u->output.tag;
+
+            if (buf->last - buf->pos >= ctx->size) {
+                buf->pos += ctx->size;
+                b->last = buf->pos;
+                ctx->size = 0;
+
+            } else {
+                ctx->size -= buf->last - buf->pos;
+                buf->pos = buf->last;
+                b->last = buf->last;
+            }
+
+            ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                           "http proxy out buf %p %z",
+                           b->pos, b->last - b->pos);
+
+            continue;
+        }
+
+        if (rc == NGX_DONE) {
+
+            /* a whole response has been parsed successfully */
+
+            u->keepalive = 1;
+            u->length = 0;
+
+            break;
+        }
+
+        if (rc == NGX_AGAIN) {
+            break;
+        }
+
+        /* invalid response */
+
+        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+                      "upstream sent invalid chunked response");
+
+        return NGX_ERROR;
+    }
+
+    /* provide continuous buffer for subrequests in memory */
+
+    if (r->subrequest_in_memory) {
+
+        cl = u->out_bufs;
+
+        if (cl) {
+            buf->pos = cl->buf->pos;
+        }
+
+        buf->last = buf->pos;
+
+        for (cl = u->out_bufs; cl; cl = cl->next) {
+            ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                           "http proxy in memory %p-%p %uz",
+                           cl->buf->pos, cl->buf->last, ngx_buf_size(cl->buf));
+
+            if (buf->last == cl->buf->pos) {
+                buf->last = cl->buf->last;
+                continue;
+            }
+
+            buf->last = ngx_movemem(buf->last, cl->buf->pos,
+                                    cl->buf->last - cl->buf->pos);
+
+            cl->buf->pos = buf->last - (cl->buf->last - cl->buf->pos);
+            cl->buf->last = buf->last;
+        }
+    }
+
+    return NGX_OK;
+}
+
+
 static void
 ngx_http_proxy_abort_request(ngx_http_request_t *r)
 {
@@ -1704,6 +2460,8 @@ ngx_http_proxy_create_loc_conf(ngx_conf_
     conf->redirect = NGX_CONF_UNSET;
     conf->upstream.change_buffering = 1;
 
+    conf->http_version = NGX_CONF_UNSET_UINT;
+
     conf->headers_hash_max_size = NGX_CONF_UNSET_UINT;
     conf->headers_hash_bucket_size = NGX_CONF_UNSET_UINT;
 
@@ -2005,6 +2763,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t
     }
 #endif
 
+    ngx_conf_merge_uint_value(conf->http_version, prev->http_version,
+                              NGX_HTTP_VERSION_10);
+
     ngx_conf_merge_uint_value(conf->headers_hash_max_size,
                               prev->headers_hash_max_size, 512);
 
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -72,6 +72,8 @@ static void ngx_http_upstream_finalize_r
 
 static ngx_int_t ngx_http_upstream_process_header_line(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t ngx_http_upstream_process_content_length(ngx_http_request_t *r,
+    ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_process_set_cookie(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t
@@ -89,6 +91,9 @@ static ngx_int_t ngx_http_upstream_proce
     ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_process_charset(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t
+    ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r,
+    ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_copy_header_line(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t
@@ -96,8 +101,6 @@ static ngx_int_t
     ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_copy_content_type(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
-static ngx_int_t ngx_http_upstream_copy_content_length(ngx_http_request_t *r,
-    ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_copy_last_modified(ngx_http_request_t *r,
     ngx_table_elt_t *h, ngx_uint_t offset);
 static ngx_int_t ngx_http_upstream_rewrite_location(ngx_http_request_t *r,
@@ -149,9 +152,9 @@ ngx_http_upstream_header_t  ngx_http_ups
                  ngx_http_upstream_copy_content_type, 0, 1 },
 
     { ngx_string("Content-Length"),
-                 ngx_http_upstream_process_header_line,
+                 ngx_http_upstream_process_content_length,
                  offsetof(ngx_http_upstream_headers_in_t, content_length),
-                 ngx_http_upstream_copy_content_length, 0, 0 },
+                 ngx_http_upstream_ignore_header_line, 0, 0 },
 
     { ngx_string("Date"),
                  ngx_http_upstream_process_header_line,
@@ -247,6 +250,10 @@ ngx_http_upstream_header_t  ngx_http_ups
                  ngx_http_upstream_process_charset, 0,
                  ngx_http_upstream_copy_header_line, 0, 0 },
 
+    { ngx_string("Transfer-Encoding"),
+                 ngx_http_upstream_process_transfer_encoding, 0,
+                 ngx_http_upstream_ignore_header_line, 0, 0 },
+
 #if (NGX_HTTP_GZIP)
     { ngx_string("Content-Encoding"),
                  ngx_http_upstream_process_header_line,
@@ -396,6 +403,8 @@ ngx_http_upstream_create(ngx_http_reques
     r->cache = NULL;
 #endif
 
+    u->headers_in.content_length_n = -1;
+
     return NGX_OK;
 }
 
@@ -798,6 +807,7 @@ ngx_http_upstream_cache_send(ngx_http_re
     u->buffer.pos += c->header_start;
 
     ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+    u->headers_in.content_length_n = -1;
 
     if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                       sizeof(ngx_table_elt_t))
@@ -1280,7 +1290,10 @@ ngx_http_upstream_reinit(ngx_http_reques
         return NGX_ERROR;
     }
 
+    u->keepalive = 0;
+
     ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+    u->headers_in.content_length_n = -1;
 
     if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                       sizeof(ngx_table_elt_t))
@@ -1922,14 +1935,9 @@ ngx_http_upstream_process_headers(ngx_ht
     r->headers_out.status = u->headers_in.status_n;
     r->headers_out.status_line = u->headers_in.status_line;
 
-    u->headers_in.content_length_n = r->headers_out.content_length_n;
-
-    if (r->headers_out.content_length_n != -1) {
-        u->length = (size_t) r->headers_out.content_length_n;
-
-    } else {
-        u->length = NGX_MAX_SIZE_T_VALUE;
-    }
+    r->headers_out.content_length_n = u->headers_in.content_length_n;
+
+    u->length = u->headers_in.content_length_n;
 
     return NGX_OK;
 }
@@ -1993,6 +2001,11 @@ ngx_http_upstream_process_body_in_memory
         }
     }
 
+    if (u->length == 0) {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
     if (ngx_handle_read_event(rev, 0) != NGX_OK) {
         ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
         return;
@@ -2294,6 +2307,15 @@ ngx_http_upstream_send_response(ngx_http
     u->read_event_handler = ngx_http_upstream_process_upstream;
     r->write_event_handler = ngx_http_upstream_process_downstream;
 
+    p->length = -1;
+
+    if (u->input_filter_init
+        && u->input_filter_init(p->input_ctx) != NGX_OK)
+    {
+        ngx_http_upstream_finalize_request(r, u, 0);
+        return;
+    }
+
     ngx_http_upstream_process_upstream(r, u);
 }
 
@@ -2386,6 +2408,10 @@ ngx_http_upstream_process_non_buffered_r
 
             if (u->busy_bufs == NULL) {
 
+                ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                               "http upstream non buffered length:%O",
+                               u->length);
+
                 if (u->length == 0
                     || upstream->read->eof
                     || upstream->read->error)
@@ -2401,10 +2427,6 @@ ngx_http_upstream_process_non_buffered_r
 
         size = b->end - b->last;
 
-        if (size > u->length) {
-            size = u->length;
-        }
-
         if (size && upstream->read->ready) {
 
             n = upstream->recv(upstream, b->last, size);
@@ -2501,12 +2523,16 @@ ngx_http_upstream_non_buffered_filter(vo
     cl->buf->last = b->last;
     cl->buf->tag = u->output.tag;
 
-    if (u->length == NGX_MAX_SIZE_T_VALUE) {
+    if (u->length == -1) {
         return NGX_OK;
     }
 
     u->length -= bytes;
 
+    if (u->length == 0) {
+        u->keepalive = 1;
+    }
+
     return NGX_OK;
 }
 
@@ -3057,6 +3083,21 @@ ngx_http_upstream_ignore_header_line(ngx
 
 
 static ngx_int_t
+ngx_http_upstream_process_content_length(ngx_http_request_t *r,
+    ngx_table_elt_t *h, ngx_uint_t offset)
+{
+    ngx_http_upstream_t  *u;
+
+    u = r->upstream;
+
+    u->headers_in.content_length = h;
+    u->headers_in.content_length_n = ngx_atoof(h->value.data, h->value.len);
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
 ngx_http_upstream_process_set_cookie(ngx_http_request_t *r, ngx_table_elt_t *h,
     ngx_uint_t offset)
 {
@@ -3318,6 +3359,23 @@ ngx_http_upstream_process_charset(ngx_ht
 
 
 static ngx_int_t
+ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r,
+    ngx_table_elt_t *h, ngx_uint_t offset)
+{
+    r->upstream->headers_in.transfer_encoding = h;
+
+    if (ngx_strlcasestrn(h->value.data, h->value.data + h->value.len,
+                         (u_char *) "chunked", 7 - 1)
+        != NULL)
+    {
+        r->upstream->headers_in.chunked = 1;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
 ngx_http_upstream_copy_header_line(ngx_http_request_t *r, ngx_table_elt_t *h,
     ngx_uint_t offset)
 {
@@ -3425,26 +3483,6 @@ ngx_http_upstream_copy_content_type(ngx_
 
 
 static ngx_int_t
-ngx_http_upstream_copy_content_length(ngx_http_request_t *r, ngx_table_elt_t *h,
-    ngx_uint_t offset)
-{
-    ngx_table_elt_t  *ho;
-
-    ho = ngx_list_push(&r->headers_out.headers);
-    if (ho == NULL) {
-        return NGX_ERROR;
-    }
-
-    *ho = *h;
-
-    r->headers_out.content_length = ho;
-    r->headers_out.content_length_n = ngx_atoof(h->value.data, h->value.len);
-
-    return NGX_OK;
-}
-
-
-static ngx_int_t
 ngx_http_upstream_copy_last_modified(ngx_http_request_t *r, ngx_table_elt_t *h,
     ngx_uint_t offset)
 {
diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h
--- a/src/http/ngx_http_upstream.h
+++ b/src/http/ngx_http_upstream.h
@@ -216,6 +216,7 @@ typedef struct {
     ngx_table_elt_t                 *location;
     ngx_table_elt_t                 *accept_ranges;
     ngx_table_elt_t                 *www_authenticate;
+    ngx_table_elt_t                 *transfer_encoding;
 
 #if (NGX_HTTP_GZIP)
     ngx_table_elt_t                 *content_encoding;
@@ -224,6 +225,8 @@ typedef struct {
     off_t                            content_length_n;
 
     ngx_array_t                      cache_control;
+
+    unsigned                         chunked:1;
 } ngx_http_upstream_headers_in_t;
 
 
@@ -266,7 +269,7 @@ struct ngx_http_upstream_s {
     ngx_http_upstream_resolved_t    *resolved;
 
     ngx_buf_t                        buffer;
-    size_t                           length;
+    off_t                            length;
 
     ngx_chain_t                     *out_bufs;
     ngx_chain_t                     *busy_bufs;
@@ -307,6 +310,7 @@ struct ngx_http_upstream_s {
 #endif
 
     unsigned                         buffering:1;
+    unsigned                         keepalive:1;
 
     unsigned                         request_sent:1;
     unsigned                         header_sent:1;


More information about the nginx-devel mailing list