upstream keepalive: call for testing
Maxim Dounin
mdounin at mdounin.ru
Tue Jul 26 11:57:26 UTC 2011
Hello!
The attached patch (against 1.0.5) introduces upstream keepalive
support for memcached, fastcgi and http. Note that the patch is
experimental and may have problems (though it passes basic smoke
tests here). Testing is appreciated.
Major changes include:
1. Content length is now parsed into u->headers_in.content_length_n
instead of directly into r->headers_out.content_length_n.
Note this may break 3rd party protocol modules.
2. Use off_t for r->upstream->length.
Note this may break 3rd party protocol modules.
3. In buffered mode, u->pipe->length was introduced to indicate the minimal amount
of data which must be passed to the input filter. This makes it possible to not rely on
connection close to indicate the response end while still using buffers efficiently
in most cases. Defaults to -1 (that is, wait for
connection close) if not set by protocol-specific handlers.
4. In buffered mode, u->input_filter_init() is now called if it's set.
This is used to set the initial value of u->pipe->length (see above).
5. The proxy module is now able to talk HTTP/1.1; in particular, it understands
chunked encoding in responses. Requests are sent using HTTP/1.1
if the proxy_http_version directive is set to 1.1.
6. Introduced the u->keepalive flag to indicate that the connection to the upstream is in
a correct state and may be kept alive. The memcached, fastcgi and proxy
modules are updated to set it.
The patch is expected to be used with the upstream keepalive module[1] compiled with
NGX_UPSTREAM_KEEPALIVE_PATCHED defined, i.e. use something like this:
./configure --with-cc-opt="-D NGX_UPSTREAM_KEEPALIVE_PATCHED" \
--add-module=/path/to/ngx_http_upstream_keepalive
And use something like this in config:
upstream memcached_backend {
server 127.0.0.1:11211;
keepalive 1;
}
upstream http_backend {
server 127.0.0.1:8080;
keepalive 1;
}
upstream fastcgi_backend {
server 127.0.0.1:9000;
keepalive 1;
}
server {
...
location /memcached/ {
set $memcached_key $uri;
memcached_pass memcached_backend;
}
location /fastcgi/ {
fastcgi_pass fastcgi_backend;
...
}
location /http/ {
proxy_pass http://http_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
...
}
}
[1] http://mdounin.ru/hg/ngx_http_upstream_keepalive/
Maxim Dounin
-------------- next part --------------
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -392,8 +392,30 @@ ngx_event_pipe_read_upstream(ngx_event_p
cl->buf->file_last - cl->buf->file_pos);
}
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe length: %O", p->length);
+
#endif
+ if (p->free_raw_bufs && p->length != -1) {
+ cl = p->free_raw_bufs;
+
+ if (cl->buf->last - cl->buf->pos >= p->length) {
+
+ /* STUB */ cl->buf->num = p->num++;
+
+ if (p->input_filter(p, cl->buf) == NGX_ERROR) {
+ return NGX_ABORT;
+ }
+
+ p->free_raw_bufs = cl->next;
+ }
+ }
+
+ if (p->length == 0) {
+ p->upstream_done = 1;
+ }
+
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
/* STUB */ p->free_raw_bufs->buf->num = p->num++;
@@ -848,6 +870,12 @@ ngx_event_pipe_copy_input_filter(ngx_eve
}
p->last_in = &cl->next;
+ if (p->length == -1) {
+ return NGX_OK;
+ }
+
+ p->length -= b->last - b->pos;
+
return NGX_OK;
}
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -65,6 +65,7 @@ struct ngx_event_pipe_s {
ssize_t busy_size;
off_t read_length;
+ off_t length;
off_t max_temp_file_size;
ssize_t temp_file_write_size;
diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c
--- a/src/http/modules/ngx_http_fastcgi_module.c
+++ b/src/http/modules/ngx_http_fastcgi_module.c
@@ -77,6 +77,8 @@ typedef struct {
#define NGX_HTTP_FASTCGI_RESPONDER 1
+#define NGX_HTTP_FASTCGI_KEEP_CONN 1
+
#define NGX_HTTP_FASTCGI_BEGIN_REQUEST 1
#define NGX_HTTP_FASTCGI_ABORT_REQUEST 2
#define NGX_HTTP_FASTCGI_END_REQUEST 3
@@ -130,6 +132,7 @@ static ngx_int_t ngx_http_fastcgi_create
static ngx_int_t ngx_http_fastcgi_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_fastcgi_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_fastcgi_input_filter_init(void *data);
static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p,
ngx_buf_t *buf);
static ngx_int_t ngx_http_fastcgi_process_record(ngx_http_request_t *r,
@@ -484,7 +487,7 @@ static ngx_http_fastcgi_request_start_t
{ 0, /* role_hi */
NGX_HTTP_FASTCGI_RESPONDER, /* role_lo */
- 0, /* NGX_HTTP_FASTCGI_KEEP_CONN */ /* flags */
+ NGX_HTTP_FASTCGI_KEEP_CONN, /* flags */
{ 0, 0, 0, 0, 0 } }, /* reserved[5] */
{ 1, /* version */
@@ -600,6 +603,8 @@ ngx_http_fastcgi_handler(ngx_http_reques
u->pipe->input_filter = ngx_http_fastcgi_input_filter;
u->pipe->input_ctx = r;
+ u->input_filter_init = ngx_http_fastcgi_input_filter_init;
+
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
@@ -1566,6 +1571,17 @@ ngx_http_fastcgi_process_header(ngx_http
static ngx_int_t
+ngx_http_fastcgi_input_filter_init(void *data) /* installed as u->input_filter_init; data is the request */
+{
+ ngx_http_request_t *r = data;
+
+ r->upstream->pipe->length = sizeof(ngx_http_fastcgi_header_t); /* initial minimal amount of data: one FastCGI record header */
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
{
u_char *m, *msg;
@@ -1603,7 +1619,6 @@ ngx_http_fastcgi_input_filter(ngx_event_
if (f->type == NGX_HTTP_FASTCGI_STDOUT && f->length == 0) {
f->state = ngx_http_fastcgi_st_version;
- p->upstream_done = 1;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
"http fastcgi closed stdout");
@@ -1614,6 +1629,7 @@ ngx_http_fastcgi_input_filter(ngx_event_
if (f->type == NGX_HTTP_FASTCGI_END_REQUEST) {
f->state = ngx_http_fastcgi_st_version;
p->upstream_done = 1;
+ r->upstream->keepalive = 1;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
"http fastcgi sent end request");
@@ -1773,6 +1789,20 @@ ngx_http_fastcgi_input_filter(ngx_event_
}
+ /* set p->length, minimal amount of data we want to see */
+
+ if (f->state < ngx_http_fastcgi_st_data) {
+ p->length = 1;
+
+ } else if (f->state == ngx_http_fastcgi_st_padding) {
+ p->length = f->padding;
+
+ } else {
+ /* ngx_http_fastcgi_st_data */
+
+ p->length = f->length;
+ }
+
if (b) {
b->shadow = buf;
b->last_shadow = 1;
diff --git a/src/http/modules/ngx_http_memcached_module.c b/src/http/modules/ngx_http_memcached_module.c
--- a/src/http/modules/ngx_http_memcached_module.c
+++ b/src/http/modules/ngx_http_memcached_module.c
@@ -344,8 +344,8 @@ found:
while (*p && *p++ != CR) { /* void */ }
- r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
- if (r->headers_out.content_length_n == -1) {
+ u->headers_in.content_length_n = ngx_atoof(len, p - len - 1);
+ if (u->headers_in.content_length_n == -1) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"memcached sent invalid length in response \"%V\" "
"for key \"%V\"",
@@ -366,6 +366,7 @@ found:
u->headers_in.status_n = 404;
u->state->status = 404;
+ u->keepalive = 1;
return NGX_OK;
}
@@ -426,6 +427,10 @@ ngx_http_memcached_filter(void *data, ss
u->length -= bytes;
ctx->rest -= bytes;
+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}
@@ -463,6 +468,13 @@ ngx_http_memcached_filter(void *data, ss
if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"memcached sent invalid trailer");
+
+ b->last = last;
+ cl->buf->last = last;
+ u->length = 0;
+ ctx->rest = 0;
+
+ return NGX_OK;
}
ctx->rest -= b->last - last;
@@ -470,6 +482,10 @@ ngx_http_memcached_filter(void *data, ss
cl->buf->last = last;
u->length = ctx->rest;
+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}
diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c
--- a/src/http/modules/ngx_http_proxy_module.c
+++ b/src/http/modules/ngx_http_proxy_module.c
@@ -71,6 +71,8 @@ typedef struct {
ngx_flag_t redirect;
+ ngx_uint_t http_version;
+
ngx_uint_t headers_hash_max_size;
ngx_uint_t headers_hash_bucket_size;
} ngx_http_proxy_loc_conf_t;
@@ -80,6 +82,12 @@ typedef struct {
ngx_http_status_t status;
ngx_http_proxy_vars_t vars;
size_t internal_body_length;
+
+ ngx_uint_t state;
+ off_t size;
+ off_t length;
+
+ ngx_uint_t head; /* unsigned head:1 */
} ngx_http_proxy_ctx_t;
@@ -92,6 +100,15 @@ static ngx_int_t ngx_http_proxy_create_r
static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r);
static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_proxy_input_filter_init(void *data);
+static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p,
+ ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p,
+ ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_non_buffered_copy_filter(void *data,
+ ssize_t bytes);
+static ngx_int_t ngx_http_proxy_non_buffered_chunked_filter(void *data,
+ ssize_t bytes);
static void ngx_http_proxy_abort_request(ngx_http_request_t *r);
static void ngx_http_proxy_finalize_request(ngx_http_request_t *r,
ngx_int_t rc);
@@ -157,6 +174,13 @@ static ngx_conf_bitmask_t ngx_http_prox
};
+static ngx_conf_enum_t ngx_http_proxy_http_version[] = { /* values accepted by the proxy_http_version directive */
+ { ngx_string("1.0"), NGX_HTTP_VERSION_10 }, /* also the default applied in ngx_http_proxy_merge_loc_conf() */
+ { ngx_string("1.1"), NGX_HTTP_VERSION_11 },
+ { ngx_null_string, 0 } /* null entry, presumably the list terminator */
+};
+
+
ngx_module_t ngx_http_proxy_module;
@@ -432,6 +456,13 @@ static ngx_command_t ngx_http_proxy_com
offsetof(ngx_http_proxy_loc_conf_t, upstream.ignore_headers),
&ngx_http_upstream_ignore_headers_masks },
+ { ngx_string("proxy_http_version"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_enum_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_proxy_loc_conf_t, http_version),
+ &ngx_http_proxy_http_version },
+
#if (NGX_HTTP_SSL)
{ ngx_string("proxy_ssl_session_reuse"),
@@ -479,6 +510,7 @@ ngx_module_t ngx_http_proxy_module = {
static char ngx_http_proxy_version[] = " HTTP/1.0" CRLF;
+static char ngx_http_proxy_version_11[] = " HTTP/1.1" CRLF;
static ngx_keyval_t ngx_http_proxy_headers[] = {
@@ -486,6 +518,7 @@ static ngx_keyval_t ngx_http_proxy_head
{ ngx_string("Connection"), ngx_string("close") },
{ ngx_string("Keep-Alive"), ngx_string("") },
{ ngx_string("Expect"), ngx_string("") },
+ { ngx_string("Upgrade"), ngx_string("") },
{ ngx_null_string, ngx_null_string }
};
@@ -610,7 +643,12 @@ ngx_http_proxy_handler(ngx_http_request_
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
- u->pipe->input_filter = ngx_event_pipe_copy_input_filter;
+ u->pipe->input_filter = ngx_http_proxy_copy_filter;
+ u->pipe->input_ctx = r;
+
+ u->input_filter_init = ngx_http_proxy_input_filter_init;
+ u->input_filter = ngx_http_proxy_non_buffered_copy_filter;
+ u->input_filter_ctx = r;
u->accel = 1;
@@ -864,14 +902,20 @@ ngx_http_proxy_create_request(ngx_http_r
method.len++;
}
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ if (method.len == 5
+ && ngx_strncasecmp(method.data, (u_char *) "HEAD ", 5) == 0)
+ {
+ ctx->head = 1;
+ }
+
len = method.len + sizeof(ngx_http_proxy_version) - 1 + sizeof(CRLF) - 1;
escape = 0;
loc_len = 0;
unparsed_uri = 0;
- ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
-
if (plcf->proxy_lengths) {
uri_len = ctx->vars.uri.len;
@@ -1007,8 +1051,14 @@ ngx_http_proxy_create_request(ngx_http_r
u->uri.len = b->last - u->uri.data;
- b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
- sizeof(ngx_http_proxy_version) - 1);
+ if (plcf->http_version == NGX_HTTP_VERSION_11) {
+ b->last = ngx_cpymem(b->last, ngx_http_proxy_version_11,
+ sizeof(ngx_http_proxy_version_11) - 1);
+
+ } else {
+ b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
+ sizeof(ngx_http_proxy_version) - 1);
+ }
ngx_memzero(&e, sizeof(ngx_http_script_engine_t));
@@ -1157,8 +1207,11 @@ ngx_http_proxy_reinit_request(ngx_http_r
ctx->status.count = 0;
ctx->status.start = NULL;
ctx->status.end = NULL;
+ ctx->state = 0;
r->upstream->process_header = ngx_http_proxy_process_status_line;
+ r->upstream->pipe->input_filter = ngx_http_proxy_copy_filter;
+ r->upstream->input_filter = ngx_http_proxy_non_buffered_copy_filter;
r->state = 0;
return NGX_OK;
@@ -1244,6 +1297,8 @@ ngx_http_proxy_process_header(ngx_http_r
{
ngx_int_t rc;
ngx_table_elt_t *h;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
ngx_http_upstream_header_t *hh;
ngx_http_upstream_main_conf_t *umcf;
@@ -1339,6 +1394,23 @@ ngx_http_proxy_process_header(ngx_http_r
h->lowcase_key = (u_char *) "date";
}
+ /*
+ * set u->keepalive if response has no body; this allows to keep
+ * connections alive in case of r->header_only or X-Accel-Redirect
+ */
+
+ u = r->upstream;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+ || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+ || ctx->head
+ || (!u->headers_in.chunked
+ && u->headers_in.content_length_n == 0))
+ {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}
@@ -1356,6 +1428,690 @@ ngx_http_proxy_process_header(ngx_http_r
}
+static ngx_int_t
+ngx_http_proxy_input_filter_init(void *data) /* installed as u->input_filter_init; picks body framing from the parsed headers */
+{
+ ngx_http_request_t *r = data;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
+
+ u = r->upstream;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy filter init s:%d h:%d c:%d l:%O",
+ u->headers_in.status_n, ctx->head, u->headers_in.chunked,
+ u->headers_in.content_length_n);
+
+ /* as per RFC2616, 4.4 Message Length */
+
+ if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+ || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+ || ctx->head)
+ {
+ /* 204 and 304 responses and replies to HEAD requests carry no body */
+ /* 1xx is not checked here since we don't send Expect and Upgrade */
+
+ u->pipe->length = 0; /* buffered mode: no more data expected */
+ u->length = 0; /* non-buffered mode: no more data expected */
+ u->keepalive = 1;
+
+ } else if (u->headers_in.chunked) {
+ /* chunked: switch both modes to the chunked filters */
+
+ u->pipe->input_filter = ngx_http_proxy_chunked_filter;
+ u->pipe->length = 3; /* "0" LF LF */
+
+ u->input_filter = ngx_http_proxy_non_buffered_chunked_filter;
+ u->length = -1; /* unknown; the chunked filter sets it to 0 at the end of the response */
+
+ } else if (u->headers_in.content_length_n == 0) {
+ /* empty body: special case as filter won't be called */
+
+ u->pipe->length = 0;
+ u->length = 0;
+ u->keepalive = 1;
+
+ } else {
+ /* content length, or -1 (wait for connection close) if none was parsed */
+
+ u->pipe->length = u->headers_in.content_length_n;
+ u->length = u->headers_in.content_length_n;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) /* buffered-mode input filter for non-chunked bodies */
+{
+ ngx_buf_t *b;
+ ngx_chain_t *cl;
+ ngx_http_request_t *r;
+
+ if (buf->pos == buf->last) { /* empty buffer: nothing to pass on */
+ return NGX_OK;
+ }
+
+ if (p->free) { /* reuse a buf from the pipe's free list if there is one */
+ cl = p->free;
+ b = cl->buf;
+ p->free = cl->next;
+ ngx_free_chain(p->pool, cl);
+
+ } else {
+ b = ngx_alloc_buf(p->pool);
+ if (b == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ ngx_memcpy(b, buf, sizeof(ngx_buf_t)); /* b starts as a copy of buf, then the two are cross-linked via shadow */
+ b->shadow = buf;
+ b->tag = p->tag;
+ b->last_shadow = 1;
+ b->recycled = 1;
+ buf->shadow = b;
+
+ cl = ngx_alloc_chain_link(p->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = b;
+ cl->next = NULL;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
+
+ if (p->in) { /* append the new link to the p->in chain */
+ *p->last_in = cl;
+ } else {
+ p->in = cl;
+ }
+ p->last_in = &cl->next;
+
+ if (p->length == -1) { /* -1: length not tracked, response ends on connection close */
+ return NGX_OK;
+ }
+
+ p->length -= b->last - b->pos; /* count down the remaining expected bytes */
+
+ if (p->length == 0) { /* whole body received: connection may be kept alive */
+ r = p->input_ctx;
+ p->upstream_done = 1;
+ r->upstream->keepalive = 1;
+
+ } else if (p->length < 0) { /* more data than expected: finish, but do not set keepalive */
+ r = p->input_ctx;
+ p->upstream_done = 1;
+
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "upstream sent too many data");
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_inline ngx_int_t
+ngx_http_proxy_parse_chunked(ngx_http_request_t *r, ngx_buf_t *buf) /* incremental chunked-body parser; state lives in the proxy ctx */
+{
+ u_char *pos, ch, c;
+ ngx_int_t rc;
+ ngx_http_proxy_ctx_t *ctx;
+ enum {
+ sw_chunk_start = 0,
+ sw_chunk_size,
+ sw_chunk_extension,
+ sw_chunk_extension_almost_done,
+ sw_chunk_data,
+ sw_after_data,
+ sw_after_data_almost_done,
+ sw_last_chunk_extension,
+ sw_last_chunk_extension_almost_done,
+ sw_trailer,
+ sw_trailer_almost_done,
+ sw_trailer_header,
+ sw_trailer_header_almost_done
+ } state;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+ state = ctx->state; /* resume where the previous call stopped */
+
+ if (state == sw_chunk_data && ctx->size == 0) { /* callers zero ctx->size once a chunk's data is consumed */
+ state = sw_after_data;
+ }
+
+ rc = NGX_AGAIN; /* returned when the buffer runs out before chunk data or the end of the response */
+
+ for (pos = buf->pos; pos < buf->last; pos++) {
+
+ ch = *pos;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy chunked byte: %02Xd s:%d", ch, state);
+
+ switch (state) {
+
+ case sw_chunk_start: /* first hex digit of a chunk size is required */
+ if (ch >= '0' && ch <= '9') {
+ state = sw_chunk_size;
+ ctx->size = ch - '0';
+ break;
+ }
+
+ c = (u_char) (ch | 0x20); /* fold ASCII letters to lower case */
+
+ if (c >= 'a' && c <= 'f') {
+ state = sw_chunk_size;
+ ctx->size = c - 'a' + 10;
+ break;
+ }
+
+ goto invalid;
+
+ case sw_chunk_size:
+ if (ch >= '0' && ch <= '9') {
+ ctx->size = ctx->size * 16 + (ch - '0'); /* NOTE(review): no overflow check on ctx->size is visible here */
+ break;
+ }
+
+ c = (u_char) (ch | 0x20);
+
+ if (c >= 'a' && c <= 'f') {
+ ctx->size = ctx->size * 16 + (c - 'a' + 10);
+ break;
+ }
+
+ if (ctx->size == 0) { /* zero-size chunk: this is the last chunk */
+
+ switch (ch) {
+ case CR:
+ state = sw_last_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ break;
+ case ';':
+ state = sw_last_chunk_extension;
+ break;
+ default:
+ goto invalid;
+ }
+
+ break;
+ }
+
+ switch (ch) {
+ case CR:
+ state = sw_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_data;
+ break;
+ case ';':
+ state = sw_chunk_extension;
+ break;
+ default:
+ goto invalid;
+ }
+
+ break;
+
+ case sw_chunk_extension: /* extension bytes are skipped up to CR or LF */
+ switch (ch) {
+ case CR:
+ state = sw_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_data;
+ }
+ break;
+
+ case sw_chunk_extension_almost_done:
+ if (ch == LF) {
+ state = sw_chunk_data;
+ break;
+ }
+ goto invalid;
+
+ case sw_chunk_data:
+ rc = NGX_OK; /* pos is the first data byte; the caller consumes up to ctx->size bytes */
+ goto data;
+
+ case sw_after_data: /* any byte other than CR or LF is skipped in this state */
+ switch (ch) {
+ case CR:
+ state = sw_after_data_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_start;
+ }
+ break;
+
+ case sw_after_data_almost_done:
+ if (ch == LF) {
+ state = sw_chunk_start;
+ break;
+ }
+ goto invalid;
+
+ case sw_last_chunk_extension:
+ switch (ch) {
+ case CR:
+ state = sw_last_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ }
+ break;
+
+ case sw_last_chunk_extension_almost_done:
+ if (ch == LF) {
+ state = sw_trailer;
+ break;
+ }
+ goto invalid;
+
+ case sw_trailer: /* an empty line here ends the response */
+ switch (ch) {
+ case CR:
+ state = sw_trailer_almost_done;
+ break;
+ case LF:
+ goto done;
+ default:
+ state = sw_trailer_header;
+ }
+ break;
+
+ case sw_trailer_almost_done:
+ if (ch == LF) {
+ goto done;
+ }
+ goto invalid;
+
+ case sw_trailer_header: /* trailer header bytes are skipped up to the end of line */
+ switch (ch) {
+ case CR:
+ state = sw_trailer_header_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ }
+ break;
+
+ case sw_trailer_header_almost_done:
+ if (ch == LF) {
+ state = sw_trailer;
+ break;
+ }
+ goto invalid;
+
+ }
+ }
+
+data:
+
+ ctx->state = state; /* save state and position for the next call */
+ buf->pos = pos;
+
+ switch (state) { /* ctx->length: minimal amount of data still expected, per state */
+
+ case sw_chunk_start:
+ ctx->length = 3 /* "0" LF LF */;
+ break;
+ case sw_chunk_size:
+ ctx->length = 2 /* LF LF */
+ + (ctx->size ? ctx->size + 4 /* LF "0" LF LF */ : 0);
+ break;
+ case sw_chunk_extension:
+ case sw_chunk_extension_almost_done:
+ ctx->length = 1 /* LF */ + ctx->size + 4 /* LF "0" LF LF */;
+ break;
+ case sw_chunk_data:
+ ctx->length = ctx->size + 4 /* LF "0" LF LF */;
+ break;
+ case sw_after_data:
+ case sw_after_data_almost_done:
+ ctx->length = 4 /* LF "0" LF LF */;
+ break;
+ case sw_last_chunk_extension:
+ case sw_last_chunk_extension_almost_done:
+ ctx->length = 2 /* LF LF */;
+ break;
+ case sw_trailer:
+ case sw_trailer_almost_done:
+ ctx->length = 1 /* LF */;
+ break;
+ case sw_trailer_header:
+ case sw_trailer_header_almost_done:
+ ctx->length = 2 /* LF LF */;
+ break;
+
+ }
+
+ return rc; /* NGX_OK: chunk data at buf->pos; NGX_AGAIN: buffer exhausted */
+
+done:
+
+ return NGX_DONE; /* end of response; note ctx->state and buf->pos are not updated on this path */
+
+invalid:
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, ngx_buf_t *buf) /* buffered-mode input filter for chunked bodies */
+{
+ ngx_int_t rc;
+ ngx_buf_t *b, **prev;
+ ngx_chain_t *cl;
+ ngx_http_request_t *r;
+ ngx_http_proxy_ctx_t *ctx;
+
+ if (buf->pos == buf->last) { /* empty buffer: nothing to parse */
+ return NGX_OK;
+ }
+
+ r = p->input_ctx;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ b = NULL; /* stays NULL if the buffer holds no chunk data at all */
+ prev = &buf->shadow; /* data bufs cut from buf are linked through ->shadow, starting here */
+
+ for ( ;; ) {
+
+ rc = ngx_http_proxy_parse_chunked(r, buf);
+
+ if (rc == NGX_OK) {
+
+ /* a chunk has been parsed successfully */
+
+ if (p->free) { /* reuse a buf from the pipe's free list if there is one */
+ cl = p->free;
+ b = cl->buf;
+ p->free = cl->next;
+ ngx_free_chain(p->pool, cl);
+
+ } else {
+ b = ngx_alloc_buf(p->pool);
+ if (b == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ ngx_memzero(b, sizeof(ngx_buf_t));
+
+ b->pos = buf->pos; /* chunk data starts where the parser stopped */
+ b->start = buf->start;
+ b->end = buf->end;
+ b->tag = p->tag;
+ b->temporary = 1;
+ b->recycled = 1;
+
+ *prev = b;
+ prev = &b->shadow;
+
+ cl = ngx_alloc_chain_link(p->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = b;
+ cl->next = NULL;
+
+ if (p->in) { /* append the new link to the p->in chain */
+ *p->last_in = cl;
+ } else {
+ p->in = cl;
+ }
+ p->last_in = &cl->next;
+
+ /* STUB */ b->num = buf->num;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "input buf #%d %p", b->num, b->pos);
+
+ if (buf->last - buf->pos >= ctx->size) { /* the rest of the chunk is in this buffer */
+
+ buf->pos += ctx->size;
+ b->last = buf->pos;
+ ctx->size = 0;
+
+ continue;
+ }
+
+ ctx->size -= buf->last - buf->pos; /* chunk continues in the next buffer */
+ buf->pos = buf->last;
+ b->last = buf->last;
+
+ continue;
+ }
+
+ if (rc == NGX_DONE) {
+
+ /* a whole response has been parsed successfully */
+
+ p->upstream_done = 1; /* or p->length = 0; ? */
+ r->upstream->keepalive = 1;
+
+ break;
+ }
+
+ if (rc == NGX_AGAIN) {
+
+ /* set p->length, minimal amount of data we want to see */
+
+ p->length = ctx->length;
+
+ break;
+ }
+
+ /* invalid response; note the parser has already logged the same message */
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy chunked state %ui, length %O", /* ctx->state is ngx_uint_t, p->length is off_t */
+ ctx->state, p->length);
+
+ if (b) { /* the last data buf shadows the raw buffer */
+ b->shadow = buf;
+ b->last_shadow = 1;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "input buf %p %z", b->pos, b->last - b->pos);
+
+ return NGX_OK;
+ }
+
+ /* there is no data record in the buf, add it to free chain */
+
+ if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_copy_filter(void *data, ssize_t bytes) /* non-buffered input filter for non-chunked bodies */
+{
+ ngx_http_request_t *r = data;
+
+ ngx_buf_t *b;
+ ngx_chain_t *cl, **ll;
+ ngx_http_upstream_t *u;
+
+ u = r->upstream;
+
+ for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { /* find the tail of u->out_bufs */
+ ll = &cl->next;
+ }
+
+ cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ *ll = cl;
+
+ cl->buf->flush = 1;
+ cl->buf->memory = 1;
+
+ b = &u->buffer;
+
+ cl->buf->pos = b->last; /* new data presumably sits at [b->last, b->last + bytes) — confirm against the caller */
+ b->last += bytes;
+ cl->buf->last = b->last;
+ cl->buf->tag = u->output.tag;
+
+ if (u->length == -1) { /* -1: length unknown, nothing to count down */
+ return NGX_OK;
+ }
+
+ u->length -= bytes;
+
+ if (u->length == 0) { /* whole body received: connection may be kept alive */
+ u->keepalive = 1;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_chunked_filter(void *data, ssize_t bytes) /* non-buffered input filter for chunked bodies */
+{
+ ngx_http_request_t *r = data;
+
+ ngx_int_t rc;
+ ngx_buf_t *b, *buf;
+ ngx_chain_t *cl, **ll;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+ u = r->upstream;
+ buf = &u->buffer;
+
+ buf->pos = buf->last; /* parse only the next `bytes` bytes, presumably the newly received data — confirm against the caller */
+ buf->last += bytes;
+
+ for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) { /* find the tail of u->out_bufs */
+ ll = &cl->next;
+ }
+
+ for ( ;; ) {
+
+ rc = ngx_http_proxy_parse_chunked(r, buf);
+
+ if (rc == NGX_OK) {
+
+ /* a chunk has been parsed successfully */
+
+ cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ *ll = cl;
+ ll = &cl->next;
+
+ b = cl->buf;
+
+ b->flush = 1;
+ b->memory = 1;
+
+ b->pos = buf->pos; /* chunk data starts where the parser stopped */
+ b->tag = u->output.tag;
+
+ if (buf->last - buf->pos >= ctx->size) { /* the rest of the chunk is in this buffer */
+ buf->pos += ctx->size;
+ b->last = buf->pos;
+ ctx->size = 0;
+
+ } else { /* chunk continues in data not yet received */
+ ctx->size -= buf->last - buf->pos;
+ buf->pos = buf->last;
+ b->last = buf->last;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy out buf %p %z",
+ b->pos, b->last - b->pos);
+
+ continue;
+ }
+
+ if (rc == NGX_DONE) {
+
+ /* a whole response has been parsed successfully */
+
+ u->keepalive = 1;
+ u->length = 0; /* the non-buffered reader checks u->length == 0 to finish */
+
+ break;
+ }
+
+ if (rc == NGX_AGAIN) { /* need more data */
+ break;
+ }
+
+ /* invalid response; note the parser has already logged the same message */
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+ }
+
+ /* provide continuous buffer for subrequests in memory */
+
+ if (r->subrequest_in_memory) {
+
+ cl = u->out_bufs;
+
+ if (cl) {
+ buf->pos = cl->buf->pos;
+ }
+
+ buf->last = buf->pos;
+
+ for (cl = u->out_bufs; cl; cl = cl->next) { /* move each chunk's data down so it follows the previous one */
+ ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy in memory %p-%p %uz",
+ cl->buf->pos, cl->buf->last, ngx_buf_size(cl->buf));
+
+ if (buf->last == cl->buf->pos) { /* already contiguous: just extend */
+ buf->last = cl->buf->last;
+ continue;
+ }
+
+ buf->last = ngx_movemem(buf->last, cl->buf->pos,
+ cl->buf->last - cl->buf->pos);
+
+ cl->buf->pos = buf->last - (cl->buf->last - cl->buf->pos); /* re-point the chain buf at the moved data */
+ cl->buf->last = buf->last;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
static void
ngx_http_proxy_abort_request(ngx_http_request_t *r)
{
@@ -1704,6 +2460,8 @@ ngx_http_proxy_create_loc_conf(ngx_conf_
conf->redirect = NGX_CONF_UNSET;
conf->upstream.change_buffering = 1;
+ conf->http_version = NGX_CONF_UNSET_UINT;
+
conf->headers_hash_max_size = NGX_CONF_UNSET_UINT;
conf->headers_hash_bucket_size = NGX_CONF_UNSET_UINT;
@@ -2005,6 +2763,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t
}
#endif
+ ngx_conf_merge_uint_value(conf->http_version, prev->http_version,
+ NGX_HTTP_VERSION_10);
+
ngx_conf_merge_uint_value(conf->headers_hash_max_size,
prev->headers_hash_max_size, 512);
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -72,6 +72,8 @@ static void ngx_http_upstream_finalize_r
static ngx_int_t ngx_http_upstream_process_header_line(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t ngx_http_upstream_process_content_length(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_process_set_cookie(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t
@@ -89,6 +91,9 @@ static ngx_int_t ngx_http_upstream_proce
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_process_charset(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t
+ ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_header_line(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t
@@ -96,8 +101,6 @@ static ngx_int_t
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_content_type(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
-static ngx_int_t ngx_http_upstream_copy_content_length(ngx_http_request_t *r,
- ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_last_modified(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_rewrite_location(ngx_http_request_t *r,
@@ -149,9 +152,9 @@ ngx_http_upstream_header_t ngx_http_ups
ngx_http_upstream_copy_content_type, 0, 1 },
{ ngx_string("Content-Length"),
- ngx_http_upstream_process_header_line,
+ ngx_http_upstream_process_content_length,
offsetof(ngx_http_upstream_headers_in_t, content_length),
- ngx_http_upstream_copy_content_length, 0, 0 },
+ ngx_http_upstream_ignore_header_line, 0, 0 },
{ ngx_string("Date"),
ngx_http_upstream_process_header_line,
@@ -247,6 +250,10 @@ ngx_http_upstream_header_t ngx_http_ups
ngx_http_upstream_process_charset, 0,
ngx_http_upstream_copy_header_line, 0, 0 },
+ { ngx_string("Transfer-Encoding"),
+ ngx_http_upstream_process_transfer_encoding, 0,
+ ngx_http_upstream_ignore_header_line, 0, 0 },
+
#if (NGX_HTTP_GZIP)
{ ngx_string("Content-Encoding"),
ngx_http_upstream_process_header_line,
@@ -396,6 +403,8 @@ ngx_http_upstream_create(ngx_http_reques
r->cache = NULL;
#endif
+ u->headers_in.content_length_n = -1;
+
return NGX_OK;
}
@@ -798,6 +807,7 @@ ngx_http_upstream_cache_send(ngx_http_re
u->buffer.pos += c->header_start;
ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+ u->headers_in.content_length_n = -1;
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
@@ -1280,7 +1290,10 @@ ngx_http_upstream_reinit(ngx_http_reques
return NGX_ERROR;
}
+ u->keepalive = 0;
+
ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+ u->headers_in.content_length_n = -1;
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
@@ -1922,14 +1935,9 @@ ngx_http_upstream_process_headers(ngx_ht
r->headers_out.status = u->headers_in.status_n;
r->headers_out.status_line = u->headers_in.status_line;
- u->headers_in.content_length_n = r->headers_out.content_length_n;
-
- if (r->headers_out.content_length_n != -1) {
- u->length = (size_t) r->headers_out.content_length_n;
-
- } else {
- u->length = NGX_MAX_SIZE_T_VALUE;
- }
+ r->headers_out.content_length_n = u->headers_in.content_length_n;
+
+ u->length = u->headers_in.content_length_n;
return NGX_OK;
}
@@ -1993,6 +2001,11 @@ ngx_http_upstream_process_body_in_memory
}
}
+ if (u->length == 0) {
+ ngx_http_upstream_finalize_request(r, u, 0);
+ return;
+ }
+
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
@@ -2294,6 +2307,15 @@ ngx_http_upstream_send_response(ngx_http
u->read_event_handler = ngx_http_upstream_process_upstream;
r->write_event_handler = ngx_http_upstream_process_downstream;
+ p->length = -1;
+
+ if (u->input_filter_init
+ && u->input_filter_init(p->input_ctx) != NGX_OK)
+ {
+ ngx_http_upstream_finalize_request(r, u, 0);
+ return;
+ }
+
ngx_http_upstream_process_upstream(r, u);
}
@@ -2386,6 +2408,10 @@ ngx_http_upstream_process_non_buffered_r
if (u->busy_bufs == NULL) {
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http upstream non buffered length:%O",
+ u->length);
+
if (u->length == 0
|| upstream->read->eof
|| upstream->read->error)
@@ -2401,10 +2427,6 @@ ngx_http_upstream_process_non_buffered_r
size = b->end - b->last;
- if (size > u->length) {
- size = u->length;
- }
-
if (size && upstream->read->ready) {
n = upstream->recv(upstream, b->last, size);
@@ -2501,12 +2523,16 @@ ngx_http_upstream_non_buffered_filter(vo
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
- if (u->length == NGX_MAX_SIZE_T_VALUE) {
+ if (u->length == -1) {
return NGX_OK;
}
u->length -= bytes;
+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}
@@ -3057,6 +3083,21 @@ ngx_http_upstream_ignore_header_line(ngx
static ngx_int_t
+ngx_http_upstream_process_content_length(ngx_http_request_t *r, /* header handler registered for upstream "Content-Length" */
+ ngx_table_elt_t *h, ngx_uint_t offset) /* offset is unused here */
+{
+ ngx_http_upstream_t *u;
+
+ u = r->upstream;
+
+ u->headers_in.content_length = h;
+ u->headers_in.content_length_n = ngx_atoof(h->value.data, h->value.len); /* NOTE(review): the parse result is not validated here — confirm how an unparsable value is handled */
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_upstream_process_set_cookie(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
@@ -3318,6 +3359,23 @@ ngx_http_upstream_process_charset(ngx_ht
static ngx_int_t
+ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r, /* header handler registered for upstream "Transfer-Encoding" */
+ ngx_table_elt_t *h, ngx_uint_t offset) /* offset is unused here */
+{
+ r->upstream->headers_in.transfer_encoding = h;
+
+ if (ngx_strlcasestrn(h->value.data, h->value.data + h->value.len, /* presumably a case-insensitive substring search — confirm */
+ (u_char *) "chunked", 7 - 1) /* NOTE(review): 7 - 1 is presumably strlen("chunked") - 1, as ngx_strlcasestrn expects — confirm */
+ != NULL)
+ {
+ r->upstream->headers_in.chunked = 1; /* read by the proxy module to select the chunked filters */
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_upstream_copy_header_line(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
@@ -3425,26 +3483,6 @@ ngx_http_upstream_copy_content_type(ngx_
static ngx_int_t
-ngx_http_upstream_copy_content_length(ngx_http_request_t *r, ngx_table_elt_t *h,
- ngx_uint_t offset)
-{
- ngx_table_elt_t *ho;
-
- ho = ngx_list_push(&r->headers_out.headers);
- if (ho == NULL) {
- return NGX_ERROR;
- }
-
- *ho = *h;
-
- r->headers_out.content_length = ho;
- r->headers_out.content_length_n = ngx_atoof(h->value.data, h->value.len);
-
- return NGX_OK;
-}
-
-
-static ngx_int_t
ngx_http_upstream_copy_last_modified(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h
--- a/src/http/ngx_http_upstream.h
+++ b/src/http/ngx_http_upstream.h
@@ -216,6 +216,7 @@ typedef struct {
ngx_table_elt_t *location;
ngx_table_elt_t *accept_ranges;
ngx_table_elt_t *www_authenticate;
+ ngx_table_elt_t *transfer_encoding;
#if (NGX_HTTP_GZIP)
ngx_table_elt_t *content_encoding;
@@ -224,6 +225,8 @@ typedef struct {
off_t content_length_n;
ngx_array_t cache_control;
+
+ unsigned chunked:1;
} ngx_http_upstream_headers_in_t;
@@ -266,7 +269,7 @@ struct ngx_http_upstream_s {
ngx_http_upstream_resolved_t *resolved;
ngx_buf_t buffer;
- size_t length;
+ off_t length;
ngx_chain_t *out_bufs;
ngx_chain_t *busy_bufs;
@@ -307,6 +310,7 @@ struct ngx_http_upstream_s {
#endif
unsigned buffering:1;
+ unsigned keepalive:1;
unsigned request_sent:1;
unsigned header_sent:1;
More information about the nginx-devel
mailing list