[njs] Nginx stream module refactored.

Dmitry Volyntsev xeioex at nginx.com
Tue Sep 11 12:36:07 UTC 2018


details:   http://hg.nginx.org/njs/rev/bbec3cdb747b
branches:  
changeset: 601:bbec3cdb747b
user:      Dmitry Volyntsev <xeioex at nginx.com>
date:      Tue Sep 11 15:34:50 2018 +0300
description:
Nginx stream module refactored.

The module handlers are refactored to make them similar to the HTTP
ones. This change is not backward compatible with the existing njs code.

To allow asynchronous operations, the evaluation model of njs handlers
is changed. A handler function (which is set for a corresponding nginx
directive) is invoked only once. It can register a callback if more data
is necessary, by calling s.on(<event_name>, <callback>). Available
events:
    upload - new data from a client.
    download - new data from an upstream.

Callback prototype:
    callback(data, flags).
    data - string.
    flags - object.
        Available properties:
            last - boolean, data is the last buffer.

A callback can be deregistered by s.off(<event_name>).


Return codes are replaced with special methods: s.allow(), s.deny(),
s.done().
    s.done([code]) (s.OK), can be used to return an arbitrary code.
    s.allow() (s.OK)
    s.deny() (s.ABORT)

In addition, the s.decline() method is added to allow a handler to stop
its own processing and pass control to the next handler of the
current phase.

A handler is expected to invoke one of these methods when the
processing is done.


For example, js_preread can wait for additional data by registering
a callback which will be called whenever new incoming data appears.

function js_preread(s) {
    s.on('upload', function(data, flags) {
            // process data

            // to proceed to the next phase
            s.done()

            // or, alternatively, to proceed to the next handler
            // of the current phase
            s.decline()
    })
}

js_filter handler is refactored.
    1) The current data chunk is moved from s.buffer to
        the callback argument.
    2) s.fromUpstream is removed.
    3) The properties related to the current data chunk
        (s.eof) are put into the second callback argument.
        s.eof is renamed to flags.last.
    4) s.send(data[, opts]) is added to replace s.buffer = data;
        opts - object, can be used to override nginx buffer flags
            derived from an incoming data chunk buffer.
            It can contain boolean flags: last, flush.
    5) Data in the corresponding direction is not forwarded
        while a callback is active; the callback is expected to
        call s.send(data, flags) if it wants to pass the data
        as is.

    6) s.send() can be called multiple times per callback invocation.

    function stream_filter(s) {

        s.on('upload', function (data, flags) {
                // process a data chunk from a client

                // stop filtering data
                s.off('upload')
            })

        s.on('download', function (data, flags) {
                // process data from upstream

                // send my_data as the last buffer.
                s.send(my_data, {last:1});
            })
    }

diffstat:

 nginx/ngx_stream_js_module.c |  933 +++++++++++++++++++++++++++++-------------
 1 files changed, 635 insertions(+), 298 deletions(-)

diffs (truncated from 1184 to 1000 lines):

diff -r 48267f0ebab3 -r bbec3cdb747b nginx/ngx_stream_js_module.c
--- a/nginx/ngx_stream_js_module.c	Tue Sep 11 15:34:48 2018 +0300
+++ b/nginx/ngx_stream_js_module.c	Tue Sep 11 15:34:50 2018 +0300
@@ -1,6 +1,7 @@
 
 /*
  * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Dmitry Volyntsev
  * Copyright (C) NGINX, Inc.
  */
 
@@ -26,16 +27,29 @@ typedef struct {
 
 
 typedef struct {
-    njs_vm_t              *vm;
-    ngx_log_t             *log;
-    njs_opaque_value_t     arg;
-    ngx_buf_t             *buf;
-    ngx_chain_t           *free;
-    ngx_chain_t           *busy;
+    njs_vm_t               *vm;
+    ngx_log_t              *log;
+    njs_opaque_value_t      args[3];
+    ngx_buf_t              *buf;
+    ngx_chain_t           **last_out;
+    ngx_chain_t            *free;
+    ngx_chain_t            *busy;
+    ngx_stream_session_t   *session;
+    ngx_int_t               status;
+    njs_vm_event_t          upload_event;
+    njs_vm_event_t          download_event;
+    unsigned                from_upstream:1;
+    unsigned                filter:1;
+    unsigned                in_progress:1;
+} ngx_stream_js_ctx_t;
+
+
+typedef struct {
     ngx_stream_session_t  *session;
-    unsigned               from_upstream:1;
-    unsigned               filter:1;
-} ngx_stream_js_ctx_t;
+    njs_vm_event_t         vm_event;
+    void                  *unused;
+    ngx_int_t              ident;
+} ngx_stream_js_event_t;
 
 
 static ngx_int_t ngx_stream_js_access_handler(ngx_stream_session_t *s);
@@ -49,17 +63,24 @@ static ngx_int_t ngx_stream_js_variable(
 static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s);
 static void ngx_stream_js_cleanup_ctx(void *data);
 static void ngx_stream_js_cleanup_vm(void *data);
+static njs_ret_t ngx_stream_js_buffer_arg(ngx_stream_session_t *s,
+    njs_value_t *buffer);
+static njs_ret_t ngx_stream_js_flags_arg(ngx_stream_session_t *s,
+    njs_value_t *flags);
+static njs_vm_event_t *ngx_stream_js_event(ngx_stream_session_t *s,
+    nxt_str_t *event);
 
 static njs_ret_t ngx_stream_js_ext_get_remote_address(njs_vm_t *vm,
     njs_value_t *value, void *obj, uintptr_t data);
-static njs_ret_t ngx_stream_js_ext_get_eof(njs_vm_t *vm, njs_value_t *value,
-    void *obj, uintptr_t data);
-static njs_ret_t ngx_stream_js_ext_get_from_upstream(njs_vm_t *vm,
-    njs_value_t *value, void *obj, uintptr_t data);
-static njs_ret_t ngx_stream_js_ext_get_buffer(njs_vm_t *vm, njs_value_t *value,
-    void *obj, uintptr_t data);
-static njs_ret_t ngx_stream_js_ext_set_buffer(njs_vm_t *vm, void *obj,
-    uintptr_t data, nxt_str_t *value);
+
+static njs_ret_t ngx_stream_js_ext_done(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
+static njs_ret_t ngx_stream_js_ext_deny(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
+static njs_ret_t ngx_stream_js_ext_decline(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
+static njs_ret_t ngx_stream_js_ext_set_status(njs_vm_t *vm, njs_value_t *args,
+    nxt_uint_t nargs, ngx_int_t status);
 
 static njs_ret_t ngx_stream_js_ext_log(njs_vm_t *vm, njs_value_t *args,
      nxt_uint_t nargs, njs_index_t unused);
@@ -69,11 +90,23 @@ static njs_ret_t ngx_stream_js_ext_error
      nxt_uint_t nargs, njs_index_t unused);
 static njs_ret_t ngx_stream_js_ext_log_core(njs_vm_t *vm, njs_value_t *args,
     nxt_uint_t nargs, ngx_uint_t level);
+static njs_ret_t ngx_stream_js_ext_on(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
+static njs_ret_t ngx_stream_js_ext_off(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
+static njs_ret_t ngx_stream_js_ext_send(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused);
 
 static njs_ret_t ngx_stream_js_ext_get_variable(njs_vm_t *vm,
     njs_value_t *value, void *obj, uintptr_t data);
-static njs_ret_t ngx_stream_js_ext_get_code(njs_vm_t *vm,
-    njs_value_t *value, void *obj, uintptr_t data);
+
+static njs_host_event_t ngx_stream_js_set_timer(njs_external_ptr_t external,
+    uint64_t delay, njs_vm_event_t vm_event);
+static void ngx_stream_js_clear_timer(njs_external_ptr_t external,
+    njs_host_event_t event);
+static void ngx_stream_js_timer_handler(ngx_event_t *ev);
+static void ngx_stream_js_handle_event(ngx_stream_session_t *s,
+    njs_vm_event_t vm_event, njs_value_t *args, nxt_uint_t nargs);
 
 static char *ngx_stream_js_include(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
@@ -169,43 +202,6 @@ static njs_external_t  ngx_stream_js_ext
       NULL,
       0 },
 
-    { nxt_string("eof"),
-      NJS_EXTERN_PROPERTY,
-      NULL,
-      0,
-      ngx_stream_js_ext_get_eof,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      0 },
-
-    { nxt_string("fromUpstream"),
-      NJS_EXTERN_PROPERTY,
-      NULL,
-      0,
-      ngx_stream_js_ext_get_from_upstream,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      0 },
-
-    { nxt_string("buffer"),
-      NJS_EXTERN_PROPERTY,
-      NULL,
-      0,
-      ngx_stream_js_ext_get_buffer,
-      ngx_stream_js_ext_set_buffer,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      0 },
-
-
     { nxt_string("variables"),
       NJS_EXTERN_OBJECT,
       NULL,
@@ -218,6 +214,54 @@ static njs_external_t  ngx_stream_js_ext
       NULL,
       0 },
 
+    { nxt_string("allow"),
+      NJS_EXTERN_METHOD,
+      NULL,
+      0,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      ngx_stream_js_ext_done,
+      0 },
+
+    { nxt_string("deny"),
+      NJS_EXTERN_METHOD,
+      NULL,
+      0,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      ngx_stream_js_ext_deny,
+      0 },
+
+    { nxt_string("decline"),
+      NJS_EXTERN_METHOD,
+      NULL,
+      0,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      ngx_stream_js_ext_decline,
+      0 },
+
+    { nxt_string("done"),
+      NJS_EXTERN_METHOD,
+      NULL,
+      0,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      NULL,
+      ngx_stream_js_ext_done,
+      0 },
+
     { nxt_string("log"),
       NJS_EXTERN_METHOD,
       NULL,
@@ -254,65 +298,42 @@ static njs_external_t  ngx_stream_js_ext
       ngx_stream_js_ext_error,
       0 },
 
-    { nxt_string("OK"),
-      NJS_EXTERN_PROPERTY,
+    { nxt_string("on"),
+      NJS_EXTERN_METHOD,
       NULL,
       0,
-      ngx_stream_js_ext_get_code,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      -NGX_OK },
-
-    { nxt_string("DECLINED"),
-      NJS_EXTERN_PROPERTY,
-      NULL,
-      0,
-      ngx_stream_js_ext_get_code,
       NULL,
       NULL,
       NULL,
       NULL,
       NULL,
-      -NGX_DECLINED },
+      ngx_stream_js_ext_on,
+      0 },
 
-    { nxt_string("AGAIN"),
-      NJS_EXTERN_PROPERTY,
+    { nxt_string("off"),
+      NJS_EXTERN_METHOD,
       NULL,
       0,
-      ngx_stream_js_ext_get_code,
       NULL,
       NULL,
       NULL,
       NULL,
       NULL,
-      -NGX_AGAIN },
+      ngx_stream_js_ext_off,
+      0 },
 
-    { nxt_string("ERROR"),
-      NJS_EXTERN_PROPERTY,
+    { nxt_string("send"),
+      NJS_EXTERN_METHOD,
       NULL,
       0,
-      ngx_stream_js_ext_get_code,
       NULL,
       NULL,
       NULL,
       NULL,
       NULL,
-      -NGX_ERROR },
+      ngx_stream_js_ext_send,
+      0 },
 
-    { nxt_string("ABORT"),
-      NJS_EXTERN_PROPERTY,
-      NULL,
-      0,
-      ngx_stream_js_ext_get_code,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      NULL,
-      -NGX_ABORT },
 };
 
 
@@ -332,13 +353,18 @@ static njs_external_t  ngx_stream_js_ext
 };
 
 
+static njs_vm_ops_t ngx_stream_js_ops = {
+    ngx_stream_js_set_timer,
+    ngx_stream_js_clear_timer
+};
+
+
 static ngx_stream_filter_pt  ngx_stream_next_filter;
 
 
 static ngx_int_t
 ngx_stream_js_access_handler(ngx_stream_session_t *s)
 {
-    ngx_int_t                  rc;
     ngx_stream_js_srv_conf_t  *jscf;
 
     ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
@@ -346,15 +372,7 @@ ngx_stream_js_access_handler(ngx_stream_
 
     jscf = ngx_stream_get_module_srv_conf(s, ngx_stream_js_module);
 
-    rc = ngx_stream_js_phase_handler(s, &jscf->access);
-
-    if (rc == NGX_ABORT) {
-        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
-                      "access forbidden by js");
-        rc = NGX_STREAM_FORBIDDEN;
-    }
-
-    return rc;
+    return ngx_stream_js_phase_handler(s, &jscf->access);
 }
 
 
@@ -375,76 +393,111 @@ ngx_stream_js_preread_handler(ngx_stream
 static ngx_int_t
 ngx_stream_js_phase_handler(ngx_stream_session_t *s, ngx_str_t *name)
 {
-    nxt_str_t                  fname, value, exception;
-    ngx_int_t                  rc;
-    njs_function_t            *func;
-    ngx_connection_t          *c;
-    ngx_stream_js_ctx_t       *ctx;
+    nxt_str_t             fname, exception;
+    njs_ret_t             ret;
+    ngx_int_t             rc;
+    njs_function_t       *func;
+    ngx_connection_t     *c;
+    ngx_stream_js_ctx_t  *ctx;
 
     if (name->len == 0) {
         return NGX_DECLINED;
     }
 
-    c = s->connection;
-
     rc = ngx_stream_js_init_vm(s);
     if (rc != NGX_OK) {
         return rc;
     }
 
+    c = s->connection;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                   "http js phase call \"%V\"", name);
+
     ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
 
-    fname.start = name->data;
-    fname.length = name->len;
+    if (!ctx->in_progress) {
+        fname.start = name->data;
+        fname.length = name->len;
+
+        func = njs_vm_function(ctx->vm, &fname);
 
-    func = njs_vm_function(ctx->vm, &fname);
+        if (func == NULL) {
+            ngx_log_error(NGX_LOG_ERR, c->log, 0,
+                          "js function \"%V\" not found", name);
+            return NGX_ERROR;
+        }
 
-    if (func == NULL) {
-        ngx_log_error(NGX_LOG_ERR, c->log, 0, "js function \"%V\" not found",
-                      name);
-        return NGX_ERROR;
-    }
+        /*
+         * status is expected to be overriden by allow(), deny(), decline() or
+         * done() methods.
+         */
 
-    if (njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->arg), 1) != NJS_OK) {
-        njs_vm_retval_to_ext_string(ctx->vm, &exception);
+        ctx->status = NGX_ERROR;
 
-        ngx_log_error(NGX_LOG_ERR, c->log, 0, "js exception: %*s",
-                      exception.length, exception.start);
-
-        return NGX_ERROR;
+        ret = njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->args), 1);
+        if (ret != NJS_OK) {
+            goto exception;
+        }
     }
 
-    if (njs_value_is_void(njs_vm_retval(ctx->vm))) {
-        return NGX_OK;
-    }
+    if (ctx->upload_event != NULL) {
+        ret = ngx_stream_js_buffer_arg(s, njs_value_arg(&ctx->args[1]));
+        if (ret != NJS_OK) {
+            goto exception;
+        }
 
-    if (njs_vm_retval_to_ext_string(ctx->vm, &value) != NJS_OK) {
-        return NGX_ERROR;
+        ret = ngx_stream_js_flags_arg(s, njs_value_arg(&ctx->args[2]));
+        if (ret != NJS_OK) {
+            goto exception;
+        }
+
+        njs_vm_post_event(ctx->vm, ctx->upload_event,
+                          njs_value_arg(&ctx->args[1]), 2);
+
+        rc = njs_vm_run(ctx->vm);
+        if (rc == NJS_ERROR) {
+            goto exception;
+        }
     }
 
-    ngx_log_debug2(NGX_LOG_DEBUG_STREAM, c->log, 0, "js return value: \"%*s\"",
-                   value.length, value.start);
-
-    rc = ngx_atoi(value.start, value.length);
+    if (njs_vm_pending(ctx->vm)) {
+        ctx->in_progress = 1;
+        rc = ctx->upload_event ? NGX_AGAIN : NGX_DONE;
 
-    if (rc == NGX_ERROR) {
-        ngx_log_error(NGX_LOG_ERR, c->log, 0,
-                      "unexpected js return code: \"%*s\"",
-                      value.length, value.start);
-        return NGX_ERROR;
+    } else {
+        ctx->in_progress = 0;
+        rc = ctx->status;
     }
 
-    return -rc;
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js phase rc: %i",
+                   rc);
+
+    return rc;
+
+exception:
+
+    njs_vm_retval_to_ext_string(ctx->vm, &exception);
+
+    ngx_log_error(NGX_LOG_ERR, c->log, 0, "js exception: %*s",
+                  exception.length, exception.start);
+
+    return NGX_ERROR;
 }
 
 
+#define ngx_stream_event(from_upstream)                                 \
+    (from_upstream ? ctx->download_event : ctx->upload_event)
+
+
 static ngx_int_t
 ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
     ngx_uint_t from_upstream)
 {
-    nxt_str_t                  name, value, exception;
+    nxt_str_t                  name, exception;
+    njs_ret_t                  ret;
     ngx_int_t                  rc;
-    ngx_chain_t               *out, *cl, **ll;
+    ngx_chain_t               *out, *cl;
     njs_function_t            *func;
     ngx_connection_t          *c;
     ngx_stream_js_ctx_t       *ctx;
@@ -472,83 +525,88 @@ ngx_stream_js_body_filter(ngx_stream_ses
 
     ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
 
-    ctx->filter = 1;
+    if (!ctx->filter) {
+        name.start = jscf->filter.data;
+        name.length = jscf->filter.len;
 
-    name.start = jscf->filter.data;
-    name.length = jscf->filter.len;
+        func = njs_vm_function(ctx->vm, &name);
 
-    func = njs_vm_function(ctx->vm, &name);
+        if (func == NULL) {
+            ngx_log_error(NGX_LOG_ERR, c->log, 0,
+                          "js function \"%V\" not found", &jscf->filter);
+            return NGX_ERROR;
+        }
 
-    if (func == NULL) {
-        ngx_log_error(NGX_LOG_ERR, c->log, 0, "js function \"%V\" not found",
-                      &jscf->filter);
-        return NGX_ERROR;
+        ret = njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->args), 1);
+        if (ret != NJS_OK) {
+            goto exception;
+        }
     }
 
+    ctx->filter = 1;
     ctx->from_upstream = from_upstream;
 
-    ll = &out;
+    ctx->last_out = &out;
 
     while (in) {
         ctx->buf = in->buf;
 
-        if (njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->arg), 1) != NJS_OK) {
-            njs_vm_retval_to_ext_string(ctx->vm, &exception);
+        if (ngx_stream_event(from_upstream) != NULL) {
+            ret = ngx_stream_js_buffer_arg(s, njs_value_arg(&ctx->args[1]));
+            if (ret != NJS_OK) {
+                goto exception;
+            }
 
-            ngx_log_error(NGX_LOG_ERR, c->log, 0, "js exception: %*s",
-                          exception.length, exception.start);
+            ret = ngx_stream_js_flags_arg(s, njs_value_arg(&ctx->args[2]));
+            if (ret != NJS_OK) {
+                goto exception;
+            }
 
-            return NGX_ERROR;
-        }
+            njs_vm_post_event(ctx->vm, ngx_stream_event(from_upstream),
+                              njs_value_arg(&ctx->args[1]), 2);
 
-        if (!njs_value_is_void(njs_vm_retval(ctx->vm))) {
-            if (njs_vm_retval_to_ext_string(ctx->vm, &value) != NJS_OK) {
+            rc = njs_vm_run(ctx->vm);
+            if (rc == NJS_ERROR) {
+                goto exception;
+            }
+
+        } else {
+            cl = ngx_alloc_chain_link(c->pool);
+            if (cl == NULL) {
                 return NGX_ERROR;
             }
 
-            ngx_log_debug2(NGX_LOG_DEBUG_STREAM, c->log, 0,
-                           "js return value: \"%*s\"",
-                           value.length, value.start);
-
-            if (value.length) {
-                rc = ngx_atoi(value.start, value.length);
-
-                if (rc != NGX_OK && rc != -NGX_ERROR) {
-                    ngx_log_error(NGX_LOG_ERR, c->log, 0,
-                                  "unexpected js return code: \"%*s\"",
-                                  value.length, value.start);
-                    return NGX_ERROR;
-                }
-
-                rc = -rc;
+            cl->buf = ctx->buf;
 
-                if (rc == NGX_ERROR) {
-                    return NGX_ERROR;
-                }
-            }
+            *ctx->last_out = cl;
+            ctx->last_out = &cl->next;
         }
 
-        cl = ngx_alloc_chain_link(c->pool);
-        if (cl == NULL) {
-            return NGX_ERROR;
-        }
-
-        cl->buf = ctx->buf;
-
-        *ll = cl;
-        ll = &cl->next;
-
         in = in->next;
     }
 
-    *ll = NULL;
+    *ctx->last_out = NULL;
+
+    if (out != NULL) {
+        rc = ngx_stream_next_filter(s, out, from_upstream);
 
-    rc = ngx_stream_next_filter(s, out, from_upstream);
+        ngx_chain_update_chains(c->pool, &ctx->free, &ctx->busy, &out,
+                                (ngx_buf_tag_t) &ngx_stream_js_module);
 
-    ngx_chain_update_chains(c->pool, &ctx->free, &ctx->busy, &out,
-                            (ngx_buf_tag_t) &ngx_stream_js_module);
+    } else {
+        rc = NGX_OK;
+    }
 
     return rc;
+
+exception:
+
+    njs_vm_retval_to_ext_string(ctx->vm, &exception);
+
+    ngx_log_error(NGX_LOG_ERR, c->log, 0, "js exception: %*s",
+                  exception.length, exception.start);
+
+    return NGX_ERROR;
 }
 
 
@@ -593,7 +651,7 @@ ngx_stream_js_variable(ngx_stream_sessio
 
     pending = njs_vm_pending(ctx->vm);
 
-    if (njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->arg), 1) != NJS_OK) {
+    if (njs_vm_call(ctx->vm, func, njs_value_arg(&ctx->args), 1) != NJS_OK) {
         njs_vm_retval_to_ext_string(ctx->vm, &exception);
 
         ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
@@ -676,8 +734,8 @@ ngx_stream_js_init_vm(ngx_stream_session
         return NGX_ERROR;
     }
 
-    rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->arg), jmcf->proto,
-                                s);
+    rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
+                                jmcf->proto, s);
     if (rc != NXT_OK) {
         return NGX_ERROR;
     }
@@ -709,6 +767,103 @@ ngx_stream_js_cleanup_vm(void *data)
 
 
 static njs_ret_t
+ngx_stream_js_buffer_arg(ngx_stream_session_t *s, njs_value_t *buffer)
+{
+    size_t                 len;
+    u_char                *p;
+    ngx_buf_t             *b;
+    ngx_connection_t      *c;
+    ngx_stream_js_ctx_t   *ctx;
+
+    c = s->connection;
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    b = ctx->filter ? ctx->buf : c->buffer;
+
+    len = b ? b->last - b->pos : 0;
+
+    p = njs_string_alloc(ctx->vm, buffer, len, 0);
+    if (p == NULL) {
+        return NJS_ERROR;
+    }
+
+    if (len) {
+        ngx_memcpy(p, b->pos, len);
+    }
+
+    return NJS_OK;
+}
+
+
+
+static njs_ret_t
+ngx_stream_js_flags_arg(ngx_stream_session_t *s, njs_value_t *flags)
+{
+    ngx_buf_t             *b;
+    ngx_connection_t      *c;
+    njs_opaque_value_t    last_key;
+    njs_opaque_value_t    values[1];
+    ngx_stream_js_ctx_t  *ctx;
+
+    static const nxt_str_t last_str = nxt_string("last");
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    njs_string_create(ctx->vm, njs_value_arg(&last_key), last_str.start,
+                      last_str.length, 0);
+
+    c = s->connection;
+
+    b = ctx->filter ? ctx->buf : c->buffer;
+    njs_value_boolean_set(njs_value_arg(&values[0]), b && b->last_buf);
+
+    return njs_vm_object_alloc(ctx->vm, flags,
+                               njs_value_arg(&last_key),
+                               njs_value_arg(&values[0]), NULL);
+}
+
+
+static njs_vm_event_t *
+ngx_stream_js_event(ngx_stream_session_t *s, nxt_str_t *event)
+{
+    ngx_uint_t             i, n;
+    ngx_stream_js_ctx_t  *ctx;
+
+    static const nxt_str_t events[] = {
+        nxt_string("upload"),
+        nxt_string("download")
+    };
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    i = 0;
+    n = sizeof(events) / sizeof(events[0]);
+
+    while (i < n) {
+        if (event->length == events[i].length
+            && ngx_memcmp(event->start, events[i].start, event->length) == 0)
+        {
+            break;
+        }
+
+        i++;
+    }
+
+    if (i == n) {
+        njs_vm_error(ctx->vm, "unknown event \"%.*s\"", (int) event->length,
+                     event->start);
+        return NULL;
+    }
+
+    if (i == 0) {
+        return &ctx->upload_event;
+    }
+
+    return &ctx->download_event;
+}
+
+
+static njs_ret_t
 ngx_stream_js_ext_get_remote_address(njs_vm_t *vm, njs_value_t *value,
     void *obj, uintptr_t data)
 {
@@ -723,131 +878,74 @@ ngx_stream_js_ext_get_remote_address(njs
 
 
 static njs_ret_t
-ngx_stream_js_ext_get_eof(njs_vm_t *vm, njs_value_t *value, void *obj,
-    uintptr_t data)
+ngx_stream_js_ext_done(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused)
 {
-    ngx_buf_t             *b;
-    ngx_connection_t      *c;
-    ngx_stream_js_ctx_t   *ctx;
-    ngx_stream_session_t  *s;
-
-    s = (ngx_stream_session_t *) obj;
-    c = s->connection;
-    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
-
-    b = ctx->filter ? ctx->buf : c->buffer;
-
-    njs_value_boolean_set(value, b && b->last_buf);
-
-    return NJS_OK;
+    return ngx_stream_js_ext_set_status(vm, args, nargs, NGX_OK);
 }
 
 
 static njs_ret_t
-ngx_stream_js_ext_get_from_upstream(njs_vm_t *vm, njs_value_t *value, void *obj,
-    uintptr_t data)
+ngx_stream_js_ext_deny(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused)
 {
-    ngx_stream_js_ctx_t   *ctx;
-    ngx_stream_session_t  *s;
+    return ngx_stream_js_ext_set_status(vm, args, nxt_min(nargs, 1),
+                                        NGX_STREAM_FORBIDDEN);
+}
 
-    s = (ngx_stream_session_t *) obj;
-    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
 
-    njs_value_boolean_set(value, ctx->from_upstream);
-
-    return NJS_OK;
+static njs_ret_t
+ngx_stream_js_ext_decline(njs_vm_t *vm, njs_value_t *args,
+     nxt_uint_t nargs, njs_index_t unused)
+{
+    return ngx_stream_js_ext_set_status(vm, args, nxt_min(nargs, 1),
+                                        NGX_DECLINED);
 }
 
 
 static njs_ret_t
-ngx_stream_js_ext_get_buffer(njs_vm_t *vm, njs_value_t *value, void *obj,
-    uintptr_t data)
+ngx_stream_js_ext_set_status(njs_vm_t *vm, njs_value_t *args, nxt_uint_t nargs,
+    ngx_int_t status)
 {
-    size_t                 len;
-    u_char                *p;
-    ngx_buf_t             *b;
-    ngx_connection_t      *c;
-    ngx_stream_js_ctx_t   *ctx;
-    ngx_stream_session_t  *s;
-
-    s = (ngx_stream_session_t *) obj;
-    c = s->connection;
-    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
-
-    b = ctx->filter ? ctx->buf : c->buffer;
-
-    len = b ? b->last - b->pos : 0;
-
-    p = njs_string_alloc(vm, value, len, 0);
-    if (p == NULL) {
-        return NJS_ERROR;
-    }
-
-    if (len) {
-        ngx_memcpy(p, b->pos, len);
-    }
-
-    return NJS_OK;
-}
-
-
-static njs_ret_t
-ngx_stream_js_ext_set_buffer(njs_vm_t *vm, void *obj, uintptr_t data,
-    nxt_str_t *value)
-{
-    ngx_buf_t             *b;
-    ngx_chain_t           *cl;
-    ngx_connection_t      *c;
+    const njs_value_t     *code;
     ngx_stream_js_ctx_t   *ctx;
     ngx_stream_session_t  *s;
 
-    s = (ngx_stream_session_t *) obj;
-    c = s->connection;
-    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
-
-    ngx_log_debug2(NGX_LOG_DEBUG_STREAM, c->log, 0,
-                   "stream js set buffer \"%*s\"", value->length, value->start);
-
-    if (!ctx->filter) {
-        ngx_log_error(NGX_LOG_WARN, c->log, 0,
-                      "cannot set buffer in this handler");
-        return NJS_OK;
-    }
-
-    cl = ngx_chain_get_free_buf(c->pool, &ctx->free);
-    if (cl == NULL) {
+    s = njs_vm_external(vm, njs_arg(args, nargs, 0));
+    if (nxt_slow_path(s == NULL)) {
         return NJS_ERROR;
     }
 
-    b = cl->buf;
-
-    ngx_free_chain(c->pool, cl);
-
-    b->last_buf = ctx->buf->last_buf;
-    b->memory = (value->length ? 1 : 0);
-    b->sync = (value->length ? 0 : 1);
-    b->tag = (ngx_buf_tag_t) &ngx_stream_js_module;
-
-    b->start = value->start;
-    b->end = value->start + value->length;
-    b->pos = b->start;
-    b->last = b->end;
-
-    if (ctx->buf->tag != (ngx_buf_tag_t) &ngx_stream_js_module) {
-        ctx->buf->pos = ctx->buf->last;
-
-    } else {
-        cl = ngx_alloc_chain_link(c->pool);
-        if (cl == NULL) {
+    if (nargs > 1) {
+        code = njs_arg(args, nargs, 1);
+        if (!njs_value_is_valid_number(code)) {
+            njs_vm_error(vm, "code is not a number");
             return NJS_ERROR;
         }
 
-        cl->buf = ctx->buf;
-        cl->next = ctx->free;
-        ctx->free = cl;
+        status = njs_value_number(code);
+        if (status < NGX_ABORT || status > NGX_STREAM_SERVICE_UNAVAILABLE) {
+            njs_vm_error(vm, "code is out of range");
+            return NJS_ERROR;
+        }
     }
 
-    ctx->buf = b;
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+                   "stream js set status: %i", status);
+
+    ctx->status = status;
+
+    if (ctx->upload_event != NULL) {
+        njs_vm_del_event(ctx->vm, ctx->upload_event);
+        ctx->upload_event = NULL;
+    }
+
+    if (ctx->download_event != NULL) {
+        njs_vm_del_event(ctx->vm, ctx->download_event);
+        ctx->download_event = NULL;
+    }
 
     return NJS_OK;
 }
@@ -911,6 +1009,166 @@ ngx_stream_js_ext_log_core(njs_vm_t *vm,
 
 
 static njs_ret_t
+ngx_stream_js_ext_on(njs_vm_t *vm, njs_value_t *args, nxt_uint_t nargs,
+    njs_index_t unused)
+{
+    nxt_str_t              name;
+    njs_vm_event_t        *event;
+    const njs_value_t     *callback;
+    ngx_stream_session_t  *s;
+
+    s = njs_vm_external(vm, njs_arg(args, nargs, 0));
+    if (nxt_slow_path(s == NULL)) {
+        return NJS_ERROR;
+    }
+
+    if (njs_vm_value_to_ext_string(vm, &name, njs_arg(args, nargs, 1), 0)
+        == NJS_ERROR)
+    {
+        njs_vm_error(vm, "failed to convert event arg");
+        return NJS_ERROR;
+    }
+
+    callback = njs_arg(args, nargs, 2);
+    if (!njs_value_is_function(callback)) {
+        njs_vm_error(vm, "callback is not a function");
+        return NJS_ERROR;
+    }
+
+    event = ngx_stream_js_event(s, &name);
+    if (event == NULL) {
+        return NJS_ERROR;
+    }
+
+    if (*event != NULL) {
+        njs_vm_error(vm, "event handler \"%.*s\" is already set",
+                     (int) name.length, name.start);
+        return NJS_ERROR;
+    }
+
+    *event = njs_vm_add_event(vm, njs_value_function(callback), 0, NULL, NULL);
+    if (*event == NULL) {
+        njs_vm_error(vm, "internal error");
+        return NJS_ERROR;
+    }
+
+    return NJS_OK;
+}
+
+
+static njs_ret_t
+ngx_stream_js_ext_off(njs_vm_t *vm, njs_value_t *args, nxt_uint_t nargs,
+    njs_index_t unused)
+{
+    nxt_str_t              name;
+    njs_vm_event_t        *event;
+    ngx_stream_session_t  *s;
+
+    s = njs_vm_external(vm, njs_arg(args, nargs, 0));
+    if (nxt_slow_path(s == NULL)) {
+        return NJS_ERROR;
+    }
+
+    if (njs_vm_value_to_ext_string(vm, &name, njs_arg(args, nargs, 1), 0)
+        == NJS_ERROR)
+    {
+        njs_vm_error(vm, "failed to convert event arg");
+        return NJS_ERROR;
+    }
+
+    event = ngx_stream_js_event(s, &name);
+    if (event == NULL) {
+        return NJS_ERROR;
+    }
+
+    *event = NULL;
+
+    return NJS_OK;
+}
+
+
+static njs_ret_t
+ngx_stream_js_ext_send(njs_vm_t *vm, njs_value_t *args, nxt_uint_t nargs,
+    njs_index_t unused)
+{
+    unsigned               last_buf, flush;
+    nxt_str_t              buffer;


More information about the nginx-devel mailing list