Subrequests and output filters

Maxime Henrion mhenrion at appnexus.com
Wed Aug 5 11:21:52 UTC 2015


Hello all,


I am currently developing an nginx module in order to implement a software component in our platform.

This module's responsibility is to receive upstream requests, forward them to multiple hosts (one host per pool, with N pools; we call a pool a shard), receive all the responses and aggregate them using custom application logic, and finally return an answer.

At a first glance, it seems like nginx offers everything I could possibly need in order to implement that software component, but now I'm running into issues as it seems my output filter is never being invoked as I expect it to. Surely I'm doing something wrong...

My module (called dispatcher) registers a post-configuration handler to setup the output body filter like so :

static ngx_http_output_body_filter_pt ngx_http_next_body_filter;

static ngx_int_t
ngx_http_dispatcher_filter_init(ngx_conf_t *cf)
{

        ngx_http_next_body_filter = ngx_http_top_body_filter;
        ngx_http_top_body_filter = ngx_http_dispatcher_body_filter;

        return NGX_OK;
}

The only other callbacks in the module context are those to create and merge a location configuration. They are invoked when the "dispatcher_pass" command is encountered. Here is the code for this callback :

static char *
ngx_http_dispatcher_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
        ngx_http_core_loc_conf_t  *clcf;
        ngx_http_dispatcher_loc_conf_t *dlcf;
        ngx_str_t *value, *url;

        clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
        clcf->handler = ngx_http_dispatcher_handler;

        dlcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_dispatcher_module);

        value = cf->args->elts;
        url = &value[1];
        dlcf->url.data = url->data;
        dlcf->url.len = url->len;

        if (dlcf->url.len == 0) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                    "\"dispatcher_pass\" needs a non-empty bounce URL");
                return NGX_CONF_ERROR;
        }
        return NGX_CONF_OK;
}

So this just setups a handler for later use, and ties some configuration information to our request.

Since my handler needs the request body in order to forward those requests, it registers a body read callback :

static ngx_int_t
ngx_http_dispatcher_handler(ngx_http_request_t *r)
{
        ngx_int_t rc;

        rc = ngx_http_read_client_request_body(r,
            ngx_http_dispatcher_post_read_request_body);
        if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
                return rc;
        }

        return NGX_DONE;
}

NOTE: I'm not sure whether NGX_DONE above is the correct enum value to use, but I have tried NGX_AGAIN and others as well to no avail.

Then the request body handler calls into ngx_http_subrequest() to send those N requests to one of the hosts in each of the pools. The so-called "pools" are simple upstream blocks in the configuration file and I use the "proxy_pass" directive os that nginx handles peer selection.

Since I cannot use ngx_http_subrequest() to send an external HTTP request, I use a dummy bounce handler defined in the configuration file like so :

                location /bounce {
                        proxy_pass http://$arg_shard/bid;
                }

So the request body handler just uses a simple HTTP parameter "shard" to direct each of those ngx_http_subrequest() to a different server pool. Finally, here is the request body handler code :

static void
ngx_http_dispatcher_post_read_request_body(ngx_http_request_t *r)
{
        ngx_http_dispatcher_loc_conf_t *dlcf;
        ngx_http_request_t *sr;
        ngx_buf_t *buf;
        ngx_uint_t i;
        ngx_str_t *shard;
        ngx_str_t args;
        ngx_int_t rc;
        ngx_http_dispatcher_ctx_t *ctx;
        ngx_http_post_subrequest_t *psr;

        if (r->request_body == NULL ||
            r->request_body->bufs == NULL) {
                ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);
                return;
        }

        buf = r->request_body->bufs->buf;
        if (ngx_buf_size(buf) == 0) {
                ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);
                return;
        }

        ngx_log_error(NGX_LOG_EMERG, r->connection->log, 0,
            "Request body is: %*s", ngx_buf_size(buf), buf->pos);
  
        ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_dispatcher_ctx_t));
        if (ctx == NULL) {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
        }

// XXX Application logic code here

        ngx_http_set_ctx(r, ctx, ngx_http_dispatcher_module);

        dlcf = ngx_http_get_module_loc_conf(r, ngx_http_dispatcher_module);

        shard = dlcf->shards->elts;
        for (i = 0; i < dlcf->shards->nelts; i++) {
                ngx_log_error(NGX_LOG_EMERG, r->connection->log, 0,
                    "Sending subrequest to shard %V", &shard[i]);

                args.len = sizeof("shard=") + shard[i].len - 1;
                args.data = ngx_pnalloc(r->pool, args.len + 1);
                if (args.data == NULL) {
                        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                        return;
                }
                ngx_sprintf(args.data, "shard=%V", &shard[i]);

                psr = ngx_pcalloc(r->pool, sizeof(ngx_http_post_subrequest_t));
                if (psr == NULL) {
                        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                        return;
                }
                psr->handler = ngx_http_dispatcher_post_subrequest;
                psr->data = ctx;

                ctx->active_requests++;
                rc = ngx_http_subrequest(r, &dlcf->url, &args, &sr, NULL,
                    NGX_HTTP_SUBREQUEST_IN_MEMORY | NGX_HTTP_SUBREQUEST_WAITED);
                sr->header_only = 0;
                if (rc != NGX_OK) {
                        ngx_log_error(NGX_LOG_EMERG, r->connection->log, 0,
                            "ngx_http_subrequest failed with %i", rc);
                        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                        return;
                }
        }

        r->headers_out.status = 200;
        ngx_http_clear_content_length(r);
        ngx_http_clear_last_modified(r);
        ngx_http_clear_accept_ranges(r);
        r->headers_out.content_length_n = 3;
        rc = ngx_http_send_header(r);
        r->header_only = 0;
        ngx_log_error(NGX_LOG_EMERG, r->connection->log, 0,
            "ngx_http_sender_header(r) = %i", rc);

        ngx_chain_t out;

        buf = ngx_calloc_buf(r->pool);
        if (buf == NULL) {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
        }
        buf->start = buf->pos = (u_char *)"foo";
        buf->end = buf->last = buf->start + 3;
        buf->last_buf = 1;
        buf->memory = 1;
        out.buf = buf;
        out.next = NULL;
        ngx_http_finalize_request(r, ngx_http_output_filter(r, &out));
}

As of right now, I have simply never been able to get ateven a response body for one of those subrequests. I have verified that the subrequests are indeed being sent, and that nginx reads the response to completion (using netcat and sending an invalid response prompts a log message about an invalid HTTP response), but my output body filter is simply never being invoked, and as far as I know this is the only way I have to get at the response body here. I have not been able to use the post_subrequest callback either, and am unsure if it'd be useful in my case.

My apologies for the length of this post; I have tried to be as thorough as possible while making sure not to send any confidential code... Please don't hesitate to ask for further information or details if I'm not being clear enough.

Any help would be greatly appreciated!

Cheers,
Maxime


More information about the nginx-devel mailing list