[PATCH 5 of 6] Upstream: allow any worker to resolve upstream servers

Aleksei Bavshin a.bavshin at nginx.com
Wed Feb 1 01:37:00 UTC 2023


# HG changeset patch
# User Aleksei Bavshin <a.bavshin at nginx.com>
# Date 1670883784 28800
#      Mon Dec 12 14:23:04 2022 -0800
# Node ID f8eb6b94d8f46008eb5f2f1dbc747750d5755506
# Parent  cfae397f1ea87a35c41febab6162fe5142aa767b
Upstream: allow any worker to resolve upstream servers.

This change addresses one of the limitations of the current re-resolve
code: its dependency on worker 0.  Now, each worker is able to pick a
resolve task from a shared priority queue.

The single worker implementation relies on the fact that each peer is
assigned to a specific worker and no other process may access its data.
Thus, it's safe to keep `peer->host->event` in the shared zone and
modify it as necessary.  That assumption becomes invalid once we allow
any free worker to update the peer.  Now, all the workers have to know
when the previous resolution result expires and maintain their own
timers.  A single shared event structure is no longer sufficient.

The obvious solution is to make timer events local to a worker by moving
them up to the nearest object in local memory, the upstream.  From the
upstream timer event handler we can walk the list of the peers and pick
those that are expired and not already owned by another process.

To reduce the time spent under a lock we can keep a priority queue of
pending tasks, sorted by expiration time.  Each worker is able to get an
expired server from the head of the queue, perform the name resolution
and put the peer back with a new expiration time.
A per-upstream or per-zone rbtree was considered as a store for pending
tasks, but it would not bring any performance benefit until the number
of servers in the upstream becomes sufficiently large.  Per-zone queues
also require more intricate locking.

The benefits of the change are obvious: we're no longer tied to the
lifetime of the first worker process and the distribution of the tasks
is more even.  There are certain disadvantages, though:

- An SRV record may resolve into multiple A/AAAA records with different
  TTLs, which are kept in a worker-local cache of a resolver.  The next
  query in the same worker will reuse all the cache entries that are
  still valid.  With the task distribution implemented, any worker may
  schedule the next update of a peer and thus we lose the benefit of a
  local cache.

- The change introduces an additional short lock on the list of peers
  and allows existing long locks to be acquired from different
  processes.  For example, it's possible that different workers will
  resolve large SRV records from the same upstream and attempt to update
  the list of peers at the same time.

diff --git a/src/http/modules/ngx_http_upstream_zone_module.c b/src/http/modules/ngx_http_upstream_zone_module.c
--- a/src/http/modules/ngx_http_upstream_zone_module.c
+++ b/src/http/modules/ngx_http_upstream_zone_module.c
@@ -10,6 +10,9 @@
 #include <ngx_http.h>
 
 
+#define NGX_UPSTREAM_RESOLVE_NO_WORKER  (ngx_uint_t) -1
+
+
 static char *ngx_http_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
 static ngx_int_t ngx_http_upstream_init_zone(ngx_shm_zone_t *shm_zone,
@@ -40,6 +43,13 @@ static ngx_command_t  ngx_http_upstream_
 static ngx_int_t ngx_http_upstream_zone_init_worker(ngx_cycle_t *cycle);
 static void ngx_http_upstream_zone_resolve_timer(ngx_event_t *event);
 static void ngx_http_upstream_zone_resolve_handler(ngx_resolver_ctx_t *ctx);
+static void ngx_http_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
+    ngx_http_upstream_host_t *host);
+static void ngx_http_upstream_zone_start_resolve(
+    ngx_http_upstream_srv_conf_t *uscf, ngx_http_upstream_host_t *host);
+static void ngx_http_upstream_zone_schedule_resolve(
+    ngx_http_upstream_srv_conf_t *uscf, ngx_http_upstream_host_t *host,
+    ngx_msec_t timer);
 
 
 static ngx_http_module_t  ngx_http_upstream_zone_module_ctx = {
@@ -231,6 +241,8 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
     peers->shpool = shpool;
     peers->config = config;
 
+    ngx_queue_init(&peers->resolve_queue);
+
     for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
         /* pool is unlocked */
         peer = ngx_http_upstream_zone_copy_peer(peers, *peerp);
@@ -248,6 +260,9 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
             return NULL;
         }
 
+        ngx_http_upstream_rr_peer_ref(peers, peer);
+        ngx_queue_insert_tail(&peers->resolve_queue, &peer->host->queue);
+
         *peerp = peer;
         peer->id = (*peers->config)++;
     }
@@ -268,6 +283,8 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
     backup->shpool = shpool;
     backup->config = config;
 
+    ngx_queue_init(&backup->resolve_queue);
+
     for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
         /* pool is unlocked */
         peer = ngx_http_upstream_zone_copy_peer(backup, *peerp);
@@ -285,6 +302,9 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
             return NULL;
         }
 
+        ngx_http_upstream_rr_peer_ref(backup, peer);
+        ngx_queue_insert_tail(&backup->resolve_queue, &peer->host->queue);
+
         *peerp = peer;
         peer->id = (*backup->config)++;
     }
@@ -357,6 +377,8 @@ ngx_http_upstream_zone_copy_peer(ngx_htt
 
             dst->host->peers = peers;
             dst->host->peer = dst;
+            dst->host->expires = ngx_current_msec;
+            dst->host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
 
             dst->host->name.len = src->host->name.len;
             ngx_memcpy(dst->host->name.data, src->host->name.data,
@@ -438,13 +460,124 @@ ngx_http_upstream_zone_remove_peer_locke
 }
 
 
+static void
+ngx_http_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
+                                            ngx_http_upstream_host_t *host)
+{
+    ngx_queue_t               *q;
+    ngx_http_upstream_host_t  *item;
+
+    q = ngx_queue_last(queue);
+
+    while (q != ngx_queue_sentinel(queue)) {
+        item = ngx_queue_data(q, ngx_http_upstream_host_t, queue);
+
+        if ((ngx_msec_int_t) (item->expires - host->expires) <= 0) {
+            break;
+        }
+
+        q = ngx_queue_prev(q);
+    }
+
+    ngx_queue_insert_after(q, &host->queue);
+}
+
+
+static void
+ngx_http_upstream_zone_schedule_resolve(ngx_http_upstream_srv_conf_t *uscf,
+                                        ngx_http_upstream_host_t *host,
+                                        ngx_msec_t timer)
+{
+    ngx_msec_t                     now;
+    ngx_event_t                   *event;
+    ngx_http_upstream_host_t      *head;
+    ngx_http_upstream_rr_peers_t  *peers;
+
+    now = ngx_current_msec;
+    event = &uscf->event;
+    peers = host->peers;
+
+    ngx_http_upstream_rr_peers_wlock(peers);
+
+    host->expires = now + timer;
+    host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
+    ngx_http_upstream_zone_resolve_queue_insert(&peers->resolve_queue, host);
+
+    head = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
+                          ngx_http_upstream_host_t, queue);
+    if ((ngx_msec_int_t) (head->expires - host->expires) < 0) {
+        timer = ngx_max((ngx_msec_int_t) (head->expires - now), 1);
+    }
+
+    ngx_http_upstream_rr_peers_unlock(peers);
+
+    if (!event->timer_set
+        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
+    {
+        ngx_add_timer(event, timer);
+    }
+}
+
+
+static void
+ngx_http_upstream_zone_resolve_timer(ngx_event_t *event)
+{
+    ngx_msec_t                     now, timer;
+    ngx_msec_int_t                 expires;
+    ngx_http_upstream_host_t      *host;
+    ngx_http_upstream_srv_conf_t  *uscf;
+    ngx_http_upstream_rr_peers_t  *peers;
+
+    uscf = event->data;
+    peers = uscf->peer.data;
+    now = ngx_current_msec;
+
+    timer = (ngx_msec_t) 1000 *
+            (uscf->resolver->valid ? uscf->resolver->valid : 10);
+
+    do {
+        for ( ;; ) {
+            ngx_http_upstream_rr_peers_wlock(peers);
+
+            if (ngx_queue_empty(&peers->resolve_queue)) {
+                ngx_http_upstream_rr_peers_unlock(peers);
+                break;
+            }
+
+            host = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
+                                  ngx_http_upstream_host_t, queue);
+            expires = host->expires - now;
+
+            if (expires > 0) {
+                ngx_http_upstream_rr_peers_unlock(peers);
+                timer = ngx_min(timer, (ngx_msec_t) expires);
+                break;
+            }
+
+            ngx_queue_remove(&host->queue);
+            host->worker = ngx_worker;
+
+            ngx_http_upstream_rr_peers_unlock(peers);
+            ngx_http_upstream_zone_start_resolve(uscf, host);
+        }
+
+        peers = peers->next;
+
+    } while (peers);
+
+    if (!event->timer_set
+        || ((ngx_msec_int_t) (now + timer - event->timer.key)) < 0)
+    {
+        ngx_add_timer(event, timer);
+    }
+}
+
+
 static ngx_int_t
 ngx_http_upstream_zone_init_worker(ngx_cycle_t *cycle)
 {
     ngx_uint_t                      i;
     ngx_event_t                    *event;
-    ngx_http_upstream_rr_peer_t    *peer;
-    ngx_http_upstream_rr_peers_t   *peers;
     ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
     ngx_http_upstream_main_conf_t  *umcf;
 
@@ -470,34 +603,13 @@ ngx_http_upstream_zone_init_worker(ngx_c
             continue;
         }
 
-        peers = uscf->peer.data;
-
-        do {
-            ngx_http_upstream_rr_peers_wlock(peers);
-
-            for (peer = peers->resolve; peer; peer = peer->next) {
-
-                if (peer->host->worker != ngx_worker) {
-                    continue;
-                }
-
-                event = &peer->host->event;
-                ngx_memzero(event, sizeof(ngx_event_t));
+        event = &uscf->event;
+        event->data = uscf;
+        event->handler = ngx_http_upstream_zone_resolve_timer;
+        event->log = cycle->log;
+        event->cancelable = 1;
 
-                event->data = uscf;
-                event->handler = ngx_http_upstream_zone_resolve_timer;
-                event->log = cycle->log;
-                event->cancelable = 1;
-
-                ngx_http_upstream_rr_peer_ref(peers, peer);
-                ngx_add_timer(event, 1);
-            }
-
-            ngx_http_upstream_rr_peers_unlock(peers);
-
-            peers = peers->next;
-
-        } while (peers);
+        ngx_add_timer(event, 1);
     }
 
     return NGX_OK;
@@ -505,16 +617,13 @@ ngx_http_upstream_zone_init_worker(ngx_c
 
 
 static void
-ngx_http_upstream_zone_resolve_timer(ngx_event_t *event)
+ngx_http_upstream_zone_start_resolve(ngx_http_upstream_srv_conf_t *uscf,
+                                     ngx_http_upstream_host_t *host)
 {
     ngx_resolver_ctx_t            *ctx;
-    ngx_http_upstream_host_t      *host;
     ngx_http_upstream_rr_peer_t   *template;
     ngx_http_upstream_rr_peers_t  *peers;
-    ngx_http_upstream_srv_conf_t  *uscf;
 
-    host = (ngx_http_upstream_host_t *) event;
-    uscf = event->data;
     peers = host->peers;
     template = host->peer;
 
@@ -540,11 +649,13 @@ ngx_http_upstream_zone_resolve_timer(ngx
     }
 
     if (ctx == NGX_NO_RESOLVER) {
-        ngx_log_error(NGX_LOG_ERR, event->log, 0,
+        ngx_log_error(NGX_LOG_ERR, uscf->event.log, 0,
                       "no resolver defined to resolve %V", &host->name);
         return;
     }
 
+    host->upstream = uscf;
+
     ctx->name = host->name;
     ctx->handler = ngx_http_upstream_zone_resolve_handler;
     ctx->data = host;
@@ -558,7 +669,8 @@ ngx_http_upstream_zone_resolve_timer(ngx
 
 retry:
 
-    ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
+    ngx_http_upstream_zone_schedule_resolve(uscf, host,
+                                        ngx_max(uscf->resolver_timeout, 1000));
 }
 
 
@@ -590,8 +702,8 @@ ngx_http_upstream_zone_resolve_handler(n
     ngx_http_upstream_srv_conf_t  *uscf;
 
     host = ctx->data;
-    event = &host->event;
-    uscf = event->data;
+    uscf = host->upstream;
+    event = &uscf->event;
     peers = host->peers;
     template = host->peer;
 
@@ -651,7 +763,8 @@ ngx_http_upstream_zone_resolve_handler(n
 
             ngx_resolve_name_done(ctx);
 
-            ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
+            ngx_http_upstream_zone_schedule_resolve(uscf, host,
+                                        ngx_max(uscf->resolver_timeout, 1000));
             return;
         }
 
@@ -851,5 +964,5 @@ done:
 
     ngx_resolve_name_done(ctx);
 
-    ngx_add_timer(event, timer);
+    ngx_http_upstream_zone_schedule_resolve(uscf, host, timer);
 }
diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h
--- a/src/http/ngx_http_upstream.h
+++ b/src/http/ngx_http_upstream.h
@@ -138,6 +138,7 @@ struct ngx_http_upstream_srv_conf_s {
     ngx_uint_t                       no_port;  /* unsigned no_port:1 */
 
 #if (NGX_HTTP_UPSTREAM_ZONE)
+    ngx_event_t                      event;
     ngx_shm_zone_t                  *shm_zone;
     ngx_resolver_t                  *resolver;
     ngx_msec_t                       resolver_timeout;
diff --git a/src/http/ngx_http_upstream_round_robin.h b/src/http/ngx_http_upstream_round_robin.h
--- a/src/http/ngx_http_upstream_round_robin.h
+++ b/src/http/ngx_http_upstream_round_robin.h
@@ -21,12 +21,14 @@ typedef struct ngx_http_upstream_rr_peer
 #if (NGX_HTTP_UPSTREAM_ZONE)
 
 typedef struct {
-    ngx_event_t                     event;         /* must be first */
+    ngx_queue_t                     queue;
     ngx_uint_t                      worker;
+    ngx_msec_t                      expires;
     ngx_str_t                       name;
     ngx_str_t                       service;
     ngx_http_upstream_rr_peers_t   *peers;
     ngx_http_upstream_rr_peer_t    *peer;
+    ngx_http_upstream_srv_conf_t   *upstream;  /* local */
 } ngx_http_upstream_host_t;
 
 #endif
@@ -101,6 +103,7 @@ struct ngx_http_upstream_rr_peers_s {
 #if (NGX_HTTP_UPSTREAM_ZONE)
     ngx_uint_t                     *config;
     ngx_http_upstream_rr_peer_t    *resolve;
+    ngx_queue_t                     resolve_queue;
     ngx_uint_t                      zombies;
 #endif
 };
diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h
--- a/src/stream/ngx_stream_upstream.h
+++ b/src/stream/ngx_stream_upstream.h
@@ -85,6 +85,7 @@ struct ngx_stream_upstream_srv_conf_s {
     ngx_uint_t                         no_port;  /* unsigned no_port:1 */
 
 #if (NGX_STREAM_UPSTREAM_ZONE)
+    ngx_event_t                        event;
     ngx_shm_zone_t                    *shm_zone;
     ngx_resolver_t                    *resolver;
     ngx_msec_t                         resolver_timeout;
diff --git a/src/stream/ngx_stream_upstream_round_robin.h b/src/stream/ngx_stream_upstream_round_robin.h
--- a/src/stream/ngx_stream_upstream_round_robin.h
+++ b/src/stream/ngx_stream_upstream_round_robin.h
@@ -21,12 +21,14 @@ typedef struct ngx_stream_upstream_rr_pe
 #if (NGX_STREAM_UPSTREAM_ZONE)
 
 typedef struct {
-    ngx_event_t                      event;         /* must be first */
+    ngx_queue_t                      queue;
     ngx_uint_t                       worker;
+    ngx_msec_t                       expires;
     ngx_str_t                        name;
     ngx_str_t                        service;
     ngx_stream_upstream_rr_peers_t  *peers;
     ngx_stream_upstream_rr_peer_t   *peer;
+    ngx_stream_upstream_srv_conf_t  *upstream;  /* local */
 } ngx_stream_upstream_host_t;
 
 #endif
@@ -99,6 +101,7 @@ struct ngx_stream_upstream_rr_peers_s {
 #if (NGX_STREAM_UPSTREAM_ZONE)
     ngx_uint_t                      *config;
     ngx_stream_upstream_rr_peer_t   *resolve;
+    ngx_queue_t                      resolve_queue;
     ngx_uint_t                       zombies;
 #endif
 };
diff --git a/src/stream/ngx_stream_upstream_zone_module.c b/src/stream/ngx_stream_upstream_zone_module.c
--- a/src/stream/ngx_stream_upstream_zone_module.c
+++ b/src/stream/ngx_stream_upstream_zone_module.c
@@ -10,6 +10,9 @@
 #include <ngx_stream.h>
 
 
+#define NGX_UPSTREAM_RESOLVE_NO_WORKER  (ngx_uint_t) -1
+
+
 static char *ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
 static ngx_int_t ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone,
@@ -40,6 +43,13 @@ static ngx_command_t  ngx_stream_upstrea
 static ngx_int_t ngx_stream_upstream_zone_init_worker(ngx_cycle_t *cycle);
 static void ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event);
 static void ngx_stream_upstream_zone_resolve_handler(ngx_resolver_ctx_t *ctx);
+static void ngx_stream_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
+    ngx_stream_upstream_host_t *host);
+static void ngx_stream_upstream_zone_start_resolve(
+    ngx_stream_upstream_srv_conf_t *uscf, ngx_stream_upstream_host_t *host);
+static void ngx_stream_upstream_zone_schedule_resolve(
+    ngx_stream_upstream_srv_conf_t *uscf, ngx_stream_upstream_host_t *host,
+    ngx_msec_t timer);
 
 
 static ngx_stream_module_t  ngx_stream_upstream_zone_module_ctx = {
@@ -228,6 +238,8 @@ ngx_stream_upstream_zone_copy_peers(ngx_
     peers->shpool = shpool;
     peers->config = config;
 
+    ngx_queue_init(&peers->resolve_queue);
+
     for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
         /* pool is unlocked */
         peer = ngx_stream_upstream_zone_copy_peer(peers, *peerp);
@@ -245,6 +257,9 @@ ngx_stream_upstream_zone_copy_peers(ngx_
             return NULL;
         }
 
+        ngx_stream_upstream_rr_peer_ref(peers, peer);
+        ngx_queue_insert_tail(&peers->resolve_queue, &peer->host->queue);
+
         *peerp = peer;
         peer->id = (*peers->config)++;
     }
@@ -265,6 +280,8 @@ ngx_stream_upstream_zone_copy_peers(ngx_
     backup->shpool = shpool;
     backup->config = config;
 
+    ngx_queue_init(&backup->resolve_queue);
+
     for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
         /* pool is unlocked */
         peer = ngx_stream_upstream_zone_copy_peer(backup, *peerp);
@@ -282,6 +299,9 @@ ngx_stream_upstream_zone_copy_peers(ngx_
             return NULL;
         }
 
+        ngx_stream_upstream_rr_peer_ref(backup, peer);
+        ngx_queue_insert_tail(&backup->resolve_queue, &peer->host->queue);
+
         *peerp = peer;
         peer->id = (*backup->config)++;
     }
@@ -354,6 +374,8 @@ ngx_stream_upstream_zone_copy_peer(ngx_s
 
             dst->host->peers = peers;
             dst->host->peer = dst;
+            dst->host->expires = ngx_current_msec;
+            dst->host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
 
             dst->host->name.len = src->host->name.len;
             ngx_memcpy(dst->host->name.data, src->host->name.data,
@@ -435,13 +457,124 @@ ngx_stream_upstream_zone_remove_peer_loc
 }
 
 
+static void
+ngx_stream_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
+                                              ngx_stream_upstream_host_t *host)
+{
+    ngx_queue_t                 *q;
+    ngx_stream_upstream_host_t  *item;
+
+    q = ngx_queue_last(queue);
+
+    while (q != ngx_queue_sentinel(queue)) {
+        item = ngx_queue_data(q, ngx_stream_upstream_host_t, queue);
+
+        if ((ngx_msec_int_t) (item->expires - host->expires) <= 0) {
+            break;
+        }
+
+        q = ngx_queue_prev(q);
+    }
+
+    ngx_queue_insert_after(q, &host->queue);
+}
+
+
+static void
+ngx_stream_upstream_zone_schedule_resolve(ngx_stream_upstream_srv_conf_t *uscf,
+                                          ngx_stream_upstream_host_t *host,
+                                          ngx_msec_t timer)
+{
+    ngx_msec_t                       now;
+    ngx_event_t                     *event;
+    ngx_stream_upstream_host_t      *head;
+    ngx_stream_upstream_rr_peers_t  *peers;
+
+    now = ngx_current_msec;
+    event = &uscf->event;
+    peers = host->peers;
+
+    ngx_stream_upstream_rr_peers_wlock(peers);
+
+    host->expires = now + timer;
+    host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
+    ngx_stream_upstream_zone_resolve_queue_insert(&peers->resolve_queue, host);
+
+    head = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
+                          ngx_stream_upstream_host_t, queue);
+    if ((ngx_msec_int_t) (head->expires - host->expires) < 0) {
+        timer = ngx_max((ngx_msec_int_t) (head->expires - now), 1);
+    }
+
+    ngx_stream_upstream_rr_peers_unlock(peers);
+
+    if (!event->timer_set
+        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
+    {
+        ngx_add_timer(event, timer);
+    }
+}
+
+
+static void
+ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event)
+{
+    ngx_msec_t                       now, timer;
+    ngx_msec_int_t                   expires;
+    ngx_stream_upstream_host_t      *host;
+    ngx_stream_upstream_srv_conf_t  *uscf;
+    ngx_stream_upstream_rr_peers_t  *peers;
+
+    uscf = event->data;
+    peers = uscf->peer.data;
+    now = ngx_current_msec;
+
+    timer = (ngx_msec_t) 1000 *
+            (uscf->resolver->valid ? uscf->resolver->valid : 10);
+
+    do {
+        for ( ;; ) {
+            ngx_stream_upstream_rr_peers_wlock(peers);
+
+            if (ngx_queue_empty(&peers->resolve_queue)) {
+                ngx_stream_upstream_rr_peers_unlock(peers);
+                break;
+            }
+
+            host = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
+                                  ngx_stream_upstream_host_t, queue);
+            expires = host->expires - now;
+
+            if (expires > 0) {
+                ngx_stream_upstream_rr_peers_unlock(peers);
+                timer = ngx_min(timer, (ngx_msec_t) expires);
+                break;
+            }
+
+            ngx_queue_remove(&host->queue);
+            host->worker = ngx_worker;
+
+            ngx_stream_upstream_rr_peers_unlock(peers);
+            ngx_stream_upstream_zone_start_resolve(uscf, host);
+        }
+
+        peers = peers->next;
+
+    } while (peers);
+
+    if (!event->timer_set
+        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
+    {
+        ngx_add_timer(event, timer);
+    }
+}
+
+
 static ngx_int_t
 ngx_stream_upstream_zone_init_worker(ngx_cycle_t *cycle)
 {
     ngx_uint_t                        i;
     ngx_event_t                      *event;
-    ngx_stream_upstream_rr_peer_t    *peer;
-    ngx_stream_upstream_rr_peers_t   *peers;
     ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
     ngx_stream_upstream_main_conf_t  *umcf;
 
@@ -468,34 +601,13 @@ ngx_stream_upstream_zone_init_worker(ngx
             continue;
         }
 
-        peers = uscf->peer.data;
-
-        do {
-            ngx_stream_upstream_rr_peers_wlock(peers);
-
-            for (peer = peers->resolve; peer; peer = peer->next) {
-
-                if (peer->host->worker != ngx_worker) {
-                    continue;
-                }
-
-                event = &peer->host->event;
-                ngx_memzero(event, sizeof(ngx_event_t));
+        event = &uscf->event;
+        event->data = uscf;
+        event->handler = ngx_stream_upstream_zone_resolve_timer;
+        event->log = cycle->log;
+        event->cancelable = 1;
 
-                event->data = uscf;
-                event->handler = ngx_stream_upstream_zone_resolve_timer;
-                event->log = cycle->log;
-                event->cancelable = 1;
-
-                ngx_stream_upstream_rr_peer_ref(peers, peer);
-                ngx_add_timer(event, 1);
-            }
-
-            ngx_stream_upstream_rr_peers_unlock(peers);
-
-            peers = peers->next;
-
-        } while (peers);
+        ngx_add_timer(event, 1);
     }
 
     return NGX_OK;
@@ -503,16 +615,13 @@ ngx_stream_upstream_zone_init_worker(ngx
 
 
 static void
-ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event)
+ngx_stream_upstream_zone_start_resolve(ngx_stream_upstream_srv_conf_t *uscf,
+                                       ngx_stream_upstream_host_t *host)
 {
     ngx_resolver_ctx_t              *ctx;
-    ngx_stream_upstream_host_t      *host;
     ngx_stream_upstream_rr_peer_t   *template;
     ngx_stream_upstream_rr_peers_t  *peers;
-    ngx_stream_upstream_srv_conf_t  *uscf;
 
-    host = (ngx_stream_upstream_host_t *) event;
-    uscf = event->data;
     peers = host->peers;
     template = host->peer;
 
@@ -538,11 +647,13 @@ ngx_stream_upstream_zone_resolve_timer(n
     }
 
     if (ctx == NGX_NO_RESOLVER) {
-        ngx_log_error(NGX_LOG_ERR, event->log, 0,
+        ngx_log_error(NGX_LOG_ERR, uscf->event.log, 0,
                       "no resolver defined to resolve %V", &host->name);
         return;
     }
 
+    host->upstream = uscf;
+
     ctx->name = host->name;
     ctx->handler = ngx_stream_upstream_zone_resolve_handler;
     ctx->data = host;
@@ -556,7 +667,8 @@ ngx_stream_upstream_zone_resolve_timer(n
 
 retry:
 
-    ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
+    ngx_stream_upstream_zone_schedule_resolve(uscf, host,
+                                        ngx_max(uscf->resolver_timeout, 1000));
 }
 
 
@@ -588,8 +700,8 @@ ngx_stream_upstream_zone_resolve_handler
     ngx_stream_upstream_srv_conf_t  *uscf;
 
     host = ctx->data;
-    event = &host->event;
-    uscf = event->data;
+    uscf = host->upstream;
+    event = &uscf->event;
     peers = host->peers;
     template = host->peer;
 
@@ -649,7 +761,8 @@ ngx_stream_upstream_zone_resolve_handler
 
             ngx_resolve_name_done(ctx);
 
-            ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
+            ngx_stream_upstream_zone_schedule_resolve(uscf, host,
+                                        ngx_max(uscf->resolver_timeout, 1000));
             return;
         }
 
@@ -849,5 +962,5 @@ done:
 
     ngx_resolve_name_done(ctx);
 
-    ngx_add_timer(event, timer);
+    ngx_stream_upstream_zone_schedule_resolve(uscf, host, timer);
 }


More information about the nginx-devel mailing list