[njs] Modules: introduced js_periodic directive.
Dmitry Volyntsev
xeioex at nginx.com
Tue Aug 22 18:13:40 UTC 2023
details: https://hg.nginx.org/njs/rev/f1bd0b1db065
branches:
changeset: 2184:f1bd0b1db065
user: Dmitry Volyntsev <xeioex at nginx.com>
date: Tue Aug 22 11:13:09 2023 -0700
description:
Modules: introduced js_periodic directive.
The directive specifies a JS handler to run at regular intervals. The JS
handler will be executed in each worker process. The handler receives no
arguments. It has access to ngx and other global objects.
example.conf:
location @periodics {
# Specifies a JS handler to be run at 1 minute intervals
js_periodic main.handler interval=60s jitter=5s;
resolver 10.0.0.1;
js_fetch_trusted_certificate /path/to/ISRG_Root_X1.pem;
}
example.js:
async function handler() {
if (ngx.worker_id != 0) {
/* using ngx.worker_id to run handler only in one worker. */
return;
}
let reply = async ngx.fetch('https://nginx.org/en/docs/njs/');
let body = async reply.text();
ngx.log(ngx.INFO, body);
}
This closes #660 issue on Github.
diffstat:
nginx/ngx_http_js_module.c | 399 +++++++++++++++++++++++++++++++++++++++++-
nginx/ngx_js.h | 3 +-
nginx/ngx_stream_js_module.c | 402 +++++++++++++++++++++++++++++++++++++++++-
nginx/t/js_periodic.t | 258 +++++++++++++++++++++++++++
nginx/t/stream_js_periodic.t | 322 ++++++++++++++++++++++++++++++++++
5 files changed, 1362 insertions(+), 22 deletions(-)
diffs (truncated from 1628 to 1000 lines):
diff -r 4cb8e873e8c6 -r f1bd0b1db065 nginx/ngx_http_js_module.c
--- a/nginx/ngx_http_js_module.c Tue Aug 22 11:12:02 2023 -0700
+++ b/nginx/ngx_http_js_module.c Tue Aug 22 11:13:09 2023 -0700
@@ -22,6 +22,27 @@ typedef struct {
} ngx_http_js_loc_conf_t;
+typedef struct {
+ ngx_http_conf_ctx_t *conf_ctx;
+ ngx_connection_t *connection;
+ void *padding;
+
+ /**
+ * fd is used for event debug and should be at the same position
+ * as in ngx_connection_t: after a 3rd pointer.
+ */
+ ngx_socket_t fd;
+
+ ngx_str_t method;
+ ngx_msec_t interval;
+ ngx_msec_t jitter;
+
+ ngx_log_t log;
+ ngx_http_log_ctx_t log_ctx;
+ ngx_event_t event;
+} ngx_js_periodic_t;
+
+
#define NJS_HEADER_SEMICOLON 0x1
#define NJS_HEADER_SINGLE 0x2
#define NJS_HEADER_ARRAY 0x4
@@ -45,6 +66,8 @@ typedef struct {
ngx_chain_t **last_out;
ngx_chain_t *free;
ngx_chain_t *busy;
+
+ ngx_js_periodic_t *periodic;
} ngx_http_js_ctx_t;
@@ -88,7 +111,8 @@ static ngx_int_t ngx_http_js_variable_se
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_js_variable_var(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r);
+static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r,
+ unsigned inject_request);
static void ngx_http_js_cleanup_ctx(void *data);
static njs_int_t ngx_http_js_ext_keys_header(njs_vm_t *vm, njs_value_t *value,
@@ -256,8 +280,18 @@ static size_t ngx_http_js_max_response_b
static void ngx_http_js_handle_vm_event(ngx_http_request_t *r,
njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
+static void ngx_http_js_periodic_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r);
+static void ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc);
+static void ngx_http_js_periodic_destroy(ngx_http_request_t *r,
+ ngx_js_periodic_t *periodic);
+
static njs_int_t ngx_js_http_init(njs_vm_t *vm);
static ngx_int_t ngx_http_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_http_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
static char *ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_js_var(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_js_content(ngx_conf_t *cf, ngx_command_t *cmd,
@@ -297,6 +331,13 @@ static ngx_command_t ngx_http_js_comman
0,
NULL },
+ { ngx_string("js_periodic"),
+ NGX_HTTP_LOC_CONF|NGX_CONF_ANY,
+ ngx_http_js_periodic,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ 0,
+ NULL },
+
{ ngx_string("js_preload_object"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE13,
ngx_js_preload_object,
@@ -439,7 +480,7 @@ ngx_module_t ngx_http_js_module = {
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
- NULL, /* init process */
+ ngx_http_js_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
@@ -863,7 +904,7 @@ ngx_http_js_content_event_handler(ngx_ht
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http js content event handler");
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
@@ -999,7 +1040,7 @@ ngx_http_js_header_filter(ngx_http_reque
return ngx_http_next_header_filter(r);
}
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
return NGX_ERROR;
@@ -1051,7 +1092,7 @@ ngx_http_js_body_filter(ngx_http_request
return ngx_http_next_body_filter(r, in);
}
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
return NGX_ERROR;
@@ -1165,7 +1206,7 @@ ngx_http_js_variable_set(ngx_http_reques
ngx_str_t value;
ngx_http_js_ctx_t *ctx;
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
@@ -1239,7 +1280,7 @@ ngx_http_js_variable_var(ngx_http_reques
static ngx_int_t
-ngx_http_js_init_vm(ngx_http_request_t *r)
+ngx_http_js_init_vm(ngx_http_request_t *r, unsigned inject_request)
{
njs_int_t rc;
ngx_str_t exception;
@@ -1318,10 +1359,12 @@ ngx_http_js_init_vm(ngx_http_request_t *
return NGX_ERROR;
}
- rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
- ngx_http_js_request_proto_id, r, 0);
- if (rc != NJS_OK) {
- return NGX_ERROR;
+ if (inject_request) {
+ rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
+ ngx_http_js_request_proto_id, r, 0);
+ if (rc != NJS_OK) {
+ return NGX_ERROR;
+ }
}
return NGX_OK;
@@ -4048,6 +4091,221 @@ ngx_http_js_location(njs_vm_t *vm, ngx_h
}
+static void
+ngx_http_js_periodic_handler(ngx_event_t *ev)
+{
+ ngx_int_t rc;
+ ngx_msec_t timer;
+ ngx_connection_t *c;
+ ngx_js_periodic_t *periodic;
+ ngx_http_js_ctx_t *ctx;
+ ngx_http_request_t *r;
+ ngx_http_connection_t hc;
+
+ periodic = ev->data;
+
+ timer = periodic->interval;
+
+ if (periodic->jitter) {
+ timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+ }
+
+ ngx_add_timer(&periodic->event, timer);
+
+ c = periodic->connection;
+
+ if (c != NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "http js periodic \"%V\" is already running, killing "
+ "previous instance", &periodic->method);
+
+ ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, &periodic->log, 0,
+ "http js periodic handler: \"%V\"", &periodic->method);
+
+ c = ngx_get_connection(0, &periodic->log);
+
+ if (c == NULL) {
+ return;
+ }
+
+ ngx_memzero(&hc, sizeof(ngx_http_connection_t));
+
+ hc.conf_ctx = periodic->conf_ctx;
+
+ c->data = &hc;
+
+ r = ngx_http_create_request(c);
+
+ if (r == NULL) {
+ ngx_free_connection(c);
+ c->fd = (ngx_socket_t) -1;
+ return;
+ }
+
+ c->data = r;
+ c->destroyed = 0;
+ c->pool = r->pool;
+ c->read->handler = ngx_http_js_periodic_shutdown_handler;
+
+ periodic->connection = c;
+ periodic->log_ctx.request = r;
+ periodic->log_ctx.connection = c;
+
+ r->method = NGX_HTTP_GET;
+ r->method_name = ngx_http_core_get_method;
+
+ ngx_str_set(&r->uri, "/");
+ r->unparsed_uri = r->uri;
+ r->valid_unparsed_uri = 1;
+
+ r->health_check = 1;
+ r->write_event_handler = ngx_http_js_periodic_write_event_handler;
+
+ rc = ngx_http_js_init_vm(r, 0);
+
+ if (rc != NGX_OK) {
+ ngx_http_js_periodic_destroy(r, periodic);
+ return;
+ }
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ ctx->periodic = periodic;
+
+ r->count++;
+
+ rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+ &ctx->retval);
+
+ if (rc == NGX_AGAIN) {
+ rc = NGX_OK;
+ }
+
+ r->count--;
+
+ ngx_http_js_periodic_finalize(r, rc);
+}
+
+
+static void
+ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r)
+{
+ ngx_http_js_ctx_t *ctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http js periodic write event handler");
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ if (!njs_vm_pending(ctx->vm)) {
+ ngx_http_js_periodic_finalize(r, NGX_OK);
+ return;
+ }
+}
+
+
+static void
+ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+
+ c = ev->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http js periodic shutdown handler");
+
+ if (c->close) {
+ ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+ return;
+ }
+
+ ngx_log_error(NGX_LOG_ERR, c->log, 0, "http js periodic shutdown handler "
+ "while not closing");
+}
+
+
+static void
+ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc)
+{
+ ngx_http_js_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http js periodic finalize: \"%V\" rc: %i c: %i pending: %i",
+ &ctx->periodic->method, rc, r->count,
+ njs_vm_pending(ctx->vm));
+
+ if (r->count > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+ return;
+ }
+
+ ngx_http_js_periodic_destroy(r, ctx->periodic);
+}
+
+
+static void
+ngx_http_js_periodic_destroy(ngx_http_request_t *r, ngx_js_periodic_t *periodic)
+{
+ ngx_connection_t *c;
+ ngx_http_cleanup_t *cln;
+
+ c = r->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http js periodic destroy: \"%V\"",
+ &periodic->method);
+
+ periodic->connection = NULL;
+
+ for (cln = r->cleanup; cln; cln = cln->next) {
+ if (cln->handler) {
+ cln->handler(cln->data);
+ }
+ }
+
+ ngx_free_connection(c);
+
+ c->fd = (ngx_socket_t) -1;
+ c->pool = NULL;
+ c->destroyed = 1;
+
+ ngx_destroy_pool(r->pool);
+}
+
+
+static ngx_int_t
+ngx_http_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+ ngx_log_t *log;
+ ngx_msec_t jitter;
+ ngx_http_core_loc_conf_t *clcf;
+
+ clcf = ngx_http_get_module_loc_conf(periodic->conf_ctx,
+ ngx_http_core_module);
+ log = clcf->error_log;
+
+ ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+ periodic->log.data = &periodic->log_ctx;
+ periodic->connection = NULL;
+
+ periodic->event.handler = ngx_http_js_periodic_handler;
+ periodic->event.data = periodic;
+ periodic->event.log = log;
+ periodic->event.cancelable = 1;
+
+ jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+ : 0;
+ ngx_add_timer(&periodic->event, jitter + 1);
+
+ return NGX_OK;
+}
+
+
static njs_host_event_t
ngx_http_js_set_timer(njs_external_ptr_t external, uint64_t delay,
njs_vm_event_t vm_event)
@@ -4193,6 +4451,11 @@ ngx_http_js_handle_vm_event(ngx_http_req
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"js exception: %V", &exception);
+ if (r->health_check) {
+ ngx_http_js_periodic_finalize(r, NGX_ERROR);
+ return;
+ }
+
ngx_http_finalize_request(r, NGX_ERROR);
return;
}
@@ -4255,6 +4518,119 @@ ngx_http_js_init(ngx_conf_t *cf)
}
+static ngx_int_t
+ngx_http_js_init_worker(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodics;
+ ngx_js_main_conf_t *jmcf;
+
+ if ((ngx_process != NGX_PROCESS_WORKER)
+ && ngx_process != NGX_PROCESS_SINGLE)
+ {
+ return NGX_OK;
+ }
+
+ jmcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_js_module);
+
+ if (jmcf == NULL || jmcf->periodics == NULL) {
+ return NGX_OK;
+ }
+
+ periodics = jmcf->periodics->elts;
+
+ for (i = 0; i < jmcf->periodics->nelts; i++) {
+ periodics[i].fd = 1000000 + i;
+
+ if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value, s;
+ ngx_msec_t interval, jitter;
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodic;
+ ngx_js_main_conf_t *jmcf;
+
+ if (cf->args->nelts < 2) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+ return NGX_CONF_ERROR;
+ }
+
+ jmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_js_module);
+
+ if (jmcf->periodics == NULL) {
+ jmcf->periodics = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_js_periodic_t));
+ if (jmcf->periodics == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ periodic = ngx_array_push(jmcf->periodics);
+ if (periodic == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+ jitter = 0;
+ interval = 5000;
+
+ value = cf->args->elts;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+ s.len = value[i].len - 9;
+ s.data = value[i].data + 9;
+
+ interval = ngx_parse_time(&s, 0);
+
+ if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+ s.len = value[i].len - 7;
+ s.data = value[i].data + 7;
+
+ jitter = ngx_parse_time(&s, 0);
+
+ if (jitter == (ngx_msec_t) NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[i]);
+ return NGX_CONF_ERROR;
+ }
+
+ periodic->method = value[1];
+ periodic->interval = interval;
+ periodic->jitter = jitter;
+ periodic->conf_ctx = cf->ctx;
+
+ return NGX_CONF_OK;
+}
+
+
static char *
ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
@@ -4429,6 +4805,7 @@ ngx_http_js_create_main_conf(ngx_conf_t
* set by ngx_pcalloc():
*
* jmcf->dicts = NULL;
+ * jmcf->periodics = NULL;
*/
return jmcf;
diff -r 4cb8e873e8c6 -r f1bd0b1db065 nginx/ngx_js.h
--- a/nginx/ngx_js.h Tue Aug 22 11:12:02 2023 -0700
+++ b/nginx/ngx_js.h Tue Aug 22 11:13:09 2023 -0700
@@ -56,7 +56,8 @@ typedef struct {
#define NGX_JS_COMMON_MAIN_CONF \
- ngx_js_dict_t *dicts \
+ ngx_js_dict_t *dicts; \
+ ngx_array_t *periodics \
#define _NGX_JS_COMMON_LOC_CONF \
diff -r 4cb8e873e8c6 -r f1bd0b1db065 nginx/ngx_stream_js_module.c
--- a/nginx/ngx_stream_js_module.c Tue Aug 22 11:12:02 2023 -0700
+++ b/nginx/ngx_stream_js_module.c Tue Aug 22 11:13:09 2023 -0700
@@ -28,6 +28,26 @@ typedef struct {
typedef struct {
+ ngx_stream_conf_ctx_t *conf_ctx;
+ ngx_connection_t *connection;
+ void *padding;
+
+ /**
+ * fd is used for event debug and should be at the same position
+ * as in ngx_connection_t: after a 3rd pointer.
+ */
+ ngx_socket_t fd;
+
+ ngx_str_t method;
+ ngx_msec_t interval;
+ ngx_msec_t jitter;
+
+ ngx_log_t log;
+ ngx_event_t event;
+} ngx_js_periodic_t;
+
+
+typedef struct {
njs_vm_t *vm;
njs_opaque_value_t retval;
njs_opaque_value_t args[3];
@@ -43,6 +63,7 @@ typedef struct {
ngx_stream_js_ev_t events[2];
unsigned filter:1;
unsigned in_progress:1;
+ ngx_js_periodic_t *periodic;
} ngx_stream_js_ctx_t;
@@ -66,7 +87,8 @@ static ngx_int_t ngx_stream_js_variable_
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s,
+ unsigned inject_session);
static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx);
static void ngx_stream_js_cleanup(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
@@ -114,8 +136,18 @@ static size_t ngx_stream_js_max_response
static void ngx_stream_js_handle_event(ngx_stream_session_t *s,
njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
+static void ngx_stream_js_periodic_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_event_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_finalize(ngx_stream_session_t *s,
+ ngx_int_t rc);
+static void ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic);
+
static njs_int_t ngx_js_stream_init(njs_vm_t *vm);
static ngx_int_t ngx_stream_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_stream_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
static char *ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_js_var(ngx_conf_t *cf, ngx_command_t *cmd,
@@ -154,6 +186,13 @@ static ngx_command_t ngx_stream_js_comm
0,
NULL },
+ { ngx_string("js_periodic"),
+ NGX_STREAM_SRV_CONF|NGX_CONF_ANY,
+ ngx_stream_js_periodic,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
{ ngx_string("js_preload_object"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE13,
ngx_js_preload_object,
@@ -293,7 +332,7 @@ ngx_module_t ngx_stream_js_module = {
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
- NULL, /* init process */
+ ngx_stream_js_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
@@ -647,7 +686,7 @@ ngx_stream_js_phase_handler(ngx_stream_s
return NGX_DECLINED;
}
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc != NGX_OK) {
return rc;
}
@@ -728,7 +767,7 @@ ngx_stream_js_body_filter(ngx_stream_ses
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js filter u:%ui",
from_upstream);
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
@@ -836,7 +875,7 @@ ngx_stream_js_variable_set(ngx_stream_se
ngx_str_t value;
ngx_stream_js_ctx_t *ctx;
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
@@ -910,7 +949,7 @@ ngx_stream_js_variable_var(ngx_stream_se
static ngx_int_t
-ngx_stream_js_init_vm(ngx_stream_session_t *s)
+ngx_stream_js_init_vm(ngx_stream_session_t *s, unsigned inject_session)
{
njs_int_t rc;
njs_str_t key;
@@ -987,10 +1026,12 @@ ngx_stream_js_init_vm(ngx_stream_session
return NGX_ERROR;
}
- rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
- ngx_stream_js_session_proto_id, s, 0);
- if (rc != NJS_OK) {
- return NGX_ERROR;
+ if (inject_session) {
+ rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
+ ngx_stream_js_session_proto_id, s, 0);
+ if (rc != NJS_OK) {
+ return NGX_ERROR;
+ }
}
return NGX_OK;
@@ -1695,12 +1736,21 @@ ngx_stream_js_handle_event(ngx_stream_se
rc = njs_vm_run(ctx->vm);
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js post event handler rc: %i event: %p",
+ (ngx_int_t) rc, vm_event);
+
if (rc == NJS_ERROR) {
ngx_js_retval(ctx->vm, NULL, &exception);
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"js exception: %V", &exception);
+ if (s->health_check) {
+ ngx_stream_js_periodic_finalize(s, NGX_ERROR);
+ return;
+ }
+
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
}
@@ -1754,6 +1804,337 @@ ngx_stream_js_init_conf_vm(ngx_conf_t *c
}
+static void
+ngx_stream_js_periodic_handler(ngx_event_t *ev)
+{
+ ngx_int_t rc;
+ ngx_msec_t timer;
+ ngx_js_periodic_t *periodic;
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+ ngx_stream_core_main_conf_t *cmcf;
+
+ periodic = ev->data;
+
+ timer = periodic->interval;
+
+ if (periodic->jitter) {
+ timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+ }
+
+ ngx_add_timer(&periodic->event, timer);
+
+ c = periodic->connection;
+
+ if (c != NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "js periodic \"%V\" is already running, killing previous "
+ "instance", &periodic->method);
+
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, &periodic->log, 0,
+ "stream js periodic handler: \"%V\"", &periodic->method);
+
+ c = ngx_get_connection(0, &periodic->log);
+
+ if (c == NULL) {
+ return;
+ }
+
+ c->pool = ngx_create_pool(1024, c->log);
+ if (c->pool == NULL) {
+ goto free_connection;
+ }
+
+ s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
+ if (s == NULL) {
+ goto free_pool;
+ }
+
+ s->main_conf = periodic->conf_ctx->main_conf;
+ s->srv_conf = periodic->conf_ctx->srv_conf;
+
+ s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
+ if (s->ctx == NULL) {
+ goto free_pool;
+ }
+
+ cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
+
+ s->variables = ngx_pcalloc(c->pool, cmcf->variables.nelts
+ * sizeof(ngx_stream_variable_value_t));
+ if (s->variables == NULL) {
+ goto free_pool;
+ }
+
+ c->data = s;
+ c->destroyed = 0;
+ c->read->log = &periodic->log;
+ c->read->handler = ngx_stream_js_periodic_event_handler;
+
+ s->received = 1;
+ s->connection = c;
+ s->signature = NGX_STREAM_MODULE;
+
+ s->health_check = 1;
+
+ rc = ngx_stream_js_init_vm(s, 0);
+
+ if (rc != NGX_OK) {
+ ngx_stream_js_periodic_destroy(s, periodic);
+ return;
+ }
+
+ periodic->connection = c;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ctx->periodic = periodic;
+
+ s->received++;
+
+ rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+ &ctx->retval);
+
+ if (rc == NGX_AGAIN) {
+ rc = NGX_OK;
+ }
+
+ s->received--;
+
+ ngx_stream_js_periodic_finalize(s, rc);
+
+ return;
+
+free_pool:
+
+ ngx_destroy_pool(c->pool);
+
+free_connection:
+
+ ngx_close_connection(c);
+}
+
+
+static void
+ngx_stream_js_periodic_event_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+
+ c = ev->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic event handler");
+
+ if (c->close) {
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ return;
+ }
+
+ s = c->data;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ if (!njs_vm_pending(ctx->vm)) {
+ ngx_stream_js_periodic_finalize(s, NGX_OK);
+ return;
+ }
+}
+
+
+static void
+ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, ngx_int_t rc)
+{
+ ngx_stream_js_ctx_t *ctx;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js periodic finalize: \"%V\" rc: %i c: %i "
+ "pending: %i", &ctx->periodic->method, rc, s->received,
+ njs_vm_pending(ctx->vm));
+
+ if (s->received > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+ return;
+ }
+
+ ngx_stream_js_periodic_destroy(s, ctx->periodic);
+}
+
+
+static void
+ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic)
+{
+ ngx_connection_t *c;
+
+ c = s->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic request destroy: \"%V\"",
+ &periodic->method);
+
+ periodic->connection = NULL;
+
+ ngx_free_connection(c);
+
+ ngx_destroy_pool(c->pool);
+
+ c->fd = (ngx_socket_t) -1;
+ c->pool = NULL;
+ c->destroyed = 1;
+
+ if (c->read->posted) {
+ ngx_delete_posted_event(c->read);
+ }
+}
+
+
+static ngx_int_t
+ngx_stream_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+ ngx_log_t *log;
+ ngx_msec_t jitter;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ cscf = ngx_stream_get_module_srv_conf(periodic->conf_ctx,
+ ngx_stream_core_module);
+ log = cscf->error_log;
+
+ ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+ periodic->connection = NULL;
+
+ periodic->event.handler = ngx_stream_js_periodic_handler;
+ periodic->event.data = periodic;
+ periodic->event.log = log;
+ periodic->event.cancelable = 1;
+
+ jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+ : 0;
+ ngx_add_timer(&periodic->event, jitter + 1);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_js_init_worker(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodics;
+ ngx_js_main_conf_t *jmcf;
+
+ if ((ngx_process != NGX_PROCESS_WORKER)
+ && ngx_process != NGX_PROCESS_SINGLE)
+ {
+ return NGX_OK;
+ }
+
+ jmcf = ngx_stream_cycle_get_module_main_conf(cycle, ngx_stream_js_module);
+
+ if (jmcf == NULL || jmcf->periodics == NULL) {
+ return NGX_OK;
+ }
+
+ periodics = jmcf->periodics->elts;
+
+ for (i = 0; i < jmcf->periodics->nelts; i++) {
+ periodics[i].fd = 1000000 + i;
+
+ if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value, s;
+ ngx_msec_t interval, jitter;
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodic;
+ ngx_js_main_conf_t *jmcf;
+
+ if (cf->args->nelts < 2) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+ return NGX_CONF_ERROR;
+ }
+
+ jmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_js_module);
+
+ if (jmcf->periodics == NULL) {
+ jmcf->periodics = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_js_periodic_t));
+ if (jmcf->periodics == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ periodic = ngx_array_push(jmcf->periodics);
+ if (periodic == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+ jitter = 0;
+ interval = 5000;
+
+ value = cf->args->elts;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+ s.len = value[i].len - 9;
+ s.data = value[i].data + 9;
+
+ interval = ngx_parse_time(&s, 0);
+
+ if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
More information about the nginx-devel
mailing list