[njs] Fetch: added keepalive support for ngx.fetch() API.
noreply at nginx.com
noreply at nginx.com
Tue Sep 16 23:08:02 UTC 2025
details: https://github.com/nginx/njs/commit/7b3c8a66879dc0d5250bfeb53b2fbc85d15da637
branches: master
commit: 7b3c8a66879dc0d5250bfeb53b2fbc85d15da637
user: Dmitry Volyntsev <xeioex at nginx.com>
date: Wed, 3 Sep 2025 20:27:16 -0700
description:
Fetch: added keepalive support for ngx.fetch() API.
This closes #957 feature request on Github.
---
nginx/ngx_http_js_module.c | 28 +++
nginx/ngx_js.c | 17 ++
nginx/ngx_js.h | 10 +-
nginx/ngx_js_fetch.c | 30 ++-
nginx/ngx_js_http.c | 390 ++++++++++++++++++++++++++++++++----
nginx/ngx_js_http.h | 12 +-
nginx/ngx_qjs_fetch.c | 30 ++-
nginx/ngx_stream_js_module.c | 28 +++
nginx/t/js_fetch_https_keepalive.t | 345 +++++++++++++++++++++++++++++++
nginx/t/js_fetch_keepalive.t | 289 ++++++++++++++++++++++++++
nginx/t/stream_js_fetch_keepalive.t | 200 ++++++++++++++++++
11 files changed, 1309 insertions(+), 70 deletions(-)
diff --git a/nginx/ngx_http_js_module.c b/nginx/ngx_http_js_module.c
index 736635cd..f9d9721a 100644
--- a/nginx/ngx_http_js_module.c
+++ b/nginx/ngx_http_js_module.c
@@ -565,6 +565,34 @@ static ngx_command_t ngx_http_js_commands[] = {
0,
NULL },
+ { ngx_string("js_fetch_keepalive"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_js_loc_conf_t, fetch_keepalive),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_requests"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_requests),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_time"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_time),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_timeout"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_timeout),
+ NULL },
+
ngx_null_command
};
diff --git a/nginx/ngx_js.c b/nginx/ngx_js.c
index c385e16e..efef3467 100644
--- a/nginx/ngx_js.c
+++ b/nginx/ngx_js.c
@@ -10,6 +10,7 @@
#include <ngx_core.h>
#include <math.h>
#include "ngx_js.h"
+#include "ngx_js_http.h"
typedef struct {
@@ -3986,6 +3987,11 @@ ngx_js_create_conf(ngx_conf_t *cf, size_t size)
conf->max_response_body_size = NGX_CONF_UNSET_SIZE;
conf->timeout = NGX_CONF_UNSET_MSEC;
+ conf->fetch_keepalive = NGX_CONF_UNSET_UINT;
+ conf->fetch_keepalive_requests = NGX_CONF_UNSET_UINT;
+ conf->fetch_keepalive_time = NGX_CONF_UNSET_MSEC;
+ conf->fetch_keepalive_timeout = NGX_CONF_UNSET_MSEC;
+
return conf;
}
@@ -4097,6 +4103,17 @@ ngx_js_merge_conf(ngx_conf_t *cf, void *parent, void *child,
ngx_conf_merge_size_value(conf->max_response_body_size,
prev->max_response_body_size, 1048576);
+ ngx_conf_merge_uint_value(conf->fetch_keepalive, prev->fetch_keepalive, 0);
+ ngx_conf_merge_uint_value(conf->fetch_keepalive_requests,
+ prev->fetch_keepalive_requests, 1000);
+ ngx_conf_merge_msec_value(conf->fetch_keepalive_time,
+ prev->fetch_keepalive_time, 3600000);
+ ngx_conf_merge_msec_value(conf->fetch_keepalive_timeout,
+ prev->fetch_keepalive_timeout, 60000);
+
+ ngx_queue_init(&conf->fetch_keepalive_cache);
+ ngx_queue_init(&conf->fetch_keepalive_free);
+
if (ngx_js_merge_vm(cf, (ngx_js_loc_conf_t *) conf,
(ngx_js_loc_conf_t *) prev,
init_vm)
diff --git a/nginx/ngx_js.h b/nginx/ngx_js.h
index 257227e5..af19e007 100644
--- a/nginx/ngx_js.h
+++ b/nginx/ngx_js.h
@@ -13,6 +13,7 @@
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
+#include <ngx_event_connect.h>
#include <njs.h>
#include <njs_rbtree.h>
#include <njs_arr.h>
@@ -133,7 +134,14 @@ typedef struct {
\
size_t buffer_size; \
size_t max_response_body_size; \
- ngx_msec_t timeout
+ ngx_msec_t timeout; \
+ \
+ ngx_uint_t fetch_keepalive; \
+ ngx_uint_t fetch_keepalive_requests; \
+ ngx_msec_t fetch_keepalive_time; \
+ ngx_msec_t fetch_keepalive_timeout; \
+ ngx_queue_t fetch_keepalive_cache; \
+ ngx_queue_t fetch_keepalive_free
#if defined(NGX_HTTP_SSL) || defined(NGX_STREAM_SSL)
diff --git a/nginx/ngx_js_fetch.c b/nginx/ngx_js_fetch.c
index 1c74bde6..ac1c1a27 100644
--- a/nginx/ngx_js_fetch.c
+++ b/nginx/ngx_js_fetch.c
@@ -7,10 +7,6 @@
*/
-#include <ngx_config.h>
-#include <ngx_core.h>
-#include <ngx_event.h>
-#include <ngx_event_connect.h>
#include "ngx_js.h"
#include "ngx_js_http.h"
@@ -550,6 +546,13 @@ ngx_js_ext_fetch(njs_vm_t *vm, njs_value_t *args, njs_uint_t nargs,
goto fail;
}
+ if (u.host.len >= NGX_JS_HOST_MAX_LEN) {
+ njs_vm_error(vm, "Host name too long");
+ goto fail;
+ }
+
+ http->host = u.host;
+ http->port = u.port;
http->response.url = request.url;
http->buffer_size = http->conf->buffer_size;
http->max_response_body_size = http->conf->max_response_body_size;
@@ -681,18 +684,22 @@ ngx_js_ext_fetch(njs_vm_t *vm, njs_value_t *args, njs_uint_t nargs,
continue;
}
+ if (h[i].key.len == 10
+ && ngx_strncasecmp(h[i].key.data, (u_char *) "Connection", 10)
+ == 0)
+ {
+ continue;
+ }
+
njs_chb_append(&http->chain, h[i].key.data, h[i].key.len);
njs_chb_append_literal(&http->chain, ": ");
njs_chb_append(&http->chain, h[i].value.data, h[i].value.len);
njs_chb_append_literal(&http->chain, CRLF);
}
- njs_chb_append_literal(&http->chain, "Connection: close" CRLF);
-
-#if (NGX_SSL)
- http->tls_name.data = u.host.data;
- http->tls_name.len = u.host.len;
-#endif
+ if (!http->keepalive) {
+ njs_chb_append_literal(&http->chain, "Connection: close" CRLF);
+ }
if (request.body.len != 0) {
njs_chb_sprintf(&http->chain, 32, "Content-Length: %uz" CRLF CRLF,
@@ -1154,7 +1161,8 @@ ngx_js_fetch_alloc(njs_vm_t *vm, ngx_pool_t *pool, ngx_log_t *log,
http->log = log;
http->conf = conf;
- http->http_parse.content_length_n = -1;
+ http->content_length_n = -1;
+ http->keepalive = (conf->fetch_keepalive > 0);
http->append_headers = ngx_js_fetch_append_headers;
http->ready_handler = ngx_js_fetch_process_done;
diff --git a/nginx/ngx_js_http.c b/nginx/ngx_js_http.c
index 3f52868e..f07ccceb 100644
--- a/nginx/ngx_js_http.c
+++ b/nginx/ngx_js_http.c
@@ -7,19 +7,35 @@
*/
-#include <ngx_config.h>
-#include <ngx_core.h>
-#include <ngx_event.h>
-#include <ngx_event_connect.h>
#include "ngx_js.h"
#include "ngx_js_http.h"
+typedef struct {
+ ngx_js_loc_conf_t *conf;
+ ngx_queue_t queue;
+ ngx_connection_t *connection;
+
+ ngx_flag_t ssl;
+ size_t host_len;
+ u_char host[NGX_JS_HOST_MAX_LEN];
+ in_port_t port;
+} ngx_js_http_keepalive_cache_t;
+
+
+#define ngx_js_http_version(major, minor) ((major) * 1000 + (minor))
+
+
static void ngx_js_http_resolve_handler(ngx_resolver_ctx_t *ctx);
static void ngx_js_http_next(ngx_js_http_t *http);
static void ngx_js_http_write_handler(ngx_event_t *wev);
static void ngx_js_http_read_handler(ngx_event_t *rev);
static void ngx_js_http_dummy_handler(ngx_event_t *ev);
+static void ngx_js_http_keepalive_close_handler(ngx_event_t *ev);
+static void ngx_js_http_keepalive_dummy_handler(ngx_event_t *ev);
+
+static ngx_int_t ngx_js_http_get_keepalive_connection(ngx_js_http_t *http);
+static ngx_int_t ngx_js_http_free_keepalive_connection(ngx_js_http_t *http);
static ngx_int_t ngx_js_http_process_status_line(ngx_js_http_t *http);
static ngx_int_t ngx_js_http_process_headers(ngx_js_http_t *http);
@@ -177,12 +193,13 @@ failed:
static void
ngx_js_http_close_connection(ngx_connection_t *c)
{
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "js http close connection: %d", c->fd);
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "js http close connection: %p:%d", c, c->fd);
#if (NGX_SSL)
if (c->ssl) {
c->ssl->no_wait_shutdown = 1;
+ c->ssl->no_send_shutdown = 1;
if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
c->ssl->handler = ngx_js_http_close_connection;
@@ -193,6 +210,7 @@ ngx_js_http_close_connection(ngx_connection_t *c)
c->destroyed = 1;
+ ngx_destroy_pool(c->pool);
ngx_close_connection(c);
}
@@ -210,18 +228,32 @@ ngx_js_http_resolve_done(ngx_js_http_t *http)
void
ngx_js_http_close_peer(ngx_js_http_t *http)
{
- if (http->peer.connection != NULL) {
+ if (http->peer.connection == NULL) {
+ return;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, http->log, 0,
+ "js http close peer");
+
+ if (http->keepalive) {
+ if (ngx_js_http_free_keepalive_connection(http) != NGX_OK) {
+ ngx_js_http_close_connection(http->peer.connection);
+ }
+
+ } else {
ngx_js_http_close_connection(http->peer.connection);
- http->peer.connection = NULL;
}
+
+ http->peer.connection = NULL;
}
void
ngx_js_http_connect(ngx_js_http_t *http)
{
- ngx_int_t rc;
- ngx_addr_t *addr;
+ ngx_int_t rc;
+ ngx_addr_t *addr;
+ ngx_connection_t *c;
addr = &http->addrs[http->naddr];
@@ -235,38 +267,51 @@ ngx_js_http_connect(ngx_js_http_t *http)
http->peer.log = http->log;
http->peer.log_error = NGX_ERROR_ERR;
- rc = ngx_event_connect_peer(&http->peer);
+ rc = ngx_js_http_get_keepalive_connection(http);
+ if (rc != NGX_OK) {
+ rc = ngx_event_connect_peer(&http->peer);
+ if (rc == NGX_ERROR) {
+ ngx_js_http_error(http, "connect failed");
+ return;
+ }
- if (rc == NGX_ERROR) {
- ngx_js_http_error(http, "connect failed");
- return;
+ if (rc == NGX_BUSY || rc == NGX_DECLINED) {
+ ngx_js_http_next(http);
+ return;
+ }
}
- if (rc == NGX_BUSY || rc == NGX_DECLINED) {
- ngx_js_http_next(http);
- return;
- }
+ c = http->peer.connection;
- http->peer.connection->data = http;
- http->peer.connection->pool = http->pool;
+ c->requests++;
+ c->data = http;
- http->peer.connection->write->handler = ngx_js_http_write_handler;
- http->peer.connection->read->handler = ngx_js_http_read_handler;
+ if (c->pool == NULL) {
+ /* we need separate pool here to be able to cache SSL connections */
+ c->pool = ngx_create_pool(128, http->log);
+ if (c->pool == NULL) {
+ ngx_js_http_error(http, "create pool failed");
+ return;
+ }
+ }
+
+ c->write->handler = ngx_js_http_write_handler;
+ c->read->handler = ngx_js_http_read_handler;
http->process = ngx_js_http_process_status_line;
- ngx_add_timer(http->peer.connection->read, http->conf->timeout);
- ngx_add_timer(http->peer.connection->write, http->conf->timeout);
+ ngx_add_timer(c->read, http->conf->timeout);
+ ngx_add_timer(c->write, http->conf->timeout);
#if (NGX_SSL)
- if (http->ssl != NULL && http->peer.connection->ssl == NULL) {
+ if (http->ssl != NULL && c->ssl == NULL) {
ngx_js_http_ssl_init_connection(http);
return;
}
#endif
if (rc == NGX_OK) {
- ngx_js_http_write_handler(http->peer.connection->write);
+ ngx_js_http_write_handler(c->write);
}
}
@@ -346,10 +391,10 @@ ngx_js_http_ssl_handshake(ngx_js_http_t *http)
goto failed;
}
- if (ngx_ssl_check_host(c, &http->tls_name) != NGX_OK) {
+ if (ngx_ssl_check_host(c, &http->host) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"js http SSL certificate does not match \"%V\"",
- &http->tls_name);
+ &http->host);
goto failed;
}
}
@@ -380,7 +425,7 @@ ngx_js_http_ssl_name(ngx_js_http_t *http)
u_char *p;
/* as per RFC 6066, literal IPv4 and IPv6 addresses are not permitted */
- ngx_str_t *name = &http->tls_name;
+ ngx_str_t *name = &http->host;
if (name->len == 0 || *name->data == '[') {
goto done;
@@ -571,6 +616,10 @@ ngx_js_http_read_handler(ngx_event_t *rev)
return;
}
+ if (rc == NGX_DONE) {
+ break;
+ }
+
continue;
}
@@ -631,6 +680,10 @@ ngx_js_http_process_status_line(ngx_js_http_t *http)
http->response.status_text.len = hp->status_text_end - hp->status_text;
http->process = ngx_js_http_process_headers;
+ if (http->keepalive) {
+ http->keepalive = (hp->http_version >= ngx_js_http_version(1, 1));
+ }
+
return http->process(http);
}
@@ -694,21 +747,33 @@ ngx_js_http_process_headers(ngx_js_http_t *http)
&& ngx_strncasecmp(hp->header_start, (u_char *) "chunked",
vlen) == 0)
{
- hp->chunked = 1;
+ http->chunked = 1;
+ }
+
+ if (len == (sizeof("Connection") - 1)
+ && ngx_strncasecmp(hp->header_name_start,
+ (u_char *) "Connection", len) == 0)
+ {
+ if (vlen == (sizeof("close") - 1)
+ && ngx_strncasecmp(hp->header_start, (u_char *) "close",
+ vlen) == 0)
+ {
+ http->keepalive = 0;
+ }
}
if (len == (sizeof("Content-Length") - 1)
&& ngx_strncasecmp(hp->header_name_start,
(u_char *) "Content-Length", len) == 0)
{
- hp->content_length_n = ngx_atoof(hp->header_start, vlen);
- if (hp->content_length_n == NGX_ERROR) {
+ http->content_length_n = ngx_atoof(hp->header_start, vlen);
+ if (http->content_length_n == NGX_ERROR) {
ngx_js_http_error(http, "invalid http content length");
return NGX_ERROR;
}
if (!http->header_only
- && hp->content_length_n
+ && http->content_length_n
> (off_t) http->max_response_body_size)
{
ngx_js_http_error(http,
@@ -762,22 +827,22 @@ ngx_js_http_process_body(ngx_js_http_t *http)
}
if (!http->header_only
- && http->http_parse.chunked
- && http->http_parse.content_length_n == -1)
+ && http->chunked
+ && http->content_length_n == -1)
{
ngx_js_http_error(http, "invalid http chunked response");
return NGX_ERROR;
}
if (http->header_only
- || http->http_parse.content_length_n == -1
- || size == http->http_parse.content_length_n)
+ || http->content_length_n == -1
+ || size == http->content_length_n)
{
http->ready_handler(http);
return NGX_DONE;
}
- if (size < http->http_parse.content_length_n) {
+ if (size < http->content_length_n) {
return NGX_AGAIN;
}
@@ -787,7 +852,7 @@ ngx_js_http_process_body(ngx_js_http_t *http)
b = http->buffer;
- if (http->http_parse.chunked) {
+ if (http->chunked) {
rc = ngx_js_http_parse_chunked(&http->http_chunk_parse, b,
&http->response.chain);
if (rc == NGX_ERROR) {
@@ -798,7 +863,7 @@ ngx_js_http_process_body(ngx_js_http_t *http)
size = njs_chb_size(&http->response.chain);
if (rc == NGX_OK) {
- http->http_parse.content_length_n = size;
+ http->content_length_n = size;
}
if (size > http->max_response_body_size * 10) {
@@ -814,11 +879,11 @@ ngx_js_http_process_body(ngx_js_http_t *http)
if (http->header_only) {
need = 0;
- } else if (http->http_parse.content_length_n == -1) {
+ } else if (http->content_length_n == -1) {
need = http->max_response_body_size - size;
} else {
- need = http->http_parse.content_length_n - size;
+ need = http->content_length_n - size;
}
chsize = ngx_min(need, b->last - b->pos);
@@ -1074,7 +1139,7 @@ done:
b->pos = p + 1;
hp->state = sw_start;
- hp->http_version = hp->http_major * 1000 + hp->http_minor;
+ hp->http_version = ngx_js_http_version(hp->http_major, hp->http_minor);
return NGX_OK;
}
@@ -1549,3 +1614,242 @@ ngx_js_check_header_name(u_char *name, size_t len)
return NGX_OK;
}
+
+
+static ngx_int_t
+ngx_js_http_get_keepalive_connection(ngx_js_http_t *http)
+{
+ ngx_str_t *host;
+ ngx_queue_t *q;
+ ngx_connection_t *c;
+ ngx_js_loc_conf_t *conf;
+ ngx_js_http_keepalive_cache_t *cache;
+
+ if (!http->keepalive) {
+ return NGX_DECLINED;
+ }
+
+ conf = http->conf;
+
+ host = &http->host;
+
+ for (q = ngx_queue_head(&conf->fetch_keepalive_cache);
+ q != ngx_queue_sentinel(&conf->fetch_keepalive_cache);
+ q = ngx_queue_next(q))
+ {
+ cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue);
+
+ if (host->len != cache->host_len) {
+ continue;
+ }
+
+ if ((http->ssl != NULL) != (cache->ssl != 0)) {
+ continue;
+ }
+
+ if (ngx_strncasecmp(host->data, cache->host, host->len) != 0) {
+ continue;
+ }
+
+ if (http->port != cache->port) {
+ continue;
+ }
+
+ c = cache->connection;
+ ngx_queue_remove(q);
+ ngx_queue_insert_head(&conf->fetch_keepalive_free, q);
+
+ goto found;
+ }
+
+ return NGX_DECLINED;
+
+found:
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, http->log, 0,
+ "js http keepalive using cached connection: %p:%d",
+ c, c->fd);
+
+ c->idle = 0;
+ c->sent = 0;
+ c->data = NULL;
+ c->log = http->log;
+ c->pool->log = http->log;
+ c->read->log = http->log;
+ c->write->log = http->log;
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ http->peer.cached = 1;
+ http->peer.connection = c;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_js_http_free_keepalive_connection(ngx_js_http_t *http)
+{
+ ngx_uint_t i;
+ ngx_queue_t *q;
+ ngx_connection_t *c;
+ ngx_js_loc_conf_t *conf;
+ ngx_js_http_keepalive_cache_t *cache;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, http->log, 0,
+ "js http free keepalive connection");
+
+ c = http->peer.connection;
+
+ if (c == NULL
+ || c->read->eof
+ || c->read->error
+ || c->read->timedout
+ || c->write->error
+ || c->write->timedout)
+ {
+ return NGX_ERROR;
+ }
+
+ if (c->requests >= http->conf->fetch_keepalive_requests) {
+ return NGX_DONE;
+ }
+
+ if (ngx_current_msec - c->start_time > http->conf->fetch_keepalive_time) {
+ return NGX_DONE;
+ }
+
+ if (ngx_terminate || ngx_exiting) {
+ return NGX_DONE;
+ }
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, http->log, 0,
+ "js http free keepalive connection, "
+ "saving connection: %p:%d", c, c->fd);
+
+ conf = http->conf;
+
+ if (ngx_queue_empty(&conf->fetch_keepalive_cache)
+ && ngx_queue_empty(&conf->fetch_keepalive_free))
+ {
+ cache = ngx_pcalloc(ngx_cycle->pool,
+ sizeof(ngx_js_http_keepalive_cache_t) * conf->fetch_keepalive);
+ if (cache == NULL) {
+ return NGX_ERROR;
+ }
+
+ for (i = 0; i < conf->fetch_keepalive; i++) {
+ ngx_queue_insert_head(&conf->fetch_keepalive_free,
+ &cache[i].queue);
+ cache[i].conf = conf;
+ }
+ }
+
+ if (ngx_queue_empty(&conf->fetch_keepalive_free)) {
+ /* evict from cache */
+ q = ngx_queue_last(&conf->fetch_keepalive_cache);
+ ngx_queue_remove(q);
+
+ cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue);
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, http->log, 0,
+ "js http free keepalive connection, evicting: %d",
+ cache->connection->fd);
+ ngx_js_http_close_connection(cache->connection);
+
+ } else {
+ q = ngx_queue_head(&conf->fetch_keepalive_free);
+ ngx_queue_remove(q);
+ cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue);
+ }
+
+ ngx_queue_insert_head(&conf->fetch_keepalive_cache, q);
+
+ c = http->peer.connection;
+ http->peer.connection = NULL;
+
+ cache->connection = c;
+
+ cache->ssl = (http->ssl != NULL);
+ ngx_memcpy(cache->host, http->host.data, http->host.len);
+ cache->host_len = http->host.len;
+ cache->port = http->port;
+
+ c->read->delayed = 0;
+ ngx_add_timer(c->read, conf->fetch_keepalive_timeout);
+
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+ c->data = cache;
+ c->write->handler = ngx_js_http_keepalive_dummy_handler;
+ c->read->handler = ngx_js_http_keepalive_close_handler;
+
+ c->idle = 1;
+ c->log = ngx_cycle->log;
+ c->pool->log = ngx_cycle->log;
+ c->read->log = ngx_cycle->log;
+ c->write->log = ngx_cycle->log;
+
+ if (c->read->ready) {
+ ngx_js_http_keepalive_close_handler(c->read);
+ }
+
+ return NGX_OK;
+}
+
+
+static void
+ngx_js_http_keepalive_dummy_handler(ngx_event_t *ev)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "js http keepalive dummy handler");
+}
+
+
+static void
+ngx_js_http_keepalive_close_handler(ngx_event_t *ev)
+{
+ ssize_t n;
+ ngx_connection_t *c;
+ ngx_js_loc_conf_t *conf;
+ ngx_js_http_keepalive_cache_t *cache;
+ u_char buf[1];
+
+ c = ev->data;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+ "js http keepalive close handler: %d", c->fd);
+
+ if (c->close || ev->timedout) {
+ goto close;
+ }
+
+ n = recv(c->fd, buf, 1, MSG_PEEK);
+
+ if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
+ ev->ready = 0;
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ goto close;
+ }
+
+ return;
+ }
+
+close:
+
+ cache = c->data;
+ conf = cache->conf;
+
+ ngx_js_http_close_connection(c);
+
+ ngx_queue_remove(&cache->queue);
+ ngx_queue_insert_head(&conf->fetch_keepalive_free, &cache->queue);
+}
diff --git a/nginx/ngx_js_http.h b/nginx/ngx_js_http.h
index 7adcc130..acaa5802 100644
--- a/nginx/ngx_js_http.h
+++ b/nginx/ngx_js_http.h
@@ -11,8 +11,10 @@
#define _NGX_JS_HTTP_H_INCLUDED_
-typedef struct ngx_js_http_s ngx_js_http_t;
+#define NGX_JS_HOST_MAX_LEN 256
+
+typedef struct ngx_js_http_s ngx_js_http_t;
typedef struct {
ngx_uint_t state;
@@ -23,8 +25,6 @@ typedef struct {
u_char *status_text;
u_char *status_text_end;
ngx_uint_t count;
- ngx_flag_t chunked;
- off_t content_length_n;
u_char *header_name_start;
u_char *header_name_end;
@@ -114,6 +114,7 @@ struct ngx_js_http_s {
ngx_addr_t *addrs;
ngx_uint_t naddrs;
ngx_uint_t naddr;
+ ngx_str_t host;
in_port_t port;
ngx_peer_connection_t peer;
@@ -124,8 +125,11 @@ struct ngx_js_http_s {
unsigned header_only;
+ ngx_flag_t chunked;
+ ngx_flag_t keepalive;
+ off_t content_length_n;
+
#if (NGX_SSL)
- ngx_str_t tls_name;
ngx_ssl_t *ssl;
njs_bool_t ssl_verify;
#endif
diff --git a/nginx/ngx_qjs_fetch.c b/nginx/ngx_qjs_fetch.c
index f855e099..a211e0fe 100644
--- a/nginx/ngx_qjs_fetch.c
+++ b/nginx/ngx_qjs_fetch.c
@@ -5,10 +5,6 @@
*/
-#include <ngx_config.h>
-#include <ngx_core.h>
-#include <ngx_event.h>
-#include <ngx_event_connect.h>
#include "ngx_js.h"
#include "ngx_js_http.h"
@@ -270,7 +266,14 @@ ngx_qjs_ext_fetch(JSContext *cx, JSValueConst this_val, int argc,
goto fail;
}
+ if (u.host.len >= NGX_JS_HOST_MAX_LEN) {
+ JS_ThrowInternalError(cx, "Host name too long");
+ goto fail;
+ }
+
http = &fetch->http;
+ http->host = u.host;
+ http->port = u.port;
http->response.url = request.url;
http->buffer_size = ngx_qjs_external_buffer_size(cx, external);
http->max_response_body_size =
@@ -418,18 +421,22 @@ ngx_qjs_ext_fetch(JSContext *cx, JSValueConst this_val, int argc,
continue;
}
+ if (h[i].key.len == 10
+ && ngx_strncasecmp(h[i].key.data, (u_char *) "Connection", 10)
+ == 0)
+ {
+ continue;
+ }
+
njs_chb_append(&http->chain, h[i].key.data, h[i].key.len);
njs_chb_append_literal(&http->chain, ": ");
njs_chb_append(&http->chain, h[i].value.data, h[i].value.len);
njs_chb_append_literal(&http->chain, CRLF);
}
- njs_chb_append_literal(&http->chain, "Connection: close" CRLF);
-
-#if (NGX_SSL)
- http->tls_name.data = u.host.data;
- http->tls_name.len = u.host.len;
-#endif
+ if (!http->keepalive) {
+ njs_chb_append_literal(&http->chain, "Connection: close" CRLF);
+ }
if (request.body.len != 0) {
njs_chb_sprintf(&http->chain, 32, "Content-Length: %uz" CRLF CRLF,
@@ -1213,7 +1220,8 @@ ngx_qjs_fetch_alloc(JSContext *cx, ngx_pool_t *pool, ngx_log_t *log,
http->conf = conf;
- http->http_parse.content_length_n = -1;
+ http->content_length_n = -1;
+ http->keepalive = (conf->fetch_keepalive > 0);
ngx_qjs_arg(http->response.header_value) = JS_UNDEFINED;
diff --git a/nginx/ngx_stream_js_module.c b/nginx/ngx_stream_js_module.c
index 1257abb5..ab4e787d 100644
--- a/nginx/ngx_stream_js_module.c
+++ b/nginx/ngx_stream_js_module.c
@@ -351,6 +351,34 @@ static ngx_command_t ngx_stream_js_commands[] = {
offsetof(ngx_stream_js_srv_conf_t, timeout),
NULL },
+ { ngx_string("js_fetch_keepalive"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_requests"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_requests),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_time"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_time),
+ NULL },
+
+ { ngx_string("js_fetch_keepalive_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_timeout),
+ NULL },
+
#if (NGX_STREAM_SSL)
{ ngx_string("js_fetch_ciphers"),
diff --git a/nginx/t/js_fetch_https_keepalive.t b/nginx/t/js_fetch_https_keepalive.t
new file mode 100644
index 00000000..86f72d0a
--- /dev/null
+++ b/nginx/t/js_fetch_https_keepalive.t
@@ -0,0 +1,345 @@
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) F5, Inc.
+
+# Tests for http njs module, fetch method, https keepalive support.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http http_ssl rewrite/)
+ ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+http {
+ %%TEST_GLOBALS_HTTP%%
+
+ js_import test.js;
+
+ server {
+ listen 127.0.0.1:8080;
+ server_name localhost;
+
+ resolver 127.0.0.1:%%PORT_8981_UDP%%;
+ resolver_timeout 1s;
+
+ location /njs {
+ js_content test.njs;
+ }
+
+ location /https {
+ js_content test.https;
+
+ js_fetch_keepalive 4;
+ js_fetch_ciphers HIGH:!aNull:!MD5;
+ js_fetch_protocols TLSv1.1 TLSv1.2;
+ js_fetch_trusted_certificate myca.crt;
+ }
+
+ location /sni_isolation {
+ js_content test.sni_isolation;
+
+ js_fetch_keepalive 4;
+ js_fetch_ciphers HIGH:!aNull:!MD5;
+ js_fetch_protocols TLSv1.1 TLSv1.2;
+ js_fetch_trusted_certificate myca.crt;
+ }
+
+ location /plain_vs_https_isolation {
+ js_content test.plain_vs_https_isolation;
+
+ js_fetch_keepalive 4;
+ js_fetch_ciphers HIGH:!aNull:!MD5;
+ js_fetch_protocols TLSv1.1 TLSv1.2;
+ js_fetch_trusted_certificate myca.crt;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:8081 ssl;
+ server_name ka.example.com;
+
+ keepalive_requests 100;
+
+ ssl_certificate ka.example.com.chained.crt;
+ ssl_certificate_key ka.example.com.key;
+
+ location /loc {
+ return 200 CONN:$connection_requests;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:8081 ssl;
+ server_name 1.example.com;
+
+ ssl_certificate 1.example.com.chained.crt;
+ ssl_certificate_key 1.example.com.key;
+
+ location /loc {
+ return 200 "You are at 1.example.com.";
+ }
+ }
+
+ server {
+ listen 127.0.0.1:8082;
+ server_name plain.example.com;
+
+ keepalive_requests 100;
+
+ location /loc {
+ return 200 PLAIN:$connection_requests;
+ }
+ }
+}
+
+EOF
+
+my $p1 = port(8081);
+my $p2 = port(8082);
+
+$t->write_file('test.js', <<EOF);
+ function test_njs(r) {
+ r.return(200, njs.version);
+ }
+
+ function https(r) {
+ var url = `https://\${r.args.domain}:$p1/loc`;
+ var opt = {};
+
+ if (r.args.verify != null && r.args.verify == "false") {
+ opt.verify = false;
+ }
+
+ ngx.fetch(url, opt)
+ .then(reply => reply.text())
+ .then(body => r.return(200, body))
+ .catch(e => r.return(501, e.message))
+ }
+
+ async function sni_isolation(r) {
+ try {
+ let resp = await ngx.fetch(`https://ka.example.com:$p1/loc`);
+ let body1 = await resp.text();
+
+ resp = await ngx.fetch(`https://1.example.com:$p1/loc`);
+ let body2 = await resp.text();
+
+ resp = await ngx.fetch(`https://ka.example.com:$p1/loc`);
+ let body3 = await resp.text();
+
+ r.return(200, `\${body1}|\${body2}|\${body3}`);
+
+ } catch (e) {
+ r.return(501, e.message);
+ }
+ }
+
+ async function plain_vs_https_isolation(r) {
+ try {
+ let resp = await ngx.fetch(`https://ka.example.com:$p1/loc`);
+ let body1 = await resp.text();
+
+ resp = await ngx.fetch(`http://plain.example.com:$p2/loc`);
+ let body2 = await resp.text();
+
+ resp = await ngx.fetch(`https://ka.example.com:$p1/loc`);
+ let body3 = await resp.text();
+
+ r.return(200, `\${body1}|\${body2}|\${body3}`);
+
+ } catch (e) {
+ r.return(501, e.message);
+ }
+ }
+
+ export default {njs: test_njs, https, sni_isolation,
+ plain_vs_https_isolation};
+EOF
+
+my $d = $t->testdir();
+
+$t->write_file('openssl.conf', <<EOF);
+[ req ]
+default_bits = 2048
+encrypt_key = no
+distinguished_name = req_distinguished_name
+x509_extensions = myca_extensions
+[ req_distinguished_name ]
+[ myca_extensions ]
+basicConstraints = critical,CA:TRUE
+EOF
+
+$t->write_file('myca.conf', <<EOF);
+[ ca ]
+default_ca = myca
+
+[ myca ]
+new_certs_dir = $d
+database = $d/certindex
+default_md = sha256
+policy = myca_policy
+serial = $d/certserial
+default_days = 1
+x509_extensions = myca_extensions
+
+[ myca_policy ]
+commonName = supplied
+
+[ myca_extensions ]
+basicConstraints = critical,CA:TRUE
+EOF
+
+system('openssl req -x509 -new '
+ . "-config $d/openssl.conf -subj /CN=myca/ "
+ . "-out $d/myca.crt -keyout $d/myca.key "
+ . ">>$d/openssl.out 2>&1") == 0
+ or die "Can't create self-signed certificate for CA: $!\n";
+
+foreach my $name ('intermediate', '1.example.com', 'ka.example.com') {
+ system("openssl req -new "
+ . "-config $d/openssl.conf -subj /CN=$name/ "
+ . "-out $d/$name.csr -keyout $d/$name.key "
+ . ">>$d/openssl.out 2>&1") == 0
+ or die "Can't create certificate signing req for $name: $!\n";
+}
+
+$t->write_file('certserial', '1000');
+$t->write_file('certindex', '');
+
+system("openssl ca -batch -config $d/myca.conf "
+ . "-keyfile $d/myca.key -cert $d/myca.crt "
+ . "-subj /CN=intermediate/ -in $d/intermediate.csr "
+ . "-out $d/intermediate.crt "
+ . ">>$d/openssl.out 2>&1") == 0
+ or die "Can't sign certificate for intermediate: $!\n";
+
+foreach my $name ('1.example.com', 'ka.example.com') {
+ system("openssl ca -batch -config $d/myca.conf "
+ . "-keyfile $d/intermediate.key -cert $d/intermediate.crt "
+ . "-subj /CN=$name/ -in $d/$name.csr -out $d/$name.crt "
+ . ">>$d/openssl.out 2>&1") == 0
+ or die "Can't sign certificate for $name $!\n";
+ $t->write_file("$name.chained.crt", $t->read_file("$name.crt")
+ . $t->read_file('intermediate.crt'));
+}
+
+$t->try_run('no njs.fetch');
+
+$t->plan(5);
+
+$t->run_daemon(\&dns_daemon, port(8981), $t);
+$t->waitforfile($t->testdir . '/' . port(8981));
+
+###############################################################################
+
+like(http_get('/https?domain=localhost'),
+ qr/connect failed/s, 'fetch https wrong CN certificate');
+like(http_get('/https?domain=ka.example.com'),
+ qr/CONN:1$/s, 'fetch https keepalive');
+like(http_get('/https?domain=ka.example.com'),
+ qr/CONN:2$/s, 'fetch https keepalive reused');
+like(http_get('/sni_isolation'),
+ qr/CONN:1\|You are at 1\.example\.com\.\|CONN:2$/s,
+ 'fetch https keepalive SNI isolation');
+like(http_get('/plain_vs_https_isolation'),
+ qr/CONN:1\|PLAIN:1\|CONN:2$/s,
+ 'fetch https->plain->https keepalive isolation');
+
+###############################################################################
+
+sub reply_handler {
+ my ($recv_data, $port, %extra) = @_;
+
+ my (@name, @rdata);
+
+ use constant NOERROR => 0;
+ use constant A => 1;
+ use constant IN => 1;
+
+ # default values
+
+ my ($hdr, $rcode, $ttl) = (0x8180, NOERROR, 3600);
+
+ # decode name
+
+ my ($len, $offset) = (undef, 12);
+ while (1) {
+ $len = unpack("\@$offset C", $recv_data);
+ last if $len == 0;
+ $offset++;
+ push @name, unpack("\@$offset A$len", $recv_data);
+ $offset += $len;
+ }
+
+ $offset -= 1;
+ my ($id, $type, $class) = unpack("n x$offset n2", $recv_data);
+
+ my $name = join('.', @name);
+
+ if ($type == A) {
+ push @rdata, rd_addr($ttl, '127.0.0.1');
+ }
+
+ $len = @name;
+ pack("n6 (C/a*)$len x n2", $id, $hdr | $rcode, 1, scalar @rdata,
+ 0, 0, @name, $type, $class) . join('', @rdata);
+}
+
+sub rd_addr {
+ my ($ttl, $addr) = @_;
+
+ my $code = 'split(/\./, $addr)';
+
+ return pack 'n3N', 0xc00c, A, IN, $ttl if $addr eq '';
+
+ pack 'n3N nC4', 0xc00c, A, IN, $ttl, eval "scalar $code", eval($code);
+}
+
+sub dns_daemon {
+ my ($port, $t) = @_;
+
+ my ($data, $recv_data);
+ my $socket = IO::Socket::INET->new(
+ LocalAddr => '127.0.0.1',
+ LocalPort => $port,
+ Proto => 'udp',
+ )
+ or die "Can't create listening socket: $!\n";
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ # signal we are ready
+
+ open my $fh, '>', $t->testdir() . '/' . $port;
+ close $fh;
+
+ while (1) {
+ $socket->recv($recv_data, 65536);
+ $data = reply_handler($recv_data, $port);
+ $socket->send($data);
+ }
+}
+
+###############################################################################
diff --git a/nginx/t/js_fetch_keepalive.t b/nginx/t/js_fetch_keepalive.t
new file mode 100755
index 00000000..100be40c
--- /dev/null
+++ b/nginx/t/js_fetch_keepalive.t
@@ -0,0 +1,289 @@
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) F5, Inc.
+
+# Tests for http njs module, fetch method keepalive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use IO::Socket::INET;
+
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http/)
+ ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+http {
+ %%TEST_GLOBALS_HTTP%%
+
+ js_import test.js;
+
+ server {
+ listen 127.0.0.1:8080;
+ server_name localhost;
+
+ location /engine {
+ js_content test.engine;
+ }
+
+ location /keepalive {
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_requests 100;
+ js_fetch_keepalive_time 60s;
+ js_fetch_keepalive_timeout 60s;
+ js_content test.keepalive;
+ }
+
+ location /keepalive_simultaneous {
+ js_fetch_keepalive 4;
+ js_content test.keepalive_simultaneous;
+ }
+
+ location /keepalive_requests {
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_requests 2;
+ js_content test.keepalive;
+ }
+
+ location /keepalive_time {
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_time 100ms;
+ js_content test.keepalive;
+ }
+
+ location /keepalive_timeout {
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_timeout 100ms;
+ js_content test.keepalive;
+ }
+
+ location /no_keepalive {
+ js_fetch_keepalive 0;
+ js_content test.keepalive;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:8081;
+ keepalive_requests 100;
+ keepalive_timeout 60s;
+
+ location /count {
+ add_header Connection-ID $connection_requests;
+ return 200 $connection_requests;
+ }
+
+ location /count_close {
+ add_header Connection close;
+ add_header Connection-ID $connection_requests;
+ return 200 $connection_requests;
+ }
+
+ location /count_close_mixed {
+ add_header cOnNeCtiOn ClOsE;
+ add_header Connection-ID $connection_requests;
+ return 200 $connection_requests;
+ }
+ }
+}
+
+EOF
+
+my $p1 = port(8081);
+my $p2 = port(8082);
+
+$t->write_file('test.js', <<EOF);
+ function engine(r) {
+ r.return(200, njs.engine);
+ }
+
+ function sleep(milliseconds) {
+ return new Promise(resolve => setTimeout(resolve, milliseconds));
+ }
+
+ async function keepalive(r) {
+ const path = r.args.path;
+ let port = $p1;
+ if (r.args.port) {
+ port = r.args.port;
+ }
+
+ let responses = [];
+ for (let i = 0; i < 3; i++) {
+ let resp = await ngx.fetch(`http://127.0.0.1:\${port}/\${path}`)
+ .then(resp => resp.text())
+ .catch(err => err.message);
+ responses.push(resp.trim());
+
+ if (r.args.sleep) {
+ await sleep(Number(r.args.sleep));
+ }
+ }
+
+ r.return(200, responses.toString());
+ }
+
+ async function keepalive_simultaneous(r) {
+ let promises = [];
+ for (let i = 0; i < Number(r.args.n); i++) {
+ promises.push(ngx.fetch('http://127.0.0.1:$p1/count'));
+ }
+
+ let results = await Promise.all(promises);
+ let bodies = await Promise.all(results.map(r => r.text()));
+ let responses = bodies.map(b => parseInt(b.trim()));
+
+ r.return(200, JSON.stringify(responses));
+ }
+
+ export default {engine, keepalive, keepalive_simultaneous};
+EOF
+
+$t->try_run('no js_fetch_keepalive');
+
+$t->run_daemon(\&http_daemon, $p2);
+$t->waitforsocket('127.0.0.1:' . $p2);
+
+$t->plan(16);
+
+###############################################################################
+
+like(http_get('/no_keepalive?path=count'), qr/1,1,1/,
+ 'no keepalive connections');
+like(http_get('/keepalive?path=count_close'), qr/1,1,1/,
+ 'upstream Connection: close (HTTP/1.1)');
+like(http_get('/keepalive?path=count_close_mixed'), qr/1,1,1/,
+ 'upstream Connection: close, mixed-case (HTTP/1.1)');
+like(http_get('/keepalive?path=count'), qr/1,2,3/,
+ 'keepalive reuses connection');
+like(http_get('/keepalive?path=count'), qr/4,5,6/,
+ 'keepalive reuses connection across requests');
+like(http_get('/keepalive_simultaneous?n=8'), qr/1,1,1,1,1,1,1,1/,
+ 'keepalive simultaneous requests');
+like(http_get('/keepalive_simultaneous?n=8'), qr/2,2,2,2,1,1,1,1/,
+ 'keepalive simultaneous requests reused connections');
+like(http_get('/keepalive_requests?path=count'), qr/1,2,1/,
+ 'keepalive with limited requests per connection');
+
+like(http_get('/keepalive_time?path=count'), qr/1,2,3/,
+ 'keepalive with time limit, first round');
+
+select undef, undef, undef, 0.15;
+
+like(http_get('/keepalive_time?path=count'), qr/4,1,2/,
+ 'keepalive with time limit, second round');
+
+like(http_get('/keepalive_timeout?path=count'), qr/1,2,3/,
+ 'keepalive with timeout limit, first round');
+
+select undef, undef, undef, 0.15;
+
+like(http_get('/keepalive_timeout?path=count'), qr/1,2,3/,
+ 'keepalive with timeout limit, second round');
+
+like(http_get("/keepalive?path=broken_keepalive&port=$p2&sleep=1"), qr/1,1,1/,
+ 'upstream broken keepalive (connection closed by upstream)');
+like(http_get("/keepalive?path=http10&port=$p2"), qr/1,1,1/,
+ 'upstream HTTP/1.0 (no keepalive)');
+like(http_get("/keepalive?path=count&port=$p2&sleep=1"), qr/1,2,3/,
+ 'normal keepalive');
+like(http_get("/keepalive?path=assumed_keepalive&port=$p2&sleep=1"), qr/4,5,6/,
+ 'assumed keepalive');
+
+###############################################################################
+
+sub http_daemon {
+ my $port = shift;
+
+ my $server = IO::Socket::INET->new(
+ Proto => 'tcp',
+ LocalAddr => '127.0.0.1:' . $port,
+ Listen => 5,
+ Reuse => 1
+ ) or die "Can't create listening socket: $!\n";
+
+ my $ccount = 0;
+ my $rcount = 0;
+
+ # dumb server which is able to keep connections alive
+
+ while (my $client = $server->accept()) {
+ Test::Nginx::log_core('||',
+ "connection from " . $client->peerhost());
+ $client->autoflush(1);
+ $ccount++;
+ $rcount = 0;
+
+ while (1) {
+ my $headers = '';
+ my $uri = '';
+
+ while (<$client>) {
+ Test::Nginx::log_core('||', $_);
+ $headers .= $_;
+ last if (/^\x0d?\x0a?$/);
+ }
+
+ last if $headers eq '';
+ $rcount++;
+
+ $uri = $1 if $headers =~ /^\S+\s+([^ ]+)\s+HTTP/i;
+ my $body = $rcount;
+
+ if ($uri eq '/broken_keepalive') {
+ print $client
+ "HTTP/1.1 200 OK" . CRLF .
+ "Content-Length: " . length($body) . CRLF .
+ "Connection: keep-alive" . CRLF . CRLF .
+ $body;
+
+ last;
+
+ } elsif ($uri eq '/assumed_keepalive') {
+ print $client
+ "HTTP/1.1 200 OK" . CRLF .
+ "Content-Length: " . length($body) . CRLF . CRLF .
+ $body;
+
+ } elsif ($uri eq '/count') {
+ print $client
+ "HTTP/1.1 200 OK" . CRLF .
+ "Content-Length: " . length($body) . CRLF .
+ "Connection: keep-alive" . CRLF . CRLF .
+ $body;
+
+ } elsif ($uri eq '/http10') {
+ print $client
+ "HTTP/1.0 200 OK" . CRLF .
+ "Content-Length: " . length($body) . CRLF . CRLF .
+ $body;
+ }
+ }
+
+ close $client;
+ }
+}
+
+###############################################################################
diff --git a/nginx/t/stream_js_fetch_keepalive.t b/nginx/t/stream_js_fetch_keepalive.t
new file mode 100644
index 00000000..e940eea5
--- /dev/null
+++ b/nginx/t/stream_js_fetch_keepalive.t
@@ -0,0 +1,200 @@
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) F5, Inc.
+
+# Tests for stream njs module, fetch method keepalive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use IO::Socket::INET;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http stream/)
+ ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+http {
+ %%TEST_GLOBALS_HTTP%%
+
+ js_import test.js;
+
+ server {
+ listen 127.0.0.1:8080;
+ server_name localhost;
+
+ location /engine {
+ js_content test.engine;
+ }
+ }
+
+ server {
+ listen 127.0.0.1:8081;
+ keepalive_requests 100;
+ keepalive_timeout 60s;
+
+ location /count {
+ add_header Connection-ID $connection_requests;
+ return 200 $connection_requests;
+ }
+
+ location /headers {
+ return 200 "Connection: $http_connection";
+ }
+ }
+}
+
+stream {
+ %%TEST_GLOBALS_STREAM%%
+
+ js_import test.js;
+ js_var $message;
+
+ server {
+ listen 127.0.0.1:8082;
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_requests 100;
+ js_fetch_keepalive_time 60s;
+ js_fetch_keepalive_timeout 60s;
+ js_preread test.keepalive;
+ return $message;
+ }
+
+ server {
+ listen 127.0.0.1:8083;
+ js_fetch_keepalive 0;
+ js_preread test.keepalive;
+ return $message;
+ }
+
+ server {
+ listen 127.0.0.1:8084;
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_requests 2;
+ js_preread test.keepalive;
+ return $message;
+ }
+
+ server {
+ listen 127.0.0.1:8085;
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_time 100ms;
+ js_preread test.keepalive;
+ return $message;
+ }
+
+ server {
+ listen 127.0.0.1:8086;
+ js_fetch_keepalive 4;
+ js_fetch_keepalive_timeout 100ms;
+ js_preread test.keepalive;
+ return $message;
+ }
+
+ server {
+ listen 127.0.0.1:8087;
+ js_fetch_keepalive 4;
+ js_preread test.keepalive_simultaneous;
+ return $message;
+ }
+}
+
+EOF
+
+my $p1 = port(8081);
+
+$t->write_file('test.js', <<EOF);
+ function engine(r) {
+ r.return(200, njs.engine);
+ }
+
+ async function keepalive(s) {
+ let responses = [];
+
+ for (let i = 0; i < 3; i++) {
+ let resp = await ngx.fetch('http://127.0.0.1:$p1/count');
+ let body = await resp.text();
+ responses.push(parseInt(body.trim()));
+ }
+
+ s.variables.message = JSON.stringify(responses);
+ s.done();
+ }
+
+ async function keepalive_simultaneous(s) {
+ let promises = [];
+ let n = 8;
+ for (let i = 0; i < n; i++) {
+ promises.push(ngx.fetch('http://127.0.0.1:$p1/count'));
+ }
+
+ let results = await Promise.all(promises);
+ let bodies = await Promise.all(results.map(r => r.text()));
+ let responses = bodies.map(b => parseInt(b.trim()));
+
+ s.variables.message = JSON.stringify(responses);
+ s.done();
+ }
+
+ export default {engine, keepalive, keepalive_simultaneous};
+EOF
+
+$t->try_run('no stream js_fetch_keepalive');
+
+$t->plan(10);
+
+###############################################################################
+
+like(stream('127.0.0.1:' . port(8083))->io('GO'), qr/\[1,1,1]/,
+ 'no keepalive connections');
+like(stream('127.0.0.1:' . port(8082))->io('GO'), qr/\[1,2,3]/,
+ 'keepalive reuses connection');
+like(stream('127.0.0.1:' . port(8082))->io('GO'), qr/\[4,5,6]/,
+ 'keepalive reuses connection across sessions');
+
+like(stream('127.0.0.1:' . port(8087))->io('GO'), qr/^\[(1,){7}1\]$/,
+ 'keepalive simultaneous requests');
+like(stream('127.0.0.1:' . port(8087))->io('GO'),
+ qr/\[2,2,2,2,1,1,1,1\]/,
+ 'keepalive simultaneous requests reused connections');
+
+like(stream('127.0.0.1:' . port(8084))->io('GO'), qr/\[1,2,1]/,
+ 'keepalive with limited requests per connection');
+
+like(stream('127.0.0.1:' . port(8085))->io('GO'), qr/\[1,2,3]/,
+ 'keepalive with time limit, first round');
+
+select undef, undef, undef, 0.15;
+
+like(stream('127.0.0.1:' . port(8085))->io('GO'), qr/\[4,1,2]/,
+ 'keepalive with time limit, second round');
+
+like(stream('127.0.0.1:' . port(8086))->io('GO'), qr/\[1,2,3]/,
+ 'keepalive with timeout limit, first round');
+
+select undef, undef, undef, 0.15;
+
+like(stream('127.0.0.1:' . port(8086))->io('GO'), qr/\[1,2,3]/,
+ 'keepalive with timeout limit, second round');
+
+###############################################################################
More information about the nginx-devel
mailing list