[njs] Modules: added worker_affinity parameter for js_periodic directive.

Dmitry Volyntsev xeioex at nginx.com
Tue Sep 5 16:17:39 UTC 2023


details:   https://hg.nginx.org/njs/rev/e3c442561889
branches:  
changeset: 2190:e3c442561889
user:      Dmitry Volyntsev <xeioex at nginx.com>
date:      Tue Sep 05 09:17:10 2023 -0700
description:
Modules: added worker_affinity parameter for js_periodic directive.

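worker_affinity specifies the set of workers on which the js_periodic handler
should be executed. By default the handler is executed only on worker 0.
The parameter accepts either "all", to select all workers, or a binary mask
of "0" and "1" characters, one per worker process: a "1" at position N
(counting from 0) selects worker N. The mask length must be equal to the
value of the "worker_processes" directive.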

example.conf:
worker_processes 4;

...
    location @periodics {
        # to be run at 1 minute intervals in worker 0
        js_periodic main.handler interval=60s;

        # to be run at 1 minute intervals in all the workers
        js_periodic main.handler interval=60s worker_affinity=all;

        # to be run at 1 minute intervals in workers 1 and 3
        js_periodic main.handler interval=60s worker_affinity=0101;
    }

diffstat:

 nginx/ngx_http_js_module.c   |  68 +++++++++++++++++++++++++++++++++++++++++++-
 nginx/ngx_stream_js_module.c |  68 +++++++++++++++++++++++++++++++++++++++++++-
 nginx/t/js_periodic.t        |  67 +++++++++++++++---------------------------
 nginx/t/stream_js_periodic.t |  66 +++++++++++++++---------------------------
 4 files changed, 182 insertions(+), 87 deletions(-)

diffs (587 lines):

diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_http_js_module.c
--- a/nginx/ngx_http_js_module.c	Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/ngx_http_js_module.c	Tue Sep 05 09:17:10 2023 -0700
@@ -25,7 +25,7 @@ typedef struct {
 typedef struct {
     ngx_http_conf_ctx_t   *conf_ctx;
     ngx_connection_t      *connection;
-    void                  *padding;
+    uint8_t               *worker_affinity;
 
     /**
      * fd is used for event debug and should be at the same position
@@ -4544,6 +4544,16 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc
     periodics = jmcf->periodics->elts;
 
     for (i = 0; i < jmcf->periodics->nelts; i++) {
+        if (periodics[i].worker_affinity != NULL
+            && !periodics[i].worker_affinity[ngx_worker])
+        {
+            continue;
+        }
+
+        if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+            continue;
+        }
+
         periodics[i].fd = 1000000 + i;
 
         if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
@@ -4558,9 +4568,11 @@ ngx_http_js_init_worker(ngx_cycle_t *cyc
 static char *
 ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
 {
+    uint8_t             *mask;
     ngx_str_t           *value, s;
     ngx_msec_t           interval, jitter;
     ngx_uint_t           i;
+    ngx_core_conf_t     *ccf;
     ngx_js_periodic_t   *periodic;
     ngx_js_main_conf_t  *jmcf;
 
@@ -4586,6 +4598,7 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx
 
     ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
 
+    mask = NULL;
     jitter = 0;
     interval = 5000;
 
@@ -4619,6 +4632,58 @@ ngx_http_js_periodic(ngx_conf_t *cf, ngx
             continue;
         }
 
+        if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+            s.len = value[i].len - 16;
+            s.data = value[i].data + 16;
+
+            ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+                                                   ngx_core_module);
+
+            if (ccf->worker_processes == NGX_CONF_UNSET) {
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                                   "\"worker_affinity\" is not supported "
+                                   "with unset \"worker_processes\" directive");
+                return NGX_CONF_ERROR;
+            }
+
+            mask = ngx_palloc(cf->pool, ccf->worker_processes);
+            if (mask == NULL) {
+                return NGX_CONF_ERROR;
+            }
+
+            if (ngx_strncmp(s.data, "all", 3) == 0) {
+                memset(mask, 1, ccf->worker_processes);
+                continue;
+            }
+
+            if ((size_t) ccf->worker_processes != s.len) {
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+                                   "\"worker_processes\" is not equal to the "
+                                   "size of \"worker_affinity\" mask");
+                return NGX_CONF_ERROR;
+            }
+
+            for (i = 0; i < s.len; i++) {
+                if (s.data[i] == '0') {
+                    mask[i] = 0;
+                    continue;
+                }
+
+                if (s.data[i] == '1') {
+                    mask[i] = 1;
+                    continue;
+                }
+
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                          "invalid character \"%c\" in \"worker_affinity=\"",
+                          s.data[i]);
+
+                return NGX_CONF_ERROR;
+            }
+
+            continue;
+        }
+
 invalid:
 
         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
@@ -4629,6 +4694,7 @@ invalid:
     periodic->method = value[1];
     periodic->interval = interval;
     periodic->jitter = jitter;
+    periodic->worker_affinity = mask;
     periodic->conf_ctx = cf->ctx;
 
     return NGX_CONF_OK;
diff -r 58d40fc80c52 -r e3c442561889 nginx/ngx_stream_js_module.c
--- a/nginx/ngx_stream_js_module.c	Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/ngx_stream_js_module.c	Tue Sep 05 09:17:10 2023 -0700
@@ -30,7 +30,7 @@ typedef struct {
 typedef struct {
     ngx_stream_conf_ctx_t  *conf_ctx;
     ngx_connection_t       *connection;
-    void                   *padding;
+    uint8_t                *worker_affinity;
 
     /**
      * fd is used for event debug and should be at the same position
@@ -2049,6 +2049,16 @@ ngx_stream_js_init_worker(ngx_cycle_t *c
     periodics = jmcf->periodics->elts;
 
     for (i = 0; i < jmcf->periodics->nelts; i++) {
+        if (periodics[i].worker_affinity != NULL
+            && !periodics[i].worker_affinity[ngx_worker])
+        {
+            continue;
+        }
+
+        if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+            continue;
+        }
+
         periodics[i].fd = 1000000 + i;
 
         if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
@@ -2063,9 +2073,11 @@ ngx_stream_js_init_worker(ngx_cycle_t *c
 static char *
 ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
 {
+    uint8_t             *mask;
     ngx_str_t           *value, s;
     ngx_msec_t           interval, jitter;
     ngx_uint_t           i;
+    ngx_core_conf_t     *ccf;
     ngx_js_periodic_t   *periodic;
     ngx_js_main_conf_t  *jmcf;
 
@@ -2091,6 +2103,7 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n
 
     ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
 
+    mask = NULL;
     jitter = 0;
     interval = 5000;
 
@@ -2124,6 +2137,58 @@ ngx_stream_js_periodic(ngx_conf_t *cf, n
             continue;
         }
 
+        if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+            s.len = value[i].len - 16;
+            s.data = value[i].data + 16;
+
+            ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+                                                   ngx_core_module);
+
+            if (ccf->worker_processes == NGX_CONF_UNSET) {
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                                   "\"worker_affinity\" is not supported "
+                                   "with unset \"worker_processes\" directive");
+                return NGX_CONF_ERROR;
+            }
+
+            mask = ngx_palloc(cf->pool, ccf->worker_processes);
+            if (mask == NULL) {
+                return NGX_CONF_ERROR;
+            }
+
+            if (ngx_strncmp(s.data, "all", 3) == 0) {
+                memset(mask, 1, ccf->worker_processes);
+                continue;
+            }
+
+            if ((size_t) ccf->worker_processes != s.len) {
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+                                   "\"worker_processes\" is not equal to the "
+                                   "size of \"worker_affinity\" mask");
+                return NGX_CONF_ERROR;
+            }
+
+            for (i = 0; i < s.len; i++) {
+                if (s.data[i] == '0') {
+                    mask[i] = 0;
+                    continue;
+                }
+
+                if (s.data[i] == '1') {
+                    mask[i] = 1;
+                    continue;
+                }
+
+                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                          "invalid character \"%c\" in \"worker_affinity=\"",
+                          s.data[i]);
+
+                return NGX_CONF_ERROR;
+            }
+
+            continue;
+        }
+
 invalid:
 
         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
@@ -2134,6 +2199,7 @@ invalid:
     periodic->method = value[1];
     periodic->interval = interval;
     periodic->jitter = jitter;
+    periodic->worker_affinity = mask;
     periodic->conf_ctx = cf->ctx;
 
     return NGX_CONF_OK;
diff -r 58d40fc80c52 -r e3c442561889 nginx/t/js_periodic.t
--- a/nginx/t/js_periodic.t	Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/t/js_periodic.t	Tue Sep 05 09:17:10 2023 -0700
@@ -23,12 +23,13 @@ use Test::Nginx;
 select STDERR; $| = 1;
 select STDOUT; $| = 1;
 
-my $t = Test::Nginx->new()->has(qw/http/)
+my $t = Test::Nginx->new()->has(qw/http rewrite/)
 	->write_file_expand('nginx.conf', <<'EOF');
 
 %%TEST_GLOBALS%%
 
 daemon off;
+worker_processes 4;
 
 events {
 }
@@ -42,6 +43,7 @@ http {
 
     js_shared_dict_zone zone=nums:32k type=number;
     js_shared_dict_zone zone=strings:32k;
+    js_shared_dict_zone zone=workers:32k type=number;
 
     server {
         listen       127.0.0.1:8080;
@@ -49,11 +51,12 @@ http {
 
         location @periodic {
             js_periodic test.tick interval=30ms jitter=1ms;
-            js_periodic test.timer interval=1s;
+            js_periodic test.timer interval=1s worker_affinity=all;
             js_periodic test.overrun interval=30ms;
             js_periodic test.file interval=1s;
             js_periodic test.fetch interval=40ms;
             js_periodic test.multiple_fetches interval=1s;
+            js_periodic test.affinity interval=50ms worker_affinity=0101;
 
             js_periodic test.fetch_exception interval=1s;
             js_periodic test.tick_exception interval=1s;
@@ -69,6 +72,10 @@ http {
             return 200 'foo';
         }
 
+        location /test_affinity {
+            js_content test.test_affinity;
+        }
+
         location /test_fetch {
             js_content test.test_fetch;
         }
@@ -102,11 +109,11 @@ my $p0 = port(8080);
 $t->write_file('test.js', <<EOF);
     import fs from 'fs';
 
+    function affinity() {
+        ngx.shared.workers.set(ngx.worker_id, 1);
+    }
+
     async function fetch() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
         let body = await reply.text();
 
@@ -115,10 +122,6 @@ my $p0 = port(8080);
     }
 
     async function multiple_fetches() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
         let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo');
         let body = await reply.text();
@@ -128,18 +131,10 @@ my $p0 = port(8080);
     }
 
     async function fetch_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('garbage');
      }
 
     async function file() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
 
         await fh.write('abc');
@@ -147,27 +142,15 @@ my $p0 = port(8080);
     }
 
     async function overrun() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {}, 100000);
     }
 
 
     function tick() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         ngx.shared.nums.incr('tick', 1);
     }
 
     function tick_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         throw new Error("EXCEPTION");
     }
 
@@ -180,19 +163,11 @@ my $p0 = port(8080);
     }
 
     function timer_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
         throw new Error("EXCEPTION");
     }
 
     function timeout_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {
             var v = ngx.shared.nums.get('timeout_exception') || 0;
 
@@ -206,6 +181,10 @@ my $p0 = port(8080);
         }, 1);
     }
 
+    function test_affinity(r) {
+        r.return(200, `[\${ngx.shared.workers.keys().toSorted()}]`);
+    }
+
     function test_fetch(r) {
         r.return(200, ngx.shared.strings.get('fetch').startsWith('okok'));
     }
@@ -232,18 +211,20 @@ my $p0 = port(8080);
         r.return(200, ngx.shared.nums.get('timeout_exception') >= 2);
     }
 
-    export default { fetch, fetch_exception, file, multiple_fetches, overrun,
-                     test_fetch, test_file, test_multiple_fetches, test_tick,
-                     test_timeout_exception, test_timer, tick, tick_exception,
-                     timer, timer_exception, timeout_exception };
+    export default { affinity, fetch, fetch_exception, file, multiple_fetches,
+                     overrun, test_affinity, test_fetch, test_file,
+                     test_multiple_fetches, test_tick, test_timeout_exception,
+                     test_timer, tick, tick_exception, timer, timer_exception,
+                     timeout_exception };
 EOF
 
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
 
 ###############################################################################
 
 select undef, undef, undef, 0.1;
 
+like(http_get('/test_affinity'), qr/\[1,3]/, 'affinity test');
 like(http_get('/test_tick'), qr/true/, '3x tick test');
 like(http_get('/test_timer'), qr/true/, 'timer test');
 like(http_get('/test_file'), qr/true/, 'file test');
diff -r 58d40fc80c52 -r e3c442561889 nginx/t/stream_js_periodic.t
--- a/nginx/t/stream_js_periodic.t	Thu Aug 31 08:24:17 2023 -0700
+++ b/nginx/t/stream_js_periodic.t	Tue Sep 05 09:17:10 2023 -0700
@@ -24,12 +24,13 @@ use Test::Nginx::Stream qw/ stream /;
 select STDERR; $| = 1;
 select STDOUT; $| = 1;
 
-my $t = Test::Nginx->new()->has(qw/http stream/)
+my $t = Test::Nginx->new()->has(qw/http rewrite stream/)
 	->write_file_expand('nginx.conf', <<'EOF');
 
 %%TEST_GLOBALS%%
 
 daemon off;
+worker_processes 4;
 
 events {
 }
@@ -43,16 +44,18 @@ stream {
 
     js_shared_dict_zone zone=nums:32k type=number;
     js_shared_dict_zone zone=strings:32k;
+    js_shared_dict_zone zone=workers:32k type=number;
 
     server {
         listen       127.0.0.1:8080;
 
         js_periodic test.tick interval=30ms jitter=1ms;
-        js_periodic test.timer interval=1s;
+        js_periodic test.timer interval=1s worker_affinity=all;
         js_periodic test.overrun interval=30ms;
         js_periodic test.file interval=1s;
         js_periodic test.fetch interval=40ms;
         js_periodic test.multiple_fetches interval=1s;
+        js_periodic test.affinity interval=50ms worker_affinity=0101;
 
         js_periodic test.fetch_exception interval=1s;
         js_periodic test.tick_exception interval=1s;
@@ -89,11 +92,11 @@ my $p1 = port(8081);
 $t->write_file('test.js', <<EOF);
     import fs from 'fs';
 
+    function affinity() {
+        ngx.shared.workers.set(ngx.worker_id, 1);
+    }
+
     async function fetch() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
         let body = await reply.text();
 
@@ -102,18 +105,10 @@ my $p1 = port(8081);
     }
 
     async function fetch_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('garbage');
      }
 
     async function multiple_fetches() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
         let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo');
         let body = await reply.text();
@@ -123,10 +118,6 @@ my $p1 = port(8081);
     }
 
     async function file() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
 
         await fh.write('abc');
@@ -134,26 +125,14 @@ my $p1 = port(8081);
     }
 
     async function overrun() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {}, 100000);
     }
 
     function tick() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         ngx.shared.nums.incr('tick', 1);
     }
 
     function tick_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         throw new Error("EXCEPTION");
     }
 
@@ -166,19 +145,11 @@ my $p1 = port(8081);
     }
 
     function timer_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
         throw new Error("EXCEPTION");
     }
 
     function timeout_exception() {
-        if (ngx.worker_id != 0) {
-            return;
-        }
-
         setTimeout(() => {
             var v = ngx.shared.nums.get('timeout_exception') || 0;
 
@@ -196,6 +167,15 @@ my $p1 = port(8081);
         s.on('upload', function (data) {
             if (data.length > 0) {
                 switch (data) {
+                case 'affinity':
+                    if (ngx.shared.workers.keys().toSorted().toString()
+                        == '1,3')
+                    {
+                        s.done();
+                        return;
+                    }
+
+                    break;
                 case 'fetch':
                     if (ngx.shared.strings.get('fetch').startsWith('okok')) {
                         s.done();
@@ -258,19 +238,21 @@ my $p1 = port(8081);
         });
     }
 
-    export default { fetch, fetch_exception, multiple_fetches, file, overrun,
-                     test, tick, tick_exception, timer, timer_exception,
-                     timeout_exception };
+    export default { affinity, fetch, fetch_exception, multiple_fetches, file,
+                     overrun, test, tick, tick_exception, timer,
+                     timer_exception, timeout_exception };
 EOF
 
 $t->run_daemon(\&stream_daemon, port(8090));
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
 $t->waitforsocket('127.0.0.1:' . port(8090));
 
 ###############################################################################
 
 select undef, undef, undef, 0.1;
 
+is(stream('127.0.0.1:' . port(8080))->io('affinity'), 'affinity',
+	'affinity test');
 is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test');
 is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test');
 is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test');


More information about the nginx-devel mailing list