[njs] Modules: added worker_affinity parameter for js_periodic directive.
Dmitry Volyntsev
xeioex at nginx.com
Tue Sep 5 16:17:39 UTC 2023
details: https://hg.nginx.org/njs/rev/e3c442561889
branches:
changeset: 2190:e3c442561889
user: Dmitry Volyntsev <xeioex at nginx.com>
date: Tue Sep 05 09:17:10 2023 -0700
description:
Modules: added worker_affinity parameter for js_periodic directive.
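worker_affinity specifies the set of workers on which the js_periodic handler
should be executed. By default the handler is executed only on worker 0.
The parameter accepts either "all", to select all workers, or a binary mask
of '0' and '1' characters, one per worker process: the leftmost character
corresponds to worker 0, and the mask length must equal the value of
"worker_processes".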
example.conf:
worker_processes 4;
...
location @periodics {
# to be run at 1 minute intervals in worker 0
js_periodic main.handler interval=60s;
# to be run at 1 minute intervals in all the workers
js_periodic main.handler interval=60s worker_affinity=all;
# to be run at 1 minute intervals in workers 1 and 3
js_periodic main.handler interval=60s worker_affinity=0101;
}
diffstat:
nginx/ngx_http_js_module.c | 68 +++++++++++++++++++++++++++++++++++++++++++-
nginx/ngx_stream_js_module.c | 68 +++++++++++++++++++++++++++++++++++++++++++-
nginx/t/js_periodic.t | 67 +++++++++++++++---------------------------
nginx/t/stream_js_periodic.t | 66 +++++++++++++++---------------------------
4 files changed, 182 insertions(+), 87 deletions(-)
diffs (587 lines):
diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_http_js_module.c
--- a/nginx/ngx_http_js_module.c Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/ngx_http_js_module.c Tue Sep 05 09:17:10 2023 -0700
@@ -25,7 +25,7 @@ typedef struct {
typedef struct {
ngx_http_conf_ctx_t *conf_ctx;
ngx_connection_t *connection;
- void *padding;
+ uint8_t *worker_affinity;
/**
* fd is used for event debug and should be at the same position
@@ -4544,6 +4544,16 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc
periodics = jmcf->periodics->elts;
for (i = 0; i < jmcf->periodics->nelts; i++) {
+ if (periodics[i].worker_affinity != NULL
+ && !periodics[i].worker_affinity[ngx_worker])
+ {
+ continue;
+ }
+
+ if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+ continue;
+ }
+
periodics[i].fd = 1000000 + i;
if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
@@ -4558,9 +4568,11 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc
static char *
ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
+ uint8_t *mask;
ngx_str_t *value, s;
ngx_msec_t interval, jitter;
ngx_uint_t i;
+ ngx_core_conf_t *ccf;
ngx_js_periodic_t *periodic;
ngx_js_main_conf_t *jmcf;
@@ -4586,6 +4598,7 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx
ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+ mask = NULL;
jitter = 0;
interval = 5000;
@@ -4619,6 +4632,58 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx
continue;
}
+ if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+ s.len = value[i].len - 16;
+ s.data = value[i].data + 16;
+
+ ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+ ngx_core_module);
+
+ if (ccf->worker_processes == NGX_CONF_UNSET) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "\"worker_affinity\" is not supported "
+ "with unset \"worker_processes\" directive");
+ return NGX_CONF_ERROR;
+ }
+
+ mask = ngx_palloc(cf->pool, ccf->worker_processes);
+ if (mask == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_strncmp(s.data, "all", 3) == 0) {
+ memset(mask, 1, ccf->worker_processes);
+ continue;
+ }
+
+ if ((size_t) ccf->worker_processes != s.len) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+ "\"worker_processes\" is not equal to the "
+ "size of \"worker_affinity\" mask");
+ return NGX_CONF_ERROR;
+ }
+
+ for (i = 0; i < s.len; i++) {
+ if (s.data[i] == '0') {
+ mask[i] = 0;
+ continue;
+ }
+
+ if (s.data[i] == '1') {
+ mask[i] = 1;
+ continue;
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid character \"%c\" in \"worker_affinity=\"",
+ s.data[i]);
+
+ return NGX_CONF_ERROR;
+ }
+
+ continue;
+ }
+
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
@@ -4629,6 +4694,7 @@ invalid:
periodic->method = value[1];
periodic->interval = interval;
periodic->jitter = jitter;
+ periodic->worker_affinity = mask;
periodic->conf_ctx = cf->ctx;
return NGX_CONF_OK;
diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_stream_js_module.c
--- a/nginx/ngx_stream_js_module.c Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/ngx_stream_js_module.c Tue Sep 05 09:17:10 2023 -0700
@@ -30,7 +30,7 @@ typedef struct {
typedef struct {
ngx_stream_conf_ctx_t *conf_ctx;
ngx_connection_t *connection;
- void *padding;
+ uint8_t *worker_affinity;
/**
* fd is used for event debug and should be at the same position
@@ -2049,6 +2049,16 @@ ngx_stream_js_init_worker(ngx_cycle_t *c
periodics = jmcf->periodics->elts;
for (i = 0; i < jmcf->periodics->nelts; i++) {
+ if (periodics[i].worker_affinity != NULL
+ && !periodics[i].worker_affinity[ngx_worker])
+ {
+ continue;
+ }
+
+ if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+ continue;
+ }
+
periodics[i].fd = 1000000 + i;
if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
@@ -2063,9 +2073,11 @@ ngx_stream_js_init_worker(ngx_cycle_t *c
static char *
ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
+ uint8_t *mask;
ngx_str_t *value, s;
ngx_msec_t interval, jitter;
ngx_uint_t i;
+ ngx_core_conf_t *ccf;
ngx_js_periodic_t *periodic;
ngx_js_main_conf_t *jmcf;
@@ -2091,6 +2103,7 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n
ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+ mask = NULL;
jitter = 0;
interval = 5000;
@@ -2124,6 +2137,58 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n
continue;
}
+ if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+ s.len = value[i].len - 16;
+ s.data = value[i].data + 16;
+
+ ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+ ngx_core_module);
+
+ if (ccf->worker_processes == NGX_CONF_UNSET) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "\"worker_affinity\" is not supported "
+ "with unset \"worker_processes\" directive");
+ return NGX_CONF_ERROR;
+ }
+
+ mask = ngx_palloc(cf->pool, ccf->worker_processes);
+ if (mask == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_strncmp(s.data, "all", 3) == 0) {
+ memset(mask, 1, ccf->worker_processes);
+ continue;
+ }
+
+ if ((size_t) ccf->worker_processes != s.len) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+ "\"worker_processes\" is not equal to the "
+ "size of \"worker_affinity\" mask");
+ return NGX_CONF_ERROR;
+ }
+
+ for (i = 0; i < s.len; i++) {
+ if (s.data[i] == '0') {
+ mask[i] = 0;
+ continue;
+ }
+
+ if (s.data[i] == '1') {
+ mask[i] = 1;
+ continue;
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid character \"%c\" in \"worker_affinity=\"",
+ s.data[i]);
+
+ return NGX_CONF_ERROR;
+ }
+
+ continue;
+ }
+
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
@@ -2134,6 +2199,7 @@ invalid:
periodic->method = value[1];
periodic->interval = interval;
periodic->jitter = jitter;
+ periodic->worker_affinity = mask;
periodic->conf_ctx = cf->ctx;
return NGX_CONF_OK;
diff -r 58d40fc80c52 -r e3c442561889 nginx/t/js_periodic.t
--- a/nginx/t/js_periodic.t Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/t/js_periodic.t Tue Sep 05 09:17:10 2023 -0700
@@ -23,12 +23,13 @@ use Test::Nginx;
select STDERR; $| = 1;
select STDOUT; $| = 1;
-my $t = Test::Nginx->new()->has(qw/http/)
+my $t = Test::Nginx->new()->has(qw/http rewrite/)
->write_file_expand('nginx.conf', <<'EOF');
%%TEST_GLOBALS%%
daemon off;
+worker_processes 4;
events {
}
@@ -42,6 +43,7 @@ http {
js_shared_dict_zone zone=nums:32k type=number;
js_shared_dict_zone zone=strings:32k;
+ js_shared_dict_zone zone=workers:32k type=number;
server {
listen 127.0.0.1:8080;
@@ -49,11 +51,12 @@ http {
location @periodic {
js_periodic test.tick interval=30ms jitter=1ms;
- js_periodic test.timer interval=1s;
+ js_periodic test.timer interval=1s worker_affinity=all;
js_periodic test.overrun interval=30ms;
js_periodic test.file interval=1s;
js_periodic test.fetch interval=40ms;
js_periodic test.multiple_fetches interval=1s;
+ js_periodic test.affinity interval=50ms worker_affinity=0101;
js_periodic test.fetch_exception interval=1s;
js_periodic test.tick_exception interval=1s;
@@ -69,6 +72,10 @@ http {
return 200 'foo';
}
+ location /test_affinity {
+ js_content test.test_affinity;
+ }
+
location /test_fetch {
js_content test.test_fetch;
}
@@ -102,11 +109,11 @@ my $p0 = port(8080);
$t->write_file('test.js', <<EOF);
import fs from 'fs';
+ function affinity() {
+ ngx.shared.workers.set(ngx.worker_id, 1);
+ }
+
async function fetch() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
let body = await reply.text();
@@ -115,10 +122,6 @@ my $p0 = port(8080);
}
async function multiple_fetches() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo');
let body = await reply.text();
@@ -128,18 +131,10 @@ my $p0 = port(8080);
}
async function fetch_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('garbage');
}
async function file() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
await fh.write('abc');
@@ -147,27 +142,15 @@ my $p0 = port(8080);
}
async function overrun() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {}, 100000);
}
function tick() {
- if (ngx.worker_id != 0) {
- return;
- }
-
ngx.shared.nums.incr('tick', 1);
}
function tick_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
throw new Error("EXCEPTION");
}
@@ -180,19 +163,11 @@ my $p0 = port(8080);
}
function timer_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
throw new Error("EXCEPTION");
}
function timeout_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {
var v = ngx.shared.nums.get('timeout_exception') || 0;
@@ -206,6 +181,10 @@ my $p0 = port(8080);
}, 1);
}
+ function test_affinity(r) {
+ r.return(200, `[\${ngx.shared.workers.keys().toSorted()}]`);
+ }
+
function test_fetch(r) {
r.return(200, ngx.shared.strings.get('fetch').startsWith('okok'));
}
@@ -232,18 +211,20 @@ my $p0 = port(8080);
r.return(200, ngx.shared.nums.get('timeout_exception') >= 2);
}
- export default { fetch, fetch_exception, file, multiple_fetches, overrun,
- test_fetch, test_file, test_multiple_fetches, test_tick,
- test_timeout_exception, test_timer, tick, tick_exception,
- timer, timer_exception, timeout_exception };
+ export default { affinity, fetch, fetch_exception, file, multiple_fetches,
+ overrun, test_affinity, test_fetch, test_file,
+ test_multiple_fetches, test_tick, test_timeout_exception,
+ test_timer, tick, tick_exception, timer, timer_exception,
+ timeout_exception };
EOF
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
###############################################################################
select undef, undef, undef, 0.1;
+like(http_get('/test_affinity'), qr/\[1,3]/, 'affinity test');
like(http_get('/test_tick'), qr/true/, '3x tick test');
like(http_get('/test_timer'), qr/true/, 'timer test');
like(http_get('/test_file'), qr/true/, 'file test');
diff -r 58d40fc80c52 -r e3c442561889 nginx/t/stream_js_periodic.t
--- a/nginx/t/stream_js_periodic.t Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/t/stream_js_periodic.t Tue Sep 05 09:17:10 2023 -0700
@@ -24,12 +24,13 @@ use Test::Nginx::Stream qw/ stream /;
select STDERR; $| = 1;
select STDOUT; $| = 1;
-my $t = Test::Nginx->new()->has(qw/http stream/)
+my $t = Test::Nginx->new()->has(qw/http rewrite stream/)
->write_file_expand('nginx.conf', <<'EOF');
%%TEST_GLOBALS%%
daemon off;
+worker_processes 4;
events {
}
@@ -43,16 +44,18 @@ stream {
js_shared_dict_zone zone=nums:32k type=number;
js_shared_dict_zone zone=strings:32k;
+ js_shared_dict_zone zone=workers:32k type=number;
server {
listen 127.0.0.1:8080;
js_periodic test.tick interval=30ms jitter=1ms;
- js_periodic test.timer interval=1s;
+ js_periodic test.timer interval=1s worker_affinity=all;
js_periodic test.overrun interval=30ms;
js_periodic test.file interval=1s;
js_periodic test.fetch interval=40ms;
js_periodic test.multiple_fetches interval=1s;
+ js_periodic test.affinity interval=50ms worker_affinity=0101;
js_periodic test.fetch_exception interval=1s;
js_periodic test.tick_exception interval=1s;
@@ -89,11 +92,11 @@ my $p1 = port(8081);
$t->write_file('test.js', <<EOF);
import fs from 'fs';
+ function affinity() {
+ ngx.shared.workers.set(ngx.worker_id, 1);
+ }
+
async function fetch() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
let body = await reply.text();
@@ -102,18 +105,10 @@ my $p1 = port(8081);
}
async function fetch_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('garbage');
}
async function multiple_fetches() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo');
let body = await reply.text();
@@ -123,10 +118,6 @@ my $p1 = port(8081);
}
async function file() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
await fh.write('abc');
@@ -134,26 +125,14 @@ my $p1 = port(8081);
}
async function overrun() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {}, 100000);
}
function tick() {
- if (ngx.worker_id != 0) {
- return;
- }
-
ngx.shared.nums.incr('tick', 1);
}
function tick_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
throw new Error("EXCEPTION");
}
@@ -166,19 +145,11 @@ my $p1 = port(8081);
}
function timer_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
throw new Error("EXCEPTION");
}
function timeout_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {
var v = ngx.shared.nums.get('timeout_exception') || 0;
@@ -196,6 +167,15 @@ my $p1 = port(8081);
s.on('upload', function (data) {
if (data.length > 0) {
switch (data) {
+ case 'affinity':
+ if (ngx.shared.workers.keys().toSorted().toString()
+ == '1,3')
+ {
+ s.done();
+ return;
+ }
+
+ break;
case 'fetch':
if (ngx.shared.strings.get('fetch').startsWith('okok')) {
s.done();
@@ -258,19 +238,21 @@ my $p1 = port(8081);
});
}
- export default { fetch, fetch_exception, multiple_fetches, file, overrun,
- test, tick, tick_exception, timer, timer_exception,
- timeout_exception };
+ export default { affinity, fetch, fetch_exception, multiple_fetches, file,
+ overrun, test, tick, tick_exception, timer,
+ timer_exception, timeout_exception };
EOF
$t->run_daemon(\&stream_daemon, port(8090));
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
$t->waitforsocket('127.0.0.1:' . port(8090));
###############################################################################
select undef, undef, undef, 0.1;
+is(stream('127.0.0.1:' . port(8080))->io('affinity'), 'affinity',
+ 'affinity test');
is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test');
is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test');
is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test');
More information about the nginx-devel
mailing list