[PATCH 15 of 15] Upstream keepalive module

Maxim Dounin mdounin at mdounin.ru
Sun Sep 4 11:34:02 UTC 2011


# HG changeset patch
# User Maxim Dounin <mdounin at mdounin.ru>
# Date 1315134383 -14400
# Node ID 4e00f01d22602ad43f7768dc10a9f431673acc09
# Parent  ec4be59d7b9c579474dd79fd5a3270e8fb9eb70b
Upstream keepalive module.

diff --git a/auto/modules b/auto/modules
--- a/auto/modules
+++ b/auto/modules
@@ -339,6 +339,11 @@ if [ $HTTP_UPSTREAM_IP_HASH = YES ]; the
     HTTP_SRCS="$HTTP_SRCS $HTTP_UPSTREAM_IP_HASH_SRCS"
 fi
 
+if [ $HTTP_UPSTREAM_KEEPALIVE = YES ]; then
+    HTTP_MODULES="$HTTP_MODULES $HTTP_UPSTREAM_KEEPALIVE_MODULE"
+    HTTP_SRCS="$HTTP_SRCS $HTTP_UPSTREAM_KEEPALIVE_SRCS"
+fi
+
 if [ $HTTP_STUB_STATUS = YES ]; then
     have=NGX_STAT_STUB . auto/have
     HTTP_MODULES="$HTTP_MODULES ngx_http_stub_status_module"
diff --git a/auto/options b/auto/options
--- a/auto/options
+++ b/auto/options
@@ -94,6 +94,7 @@ HTTP_DEGRADATION=NO
 HTTP_FLV=NO
 HTTP_GZIP_STATIC=NO
 HTTP_UPSTREAM_IP_HASH=YES
+HTTP_UPSTREAM_KEEPALIVE=YES
 
 # STUB
 HTTP_STUB_STATUS=NO
@@ -229,6 +230,7 @@ do
         --without-http_empty_gif_module) HTTP_EMPTY_GIF=NO          ;;
         --without-http_browser_module)   HTTP_BROWSER=NO            ;;
         --without-http_upstream_ip_hash_module) HTTP_UPSTREAM_IP_HASH=NO ;;
+        --without-http_upstream_keepalive_module) HTTP_UPSTREAM_KEEPALIVE=NO ;;
 
         --with-http_perl_module)         HTTP_PERL=YES              ;;
         --with-perl_modules_path=*)      NGX_PERL_MODULES="$value"  ;;
diff --git a/auto/sources b/auto/sources
--- a/auto/sources
+++ b/auto/sources
@@ -471,6 +471,11 @@ HTTP_UPSTREAM_IP_HASH_MODULE=ngx_http_up
 HTTP_UPSTREAM_IP_HASH_SRCS=src/http/modules/ngx_http_upstream_ip_hash_module.c
 
 
+HTTP_UPSTREAM_KEEPALIVE_MODULE=ngx_http_upstream_keepalive_module
+HTTP_UPSTREAM_KEEPALIVE_SRCS=" \
+    src/http/modules/ngx_http_upstream_keepalive_module.c"
+
+
 MAIL_INCS="src/mail"
 
 MAIL_DEPS="src/mail/ngx_mail.h"
diff --git a/src/http/modules/ngx_http_upstream_keepalive_module.c b/src/http/modules/ngx_http_upstream_keepalive_module.c
new file mode 100644
--- /dev/null
+++ b/src/http/modules/ngx_http_upstream_keepalive_module.c
@@ -0,0 +1,566 @@
+
+/*
+ * Copyright (C) Maxim Dounin
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_http.h>
+
+
+typedef struct {
+    ngx_uint_t                         max_cached;
+    ngx_uint_t                         single;       /* unsigned:1 */
+
+    ngx_queue_t                        cache;
+    ngx_queue_t                        free;
+
+    ngx_http_upstream_init_pt          original_init_upstream;
+    ngx_http_upstream_init_peer_pt     original_init_peer;
+
+} ngx_http_upstream_keepalive_srv_conf_t;
+
+
+typedef struct {
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
+    ngx_http_upstream_t               *upstream;
+
+    void                              *data;
+
+    ngx_event_get_peer_pt              original_get_peer;
+    ngx_event_free_peer_pt             original_free_peer;
+
+#if (NGX_HTTP_SSL)
+    ngx_event_set_peer_session_pt      original_set_session;
+    ngx_event_save_peer_session_pt     original_save_session;
+#endif
+
+    ngx_uint_t                         failed;       /* unsigned:1 */
+
+} ngx_http_upstream_keepalive_peer_data_t;
+
+
+typedef struct {
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
+    ngx_queue_t                        queue;
+    ngx_connection_t                  *connection;
+
+    socklen_t                          socklen;
+    u_char                             sockaddr[NGX_SOCKADDRLEN];
+
+} ngx_http_upstream_keepalive_cache_t;
+
+
+static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
+    ngx_http_upstream_srv_conf_t *us);
+static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc,
+    void *data);
+static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
+    void *data, ngx_uint_t state);
+
+static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
+static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
+static void ngx_http_upstream_keepalive_close(ngx_connection_t *c);
+
+
+#if (NGX_HTTP_SSL)
+static ngx_int_t ngx_http_upstream_keepalive_set_session(
+    ngx_peer_connection_t *pc, void *data);
+static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc,
+    void *data);
+#endif
+
+static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
+static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
+    void *conf);
+
+
+static ngx_command_t  ngx_http_upstream_keepalive_commands[] = {
+
+    { ngx_string("keepalive"),
+      NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12,
+      ngx_http_upstream_keepalive,
+      0,
+      0,
+      NULL },
+
+      ngx_null_command
+};
+
+
+static ngx_http_module_t  ngx_http_upstream_keepalive_module_ctx = {
+    NULL,                                  /* preconfiguration */
+    NULL,                                  /* postconfiguration */
+
+    NULL,                                  /* create main configuration */
+    NULL,                                  /* init main configuration */
+
+    ngx_http_upstream_keepalive_create_conf, /* create server configuration */
+    NULL,                                  /* merge server configuration */
+
+    NULL,                                  /* create location configuration */
+    NULL                                   /* merge location configuration */
+};
+
+
+ngx_module_t  ngx_http_upstream_keepalive_module = {
+    NGX_MODULE_V1,
+    &ngx_http_upstream_keepalive_module_ctx, /* module context */
+    ngx_http_upstream_keepalive_commands,    /* module directives */
+    NGX_HTTP_MODULE,                       /* module type */
+    NULL,                                  /* init master */
+    NULL,                                  /* init module */
+    NULL,                                  /* init process */
+    NULL,                                  /* init thread */
+    NULL,                                  /* exit thread */
+    NULL,                                  /* exit process */
+    NULL,                                  /* exit master */
+    NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_int_t
+ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
+    ngx_http_upstream_srv_conf_t *us)
+{
+    ngx_uint_t                               i;
+    ngx_http_upstream_keepalive_srv_conf_t  *kcf;
+    ngx_http_upstream_keepalive_cache_t     *cached;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0,
+                   "init keepalive");
+
+    kcf = ngx_http_conf_upstream_srv_conf(us,
+                                          ngx_http_upstream_keepalive_module);
+
+    if (kcf->original_init_upstream(cf, us) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    kcf->original_init_peer = us->peer.init;
+
+    us->peer.init = ngx_http_upstream_init_keepalive_peer;
+
+    /* allocate cache items and add to free queue */
+
+    cached = ngx_pcalloc(cf->pool,
+                sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
+    if (cached == NULL) {
+        return NGX_ERROR;
+    }
+
+    ngx_queue_init(&kcf->cache);
+    ngx_queue_init(&kcf->free);
+
+    for (i = 0; i < kcf->max_cached; i++) {
+        ngx_queue_insert_head(&kcf->free, &cached[i].queue);
+        cached[i].conf = kcf;
+    }
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
+    ngx_http_upstream_srv_conf_t *us)
+{
+    ngx_http_upstream_keepalive_peer_data_t  *kp;
+    ngx_http_upstream_keepalive_srv_conf_t   *kcf;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "init keepalive peer");
+
+    kcf = ngx_http_conf_upstream_srv_conf(us,
+                                          ngx_http_upstream_keepalive_module);
+
+    kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t));
+    if (kp == NULL) {
+        return NGX_ERROR;
+    }
+
+    if (kcf->original_init_peer(r, us) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    kp->conf = kcf;
+    kp->upstream = r->upstream;
+    kp->data = r->upstream->peer.data;
+    kp->original_get_peer = r->upstream->peer.get;
+    kp->original_free_peer = r->upstream->peer.free;
+
+    r->upstream->peer.data = kp;
+    r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;
+    r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;
+
+#if (NGX_HTTP_SSL)
+    kp->original_set_session = r->upstream->peer.set_session;
+    kp->original_save_session = r->upstream->peer.save_session;
+    r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;
+    r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
+#endif
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
+{
+    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
+    ngx_http_upstream_keepalive_cache_t      *item;
+
+    ngx_int_t          rc;
+    ngx_queue_t       *q, *cache;
+    ngx_connection_t  *c;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+                   "get keepalive peer");
+
+    kp->failed = 0;
+
+    /* single pool of cached connections */
+
+    if (kp->conf->single && !ngx_queue_empty(&kp->conf->cache)) {
+
+        q = ngx_queue_head(&kp->conf->cache);
+
+        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
+        c = item->connection;
+
+        ngx_queue_remove(q);
+        ngx_queue_insert_head(&kp->conf->free, q);
+
+        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+                       "get keepalive peer: using connection %p", c);
+
+        c->idle = 0;
+        c->log = pc->log;
+        c->read->log = pc->log;
+        c->write->log = pc->log;
+        c->pool->log = pc->log;
+
+        pc->connection = c;
+        pc->cached = 1;
+
+        return NGX_DONE;
+    }
+
+    rc = kp->original_get_peer(pc, kp->data);
+
+    if (kp->conf->single || rc != NGX_OK) {
+        return rc;
+    }
+
+    /* search cache for suitable connection */
+
+    cache = &kp->conf->cache;
+
+    for (q = ngx_queue_head(cache);
+         q != ngx_queue_sentinel(cache);
+         q = ngx_queue_next(q))
+    {
+        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
+        c = item->connection;
+
+        if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
+                         item->socklen, pc->socklen)
+            == 0)
+        {
+            ngx_queue_remove(q);
+            ngx_queue_insert_head(&kp->conf->free, q);
+
+            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+                           "get keepalive peer: using connection %p", c);
+
+            c->idle = 0;
+            c->log = pc->log;
+            c->read->log = pc->log;
+            c->write->log = pc->log;
+            c->pool->log = pc->log;
+
+            pc->connection = c;
+            pc->cached = 1;
+
+            return NGX_DONE;
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+static void
+ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
+    ngx_uint_t state)
+{
+    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
+    ngx_http_upstream_keepalive_cache_t      *item;
+
+    ngx_queue_t          *q;
+    ngx_connection_t     *c;
+    ngx_http_upstream_t  *u;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+                   "free keepalive peer");
+
+    /* remember failed state - peer.free() may be called more than once */
+
+    if (state & NGX_PEER_FAILED) {
+        kp->failed = 1;
+    }
+
+    /* cache valid connections */
+
+    u = kp->upstream;
+    c = pc->connection;
+
+    if (kp->failed
+        || c == NULL
+        || c->read->eof
+        || c->read->ready
+        || c->read->error
+        || c->read->timedout
+        || c->write->error
+        || c->write->timedout)
+    {
+        goto invalid;
+    }
+
+    if (!u->keepalive) {
+        goto invalid;
+    }
+
+    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+        goto invalid;
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+                   "free keepalive peer: saving connection %p", c);
+
+    if (ngx_queue_empty(&kp->conf->free)) {
+
+        q = ngx_queue_last(&kp->conf->cache);
+        ngx_queue_remove(q);
+
+        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
+
+        ngx_http_upstream_keepalive_close(item->connection);
+
+    } else {
+        q = ngx_queue_head(&kp->conf->free);
+        ngx_queue_remove(q);
+
+        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
+    }
+
+    item->connection = c;
+    ngx_queue_insert_head(&kp->conf->cache, q);
+
+    pc->connection = NULL;
+
+    if (c->read->timer_set) {
+        ngx_del_timer(c->read);
+    }
+    if (c->write->timer_set) {
+        ngx_del_timer(c->write);
+    }
+
+    c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
+    c->read->handler = ngx_http_upstream_keepalive_close_handler;
+
+    c->data = item;
+    c->idle = 1;
+    c->log = ngx_cycle->log;
+    c->read->log = ngx_cycle->log;
+    c->write->log = ngx_cycle->log;
+    c->pool->log = ngx_cycle->log;
+
+    item->socklen = pc->socklen;
+    ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
+
+invalid:
+
+    kp->original_free_peer(pc, kp->data, state);
+}
+
+
+static void
+ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
+{
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive dummy handler");
+}
+
+
+static void
+ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
+{
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+    ngx_http_upstream_keepalive_cache_t     *item;
+
+    int                n;
+    char               buf[1];
+    ngx_connection_t  *c;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
+                   "keepalive close handler");
+
+    c = ev->data;
+
+    if (c->close) {
+        goto close;
+    }
+
+    n = recv(c->fd, buf, 1, MSG_PEEK);
+
+    if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
+        /* stale event */
+
+        if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+            goto close;
+        }
+
+        return;
+    }
+
+close:
+
+    item = c->data;
+    conf = item->conf;
+
+    ngx_http_upstream_keepalive_close(c);
+
+    ngx_queue_remove(&item->queue);
+    ngx_queue_insert_head(&conf->free, &item->queue);
+}
+
+
+static void
+ngx_http_upstream_keepalive_close(ngx_connection_t *c)
+{
+
+#if (NGX_HTTP_SSL)
+
+    if (c->ssl) {
+        c->ssl->no_wait_shutdown = 1;
+        c->ssl->no_send_shutdown = 1;
+
+        if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
+            c->ssl->handler = ngx_http_upstream_keepalive_close;
+            return;
+        }
+    }
+
+#endif
+
+    ngx_destroy_pool(c->pool);
+    ngx_close_connection(c);
+}
+
+
+#if (NGX_HTTP_SSL)
+
+static ngx_int_t
+ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data)
+{
+    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
+
+    return kp->original_set_session(pc, kp->data);
+}
+
+
+static void
+ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data)
+{
+    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
+
+    kp->original_save_session(pc, kp->data);
+    return;
+}
+
+#endif
+
+
+static void *
+ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
+{
+    ngx_http_upstream_keepalive_srv_conf_t  *conf;
+
+    conf = ngx_pcalloc(cf->pool,
+                       sizeof(ngx_http_upstream_keepalive_srv_conf_t));
+    if (conf == NULL) {
+        return NULL;
+    }
+
+    /*
+     * set by ngx_pcalloc():
+     *
+     *     conf->original_init_upstream = NULL;
+     *     conf->original_init_peer = NULL;
+     */
+
+    conf->max_cached = 1;
+
+    return conf;
+}
+
+
+static char *
+ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+    ngx_http_upstream_srv_conf_t            *uscf;
+    ngx_http_upstream_keepalive_srv_conf_t  *kcf;
+
+    ngx_int_t    n;
+    ngx_str_t   *value;
+    ngx_uint_t   i;
+
+    uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
+
+    kcf = ngx_http_conf_upstream_srv_conf(uscf,
+                                          ngx_http_upstream_keepalive_module);
+
+    kcf->original_init_upstream = uscf->peer.init_upstream
+                                  ? uscf->peer.init_upstream
+                                  : ngx_http_upstream_init_round_robin;
+
+    uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;
+
+    /* read options */
+
+    value = cf->args->elts;
+
+    n = ngx_atoi(value[1].data, value[1].len);
+
+    if (n == NGX_ERROR || n == 0) {
+        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                           "invalid value \"%V\" in \"%V\" directive",
+                           &value[1], &cmd->name);
+        return NGX_CONF_ERROR;
+    }
+
+    kcf->max_cached = n;
+
+    for (i = 2; i < cf->args->nelts; i++) {
+
+        if (ngx_strcmp(value[i].data, "single") == 0) {
+            kcf->single = 1;
+            continue;
+        }
+
+        goto invalid;
+    }
+
+    return NGX_CONF_OK;
+
+invalid:
+
+    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                       "invalid parameter \"%V\"", &value[i]);
+
+    return NGX_CONF_ERROR;
+}



More information about the nginx-devel mailing list