[PATCH 5 of 6] Upstream: allow any worker to resolve upstream servers

J Carter jordanc.carter at outlook.com
Mon Feb 6 03:01:44 UTC 2023


Hi Aleksei,

Why not permanently assign the task of resolving a given upstream server 
group (all servers/peers within it) to a single worker?

It seems that this approach would avoid the SRV cache issue described in
your commit message, and remove the need for the shared queue of tasks.

The load would still be spread evenly in the most realistic scenario:
many upstream server groups of few servers each, as opposed to few
upstream server groups of many servers each.
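
To illustrate the idea, here is a minimal sketch of such a static
assignment. It assumes the upstream groups can be numbered in
configuration order and that the number of workers is known; the helper
names are hypothetical and not part of nginx.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper, not an nginx API: return the worker that owns
 * every resolve task of the upstream group with the given index.
 * A simple modulo keeps the mapping stable for a fixed worker count.
 */
static size_t
upstream_group_owner(size_t group_index, size_t worker_count)
{
    assert(worker_count > 0);

    return group_index % worker_count;
}


/*
 * Count how many of group_count groups the given worker would own.
 * Only used here to show how the load spreads across workers.
 */
static size_t
upstream_groups_owned(size_t worker, size_t group_count, size_t worker_count)
{
    size_t  i, n;

    n = 0;

    for (i = 0; i < group_count; i++) {
        if (upstream_group_owner(i, worker_count) == worker) {
            n++;
        }
    }

    return n;
}
```

With many groups the per-worker counts differ by at most one, which is
the "many groups of few servers" case above. With few large groups the
split is coarser, since a whole group always stays on one worker.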


On 01/02/2023 01:37, Aleksei Bavshin wrote:
> # HG changeset patch
> # User Aleksei Bavshin <a.bavshin at nginx.com>
> # Date 1670883784 28800
> #      Mon Dec 12 14:23:04 2022 -0800
> # Node ID f8eb6b94d8f46008eb5f2f1dbc747750d5755506
> # Parent  cfae397f1ea87a35c41febab6162fe5142aa767b
> Upstream: allow any worker to resolve upstream servers.
>
> This change addresses one of the limitations of the current re-resolve
> code: its dependency on worker 0.  Now, each worker is able to pick a
> resolve task from a shared priority queue.
>
> The single worker implementation relies on the fact that each peer is
> assigned to a specific worker and no other process may access its data.
> Thus, it's safe to keep `peer->host.event` in the shared zone and modify
> as necessary.  That assumption becomes invalid once we allow any free
> worker to update the peer.  Now, all the workers have to know when the
> previous resolution result expires and maintain their own timers.  A
> single shared event structure is no longer sufficient.
>
> The obvious solution is to make timer events local to a worker by moving
> them up to the nearest object in local memory, the upstream.  From the
> upstream timer event handler we can walk the list of peers and pick
> those that are expired and not already owned by another process.
>
> To reduce the time spent under a lock we can keep a priority queue of
> pending tasks, sorted by expiration time.  Each worker is able to get an
> expired server from the head of the queue, perform the name resolution
> and put the peer back with a new expiration time.
> A per-upstream or per-zone rbtree was considered as a store for pending
> tasks, but it would bring no performance benefit until the upstream
> holds a fairly large number of servers.  Per-zone queues also require
> more intricate locking.
>
> The benefits of the change are obvious: we're no longer tied to the
> lifetime of the first worker process and the distribution of the tasks
> is more even.  There are certain disadvantages though:
>
> - An SRV record may resolve into multiple A/AAAA records with different
>    TTLs kept in a worker-local cache of a resolver.  The next query in the
>    same worker will reuse all the cache entries that are still valid.
>    With the task distribution implemented, any worker may schedule the
>    next update of a peer and thus we lose the benefit of a local cache.
>
> - The change introduces an additional short lock on the list of peers
>    and allows existing long locks to be acquired from different processes.
>    For example, it's possible that different workers will resolve large
>    SRV records from the same upstream and attempt to update the list of
>    peers at the same time.
>
> diff --git a/src/http/modules/ngx_http_upstream_zone_module.c b/src/http/modules/ngx_http_upstream_zone_module.c
> --- a/src/http/modules/ngx_http_upstream_zone_module.c
> +++ b/src/http/modules/ngx_http_upstream_zone_module.c
> @@ -10,6 +10,9 @@
>   #include <ngx_http.h>
>   
>   
> +#define NGX_UPSTREAM_RESOLVE_NO_WORKER  (ngx_uint_t) -1
> +
> +
>   static char *ngx_http_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
>       void *conf);
>   static ngx_int_t ngx_http_upstream_init_zone(ngx_shm_zone_t *shm_zone,
> @@ -40,6 +43,13 @@ static ngx_command_t  ngx_http_upstream_
>   static ngx_int_t ngx_http_upstream_zone_init_worker(ngx_cycle_t *cycle);
>   static void ngx_http_upstream_zone_resolve_timer(ngx_event_t *event);
>   static void ngx_http_upstream_zone_resolve_handler(ngx_resolver_ctx_t *ctx);
> +static void ngx_http_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
> +    ngx_http_upstream_host_t *host);
> +static void ngx_http_upstream_zone_start_resolve(
> +    ngx_http_upstream_srv_conf_t *uscf, ngx_http_upstream_host_t *host);
> +static void ngx_http_upstream_zone_schedule_resolve(
> +    ngx_http_upstream_srv_conf_t *uscf, ngx_http_upstream_host_t *host,
> +    ngx_msec_t timer);
>   
>   
>   static ngx_http_module_t  ngx_http_upstream_zone_module_ctx = {
> @@ -231,6 +241,8 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
>       peers->shpool = shpool;
>       peers->config = config;
>   
> +    ngx_queue_init(&peers->resolve_queue);
> +
>       for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
>           /* pool is unlocked */
>           peer = ngx_http_upstream_zone_copy_peer(peers, *peerp);
> @@ -248,6 +260,9 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
>               return NULL;
>           }
>   
> +        ngx_http_upstream_rr_peer_ref(peers, peer);
> +        ngx_queue_insert_tail(&peers->resolve_queue, &peer->host->queue);
> +
>           *peerp = peer;
>           peer->id = (*peers->config)++;
>       }
> @@ -268,6 +283,8 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
>       backup->shpool = shpool;
>       backup->config = config;
>   
> +    ngx_queue_init(&backup->resolve_queue);
> +
>       for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
>           /* pool is unlocked */
>           peer = ngx_http_upstream_zone_copy_peer(backup, *peerp);
> @@ -285,6 +302,9 @@ ngx_http_upstream_zone_copy_peers(ngx_sl
>               return NULL;
>           }
>   
> +        ngx_http_upstream_rr_peer_ref(backup, peer);
> +        ngx_queue_insert_tail(&backup->resolve_queue, &peer->host->queue);
> +
>           *peerp = peer;
>           peer->id = (*backup->config)++;
>       }
> @@ -357,6 +377,8 @@ ngx_http_upstream_zone_copy_peer(ngx_htt
>   
>               dst->host->peers = peers;
>               dst->host->peer = dst;
> +            dst->host->expires = ngx_current_msec;
> +            dst->host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
>   
>               dst->host->name.len = src->host->name.len;
>               ngx_memcpy(dst->host->name.data, src->host->name.data,
> @@ -438,13 +460,124 @@ ngx_http_upstream_zone_remove_peer_locke
>   }
>   
>   
> +static void
> +ngx_http_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
> +                                            ngx_http_upstream_host_t *host)
> +{
> +    ngx_queue_t               *q;
> +    ngx_http_upstream_host_t  *item;
> +
> +    q = ngx_queue_last(queue);
> +
> +    while (q != ngx_queue_sentinel(queue)) {
> +        item = ngx_queue_data(q, ngx_http_upstream_host_t, queue);
> +
> +        if ((ngx_msec_int_t) (item->expires - host->expires) <= 0) {
> +            break;
> +        }
> +
> +        q = ngx_queue_prev(q);
> +    }
> +
> +    ngx_queue_insert_after(q, &host->queue);
> +}
> +
> +
> +static void
> +ngx_http_upstream_zone_schedule_resolve(ngx_http_upstream_srv_conf_t *uscf,
> +                                        ngx_http_upstream_host_t *host,
> +                                        ngx_msec_t timer)
> +{
> +    ngx_msec_t                     now;
> +    ngx_event_t                   *event;
> +    ngx_http_upstream_host_t      *head;
> +    ngx_http_upstream_rr_peers_t  *peers;
> +
> +    now = ngx_current_msec;
> +    event = &uscf->event;
> +    peers = host->peers;
> +
> +    ngx_http_upstream_rr_peers_wlock(peers);
> +
> +    host->expires = now + timer;
> +    host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
> +    ngx_http_upstream_zone_resolve_queue_insert(&peers->resolve_queue, host);
> +
> +    head = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
> +                          ngx_http_upstream_host_t, queue);
> +    if ((ngx_msec_int_t) (head->expires - host->expires) < 0) {
> +        timer = ngx_max((ngx_msec_int_t) (head->expires - now), 1);
> +    }
> +
> +    ngx_http_upstream_rr_peers_unlock(peers);
> +
> +    if (!event->timer_set
> +        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
> +    {
> +        ngx_add_timer(event, timer);
> +    }
> +}
> +
> +
> +static void
> +ngx_http_upstream_zone_resolve_timer(ngx_event_t *event)
> +{
> +    ngx_msec_t                     now, timer;
> +    ngx_msec_int_t                 expires;
> +    ngx_http_upstream_host_t      *host;
> +    ngx_http_upstream_srv_conf_t  *uscf;
> +    ngx_http_upstream_rr_peers_t  *peers;
> +
> +    uscf = event->data;
> +    peers = uscf->peer.data;
> +    now = ngx_current_msec;
> +
> +    timer = (ngx_msec_t) 1000 *
> +            (uscf->resolver->valid ? uscf->resolver->valid : 10);
> +
> +    do {
> +        for ( ;; ) {
> +            ngx_http_upstream_rr_peers_wlock(peers);
> +
> +            if (ngx_queue_empty(&peers->resolve_queue)) {
> +                ngx_http_upstream_rr_peers_unlock(peers);
> +                break;
> +            }
> +
> +            host = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
> +                                  ngx_http_upstream_host_t, queue);
> +            expires = host->expires - now;
> +
> +            if (expires > 0) {
> +                ngx_http_upstream_rr_peers_unlock(peers);
> +                timer = ngx_min(timer, (ngx_msec_t) expires);
> +                break;
> +            }
> +
> +            ngx_queue_remove(&host->queue);
> +            host->worker = ngx_worker;
> +
> +            ngx_http_upstream_rr_peers_unlock(peers);
> +            ngx_http_upstream_zone_start_resolve(uscf, host);
> +        }
> +
> +        peers = peers->next;
> +
> +    } while (peers);
> +
> +    if (!event->timer_set
> +        || ((ngx_msec_int_t) (now + timer - event->timer.key)) < 0)
> +    {
> +        ngx_add_timer(event, timer);
> +    }
> +}
> +
> +
>   static ngx_int_t
>   ngx_http_upstream_zone_init_worker(ngx_cycle_t *cycle)
>   {
>       ngx_uint_t                      i;
>       ngx_event_t                    *event;
> -    ngx_http_upstream_rr_peer_t    *peer;
> -    ngx_http_upstream_rr_peers_t   *peers;
>       ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
>       ngx_http_upstream_main_conf_t  *umcf;
>   
> @@ -470,34 +603,13 @@ ngx_http_upstream_zone_init_worker(ngx_c
>               continue;
>           }
>   
> -        peers = uscf->peer.data;
> -
> -        do {
> -            ngx_http_upstream_rr_peers_wlock(peers);
> -
> -            for (peer = peers->resolve; peer; peer = peer->next) {
> -
> -                if (peer->host->worker != ngx_worker) {
> -                    continue;
> -                }
> -
> -                event = &peer->host->event;
> -                ngx_memzero(event, sizeof(ngx_event_t));
> +        event = &uscf->event;
> +        event->data = uscf;
> +        event->handler = ngx_http_upstream_zone_resolve_timer;
> +        event->log = cycle->log;
> +        event->cancelable = 1;
>   
> -                event->data = uscf;
> -                event->handler = ngx_http_upstream_zone_resolve_timer;
> -                event->log = cycle->log;
> -                event->cancelable = 1;
> -
> -                ngx_http_upstream_rr_peer_ref(peers, peer);
> -                ngx_add_timer(event, 1);
> -            }
> -
> -            ngx_http_upstream_rr_peers_unlock(peers);
> -
> -            peers = peers->next;
> -
> -        } while (peers);
> +        ngx_add_timer(event, 1);
>       }
>   
>       return NGX_OK;
> @@ -505,16 +617,13 @@ ngx_http_upstream_zone_init_worker(ngx_c
>   
>   
>   static void
> -ngx_http_upstream_zone_resolve_timer(ngx_event_t *event)
> +ngx_http_upstream_zone_start_resolve(ngx_http_upstream_srv_conf_t *uscf,
> +                                     ngx_http_upstream_host_t *host)
>   {
>       ngx_resolver_ctx_t            *ctx;
> -    ngx_http_upstream_host_t      *host;
>       ngx_http_upstream_rr_peer_t   *template;
>       ngx_http_upstream_rr_peers_t  *peers;
> -    ngx_http_upstream_srv_conf_t  *uscf;
>   
> -    host = (ngx_http_upstream_host_t *) event;
> -    uscf = event->data;
>       peers = host->peers;
>       template = host->peer;
>   
> @@ -540,11 +649,13 @@ ngx_http_upstream_zone_resolve_timer(ngx
>       }
>   
>       if (ctx == NGX_NO_RESOLVER) {
> -        ngx_log_error(NGX_LOG_ERR, event->log, 0,
> +        ngx_log_error(NGX_LOG_ERR, uscf->event.log, 0,
>                         "no resolver defined to resolve %V", &host->name);
>           return;
>       }
>   
> +    host->upstream = uscf;
> +
>       ctx->name = host->name;
>       ctx->handler = ngx_http_upstream_zone_resolve_handler;
>       ctx->data = host;
> @@ -558,7 +669,8 @@ ngx_http_upstream_zone_resolve_timer(ngx
>   
>   retry:
>   
> -    ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
> +    ngx_http_upstream_zone_schedule_resolve(uscf, host,
> +                                        ngx_max(uscf->resolver_timeout, 1000));
>   }
>   
>   
> @@ -590,8 +702,8 @@ ngx_http_upstream_zone_resolve_handler(n
>       ngx_http_upstream_srv_conf_t  *uscf;
>   
>       host = ctx->data;
> -    event = &host->event;
> -    uscf = event->data;
> +    uscf = host->upstream;
> +    event = &uscf->event;
>       peers = host->peers;
>       template = host->peer;
>   
> @@ -651,7 +763,8 @@ ngx_http_upstream_zone_resolve_handler(n
>   
>               ngx_resolve_name_done(ctx);
>   
> -            ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
> +            ngx_http_upstream_zone_schedule_resolve(uscf, host,
> +                                        ngx_max(uscf->resolver_timeout, 1000));
>               return;
>           }
>   
> @@ -851,5 +964,5 @@ done:
>   
>       ngx_resolve_name_done(ctx);
>   
> -    ngx_add_timer(event, timer);
> +    ngx_http_upstream_zone_schedule_resolve(uscf, host, timer);
>   }
> diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h
> --- a/src/http/ngx_http_upstream.h
> +++ b/src/http/ngx_http_upstream.h
> @@ -138,6 +138,7 @@ struct ngx_http_upstream_srv_conf_s {
>       ngx_uint_t                       no_port;  /* unsigned no_port:1 */
>   
>   #if (NGX_HTTP_UPSTREAM_ZONE)
> +    ngx_event_t                      event;
>       ngx_shm_zone_t                  *shm_zone;
>       ngx_resolver_t                  *resolver;
>       ngx_msec_t                       resolver_timeout;
> diff --git a/src/http/ngx_http_upstream_round_robin.h b/src/http/ngx_http_upstream_round_robin.h
> --- a/src/http/ngx_http_upstream_round_robin.h
> +++ b/src/http/ngx_http_upstream_round_robin.h
> @@ -21,12 +21,14 @@ typedef struct ngx_http_upstream_rr_peer
>   #if (NGX_HTTP_UPSTREAM_ZONE)
>   
>   typedef struct {
> -    ngx_event_t                     event;         /* must be first */
> +    ngx_queue_t                     queue;
>       ngx_uint_t                      worker;
> +    ngx_msec_t                      expires;
>       ngx_str_t                       name;
>       ngx_str_t                       service;
>       ngx_http_upstream_rr_peers_t   *peers;
>       ngx_http_upstream_rr_peer_t    *peer;
> +    ngx_http_upstream_srv_conf_t   *upstream;  /* local */
>   } ngx_http_upstream_host_t;
>   
>   #endif
> @@ -101,6 +103,7 @@ struct ngx_http_upstream_rr_peers_s {
>   #if (NGX_HTTP_UPSTREAM_ZONE)
>       ngx_uint_t                     *config;
>       ngx_http_upstream_rr_peer_t    *resolve;
> +    ngx_queue_t                     resolve_queue;
>       ngx_uint_t                      zombies;
>   #endif
>   };
> diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h
> --- a/src/stream/ngx_stream_upstream.h
> +++ b/src/stream/ngx_stream_upstream.h
> @@ -85,6 +85,7 @@ struct ngx_stream_upstream_srv_conf_s {
>       ngx_uint_t                         no_port;  /* unsigned no_port:1 */
>   
>   #if (NGX_STREAM_UPSTREAM_ZONE)
> +    ngx_event_t                        event;
>       ngx_shm_zone_t                    *shm_zone;
>       ngx_resolver_t                    *resolver;
>       ngx_msec_t                         resolver_timeout;
> diff --git a/src/stream/ngx_stream_upstream_round_robin.h b/src/stream/ngx_stream_upstream_round_robin.h
> --- a/src/stream/ngx_stream_upstream_round_robin.h
> +++ b/src/stream/ngx_stream_upstream_round_robin.h
> @@ -21,12 +21,14 @@ typedef struct ngx_stream_upstream_rr_pe
>   #if (NGX_STREAM_UPSTREAM_ZONE)
>   
>   typedef struct {
> -    ngx_event_t                      event;         /* must be first */
> +    ngx_queue_t                      queue;
>       ngx_uint_t                       worker;
> +    ngx_msec_t                       expires;
>       ngx_str_t                        name;
>       ngx_str_t                        service;
>       ngx_stream_upstream_rr_peers_t  *peers;
>       ngx_stream_upstream_rr_peer_t   *peer;
> +    ngx_stream_upstream_srv_conf_t  *upstream;  /* local */
>   } ngx_stream_upstream_host_t;
>   
>   #endif
> @@ -99,6 +101,7 @@ struct ngx_stream_upstream_rr_peers_s {
>   #if (NGX_STREAM_UPSTREAM_ZONE)
>       ngx_uint_t                      *config;
>       ngx_stream_upstream_rr_peer_t   *resolve;
> +    ngx_queue_t                      resolve_queue;
>       ngx_uint_t                       zombies;
>   #endif
>   };
> diff --git a/src/stream/ngx_stream_upstream_zone_module.c b/src/stream/ngx_stream_upstream_zone_module.c
> --- a/src/stream/ngx_stream_upstream_zone_module.c
> +++ b/src/stream/ngx_stream_upstream_zone_module.c
> @@ -10,6 +10,9 @@
>   #include <ngx_stream.h>
>   
>   
> +#define NGX_UPSTREAM_RESOLVE_NO_WORKER  (ngx_uint_t) -1
> +
> +
>   static char *ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
>       void *conf);
>   static ngx_int_t ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone,
> @@ -40,6 +43,13 @@ static ngx_command_t  ngx_stream_upstrea
>   static ngx_int_t ngx_stream_upstream_zone_init_worker(ngx_cycle_t *cycle);
>   static void ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event);
>   static void ngx_stream_upstream_zone_resolve_handler(ngx_resolver_ctx_t *ctx);
> +static void ngx_stream_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
> +    ngx_stream_upstream_host_t *host);
> +static void ngx_stream_upstream_zone_start_resolve(
> +    ngx_stream_upstream_srv_conf_t *uscf, ngx_stream_upstream_host_t *host);
> +static void ngx_stream_upstream_zone_schedule_resolve(
> +    ngx_stream_upstream_srv_conf_t *uscf, ngx_stream_upstream_host_t *host,
> +    ngx_msec_t timer);
>   
>   
>   static ngx_stream_module_t  ngx_stream_upstream_zone_module_ctx = {
> @@ -228,6 +238,8 @@ ngx_stream_upstream_zone_copy_peers(ngx_
>       peers->shpool = shpool;
>       peers->config = config;
>   
> +    ngx_queue_init(&peers->resolve_queue);
> +
>       for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
>           /* pool is unlocked */
>           peer = ngx_stream_upstream_zone_copy_peer(peers, *peerp);
> @@ -245,6 +257,9 @@ ngx_stream_upstream_zone_copy_peers(ngx_
>               return NULL;
>           }
>   
> +        ngx_stream_upstream_rr_peer_ref(peers, peer);
> +        ngx_queue_insert_tail(&peers->resolve_queue, &peer->host->queue);
> +
>           *peerp = peer;
>           peer->id = (*peers->config)++;
>       }
> @@ -265,6 +280,8 @@ ngx_stream_upstream_zone_copy_peers(ngx_
>       backup->shpool = shpool;
>       backup->config = config;
>   
> +    ngx_queue_init(&backup->resolve_queue);
> +
>       for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
>           /* pool is unlocked */
>           peer = ngx_stream_upstream_zone_copy_peer(backup, *peerp);
> @@ -282,6 +299,9 @@ ngx_stream_upstream_zone_copy_peers(ngx_
>               return NULL;
>           }
>   
> +        ngx_stream_upstream_rr_peer_ref(backup, peer);
> +        ngx_queue_insert_tail(&backup->resolve_queue, &peer->host->queue);
> +
>           *peerp = peer;
>           peer->id = (*backup->config)++;
>       }
> @@ -354,6 +374,8 @@ ngx_stream_upstream_zone_copy_peer(ngx_s
>   
>               dst->host->peers = peers;
>               dst->host->peer = dst;
> +            dst->host->expires = ngx_current_msec;
> +            dst->host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
>   
>               dst->host->name.len = src->host->name.len;
>               ngx_memcpy(dst->host->name.data, src->host->name.data,
> @@ -435,13 +457,124 @@ ngx_stream_upstream_zone_remove_peer_loc
>   }
>   
>   
> +static void
> +ngx_stream_upstream_zone_resolve_queue_insert(ngx_queue_t *queue,
> +                                              ngx_stream_upstream_host_t *host)
> +{
> +    ngx_queue_t                 *q;
> +    ngx_stream_upstream_host_t  *item;
> +
> +    q = ngx_queue_last(queue);
> +
> +    while (q != ngx_queue_sentinel(queue)) {
> +        item = ngx_queue_data(q, ngx_stream_upstream_host_t, queue);
> +
> +        if ((ngx_msec_int_t) (item->expires - host->expires) <= 0) {
> +            break;
> +        }
> +
> +        q = ngx_queue_prev(q);
> +    }
> +
> +    ngx_queue_insert_after(q, &host->queue);
> +}
> +
> +
> +static void
> +ngx_stream_upstream_zone_schedule_resolve(ngx_stream_upstream_srv_conf_t *uscf,
> +                                          ngx_stream_upstream_host_t *host,
> +                                          ngx_msec_t timer)
> +{
> +    ngx_msec_t                       now;
> +    ngx_event_t                     *event;
> +    ngx_stream_upstream_host_t      *head;
> +    ngx_stream_upstream_rr_peers_t  *peers;
> +
> +    now = ngx_current_msec;
> +    event = &uscf->event;
> +    peers = host->peers;
> +
> +    ngx_stream_upstream_rr_peers_wlock(peers);
> +
> +    host->expires = now + timer;
> +    host->worker = NGX_UPSTREAM_RESOLVE_NO_WORKER;
> +    ngx_stream_upstream_zone_resolve_queue_insert(&peers->resolve_queue, host);
> +
> +    head = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
> +                          ngx_stream_upstream_host_t, queue);
> +    if ((ngx_msec_int_t) (head->expires - host->expires) < 0) {
> +        timer = ngx_max((ngx_msec_int_t) (head->expires - now), 1);
> +    }
> +
> +    ngx_stream_upstream_rr_peers_unlock(peers);
> +
> +    if (!event->timer_set
> +        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
> +    {
> +        ngx_add_timer(event, timer);
> +    }
> +}
> +
> +
> +static void
> +ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event)
> +{
> +    ngx_msec_t                       now, timer;
> +    ngx_msec_int_t                   expires;
> +    ngx_stream_upstream_host_t      *host;
> +    ngx_stream_upstream_srv_conf_t  *uscf;
> +    ngx_stream_upstream_rr_peers_t  *peers;
> +
> +    uscf = event->data;
> +    peers = uscf->peer.data;
> +    now = ngx_current_msec;
> +
> +    timer = (ngx_msec_t) 1000 *
> +            (uscf->resolver->valid ? uscf->resolver->valid : 10);
> +
> +    do {
> +        for ( ;; ) {
> +            ngx_stream_upstream_rr_peers_wlock(peers);
> +
> +            if (ngx_queue_empty(&peers->resolve_queue)) {
> +                ngx_stream_upstream_rr_peers_unlock(peers);
> +                break;
> +            }
> +
> +            host = ngx_queue_data(ngx_queue_head(&peers->resolve_queue),
> +                                  ngx_stream_upstream_host_t, queue);
> +            expires = host->expires - now;
> +
> +            if (expires > 0) {
> +                ngx_stream_upstream_rr_peers_unlock(peers);
> +                timer = ngx_min(timer, (ngx_msec_t) expires);
> +                break;
> +            }
> +
> +            ngx_queue_remove(&host->queue);
> +            host->worker = ngx_worker;
> +
> +            ngx_stream_upstream_rr_peers_unlock(peers);
> +            ngx_stream_upstream_zone_start_resolve(uscf, host);
> +        }
> +
> +        peers = peers->next;
> +
> +    } while (peers);
> +
> +    if (!event->timer_set
> +        || (ngx_msec_int_t) (now + timer - event->timer.key) < 0)
> +    {
> +        ngx_add_timer(event, timer);
> +    }
> +}
> +
> +
>   static ngx_int_t
>   ngx_stream_upstream_zone_init_worker(ngx_cycle_t *cycle)
>   {
>       ngx_uint_t                        i;
>       ngx_event_t                      *event;
> -    ngx_stream_upstream_rr_peer_t    *peer;
> -    ngx_stream_upstream_rr_peers_t   *peers;
>       ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
>       ngx_stream_upstream_main_conf_t  *umcf;
>   
> @@ -468,34 +601,13 @@ ngx_stream_upstream_zone_init_worker(ngx
>               continue;
>           }
>   
> -        peers = uscf->peer.data;
> -
> -        do {
> -            ngx_stream_upstream_rr_peers_wlock(peers);
> -
> -            for (peer = peers->resolve; peer; peer = peer->next) {
> -
> -                if (peer->host->worker != ngx_worker) {
> -                    continue;
> -                }
> -
> -                event = &peer->host->event;
> -                ngx_memzero(event, sizeof(ngx_event_t));
> +        event = &uscf->event;
> +        event->data = uscf;
> +        event->handler = ngx_stream_upstream_zone_resolve_timer;
> +        event->log = cycle->log;
> +        event->cancelable = 1;
>   
> -                event->data = uscf;
> -                event->handler = ngx_stream_upstream_zone_resolve_timer;
> -                event->log = cycle->log;
> -                event->cancelable = 1;
> -
> -                ngx_stream_upstream_rr_peer_ref(peers, peer);
> -                ngx_add_timer(event, 1);
> -            }
> -
> -            ngx_stream_upstream_rr_peers_unlock(peers);
> -
> -            peers = peers->next;
> -
> -        } while (peers);
> +        ngx_add_timer(event, 1);
>       }
>   
>       return NGX_OK;
> @@ -503,16 +615,13 @@ ngx_stream_upstream_zone_init_worker(ngx
>   
>   
>   static void
> -ngx_stream_upstream_zone_resolve_timer(ngx_event_t *event)
> +ngx_stream_upstream_zone_start_resolve(ngx_stream_upstream_srv_conf_t *uscf,
> +                                       ngx_stream_upstream_host_t *host)
>   {
>       ngx_resolver_ctx_t              *ctx;
> -    ngx_stream_upstream_host_t      *host;
>       ngx_stream_upstream_rr_peer_t   *template;
>       ngx_stream_upstream_rr_peers_t  *peers;
> -    ngx_stream_upstream_srv_conf_t  *uscf;
>   
> -    host = (ngx_stream_upstream_host_t *) event;
> -    uscf = event->data;
>       peers = host->peers;
>       template = host->peer;
>   
> @@ -538,11 +647,13 @@ ngx_stream_upstream_zone_resolve_timer(n
>       }
>   
>       if (ctx == NGX_NO_RESOLVER) {
> -        ngx_log_error(NGX_LOG_ERR, event->log, 0,
> +        ngx_log_error(NGX_LOG_ERR, uscf->event.log, 0,
>                         "no resolver defined to resolve %V", &host->name);
>           return;
>       }
>   
> +    host->upstream = uscf;
> +
>       ctx->name = host->name;
>       ctx->handler = ngx_stream_upstream_zone_resolve_handler;
>       ctx->data = host;
> @@ -556,7 +667,8 @@ ngx_stream_upstream_zone_resolve_timer(n
>   
>   retry:
>   
> -    ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
> +    ngx_stream_upstream_zone_schedule_resolve(uscf, host,
> +                                        ngx_max(uscf->resolver_timeout, 1000));
>   }
>   
>   
> @@ -588,8 +700,8 @@ ngx_stream_upstream_zone_resolve_handler
>       ngx_stream_upstream_srv_conf_t  *uscf;
>   
>       host = ctx->data;
> -    event = &host->event;
> -    uscf = event->data;
> +    uscf = host->upstream;
> +    event = &uscf->event;
>       peers = host->peers;
>       template = host->peer;
>   
> @@ -649,7 +761,8 @@ ngx_stream_upstream_zone_resolve_handler
>   
>               ngx_resolve_name_done(ctx);
>   
> -            ngx_add_timer(event, ngx_max(uscf->resolver_timeout, 1000));
> +            ngx_stream_upstream_zone_schedule_resolve(uscf, host,
> +                                        ngx_max(uscf->resolver_timeout, 1000));
>               return;
>           }
>   
> @@ -849,5 +962,5 @@ done:
>   
>       ngx_resolve_name_done(ctx);
>   
> -    ngx_add_timer(event, timer);
> +    ngx_stream_upstream_zone_schedule_resolve(uscf, host, timer);
>   }
> _______________________________________________
> nginx-devel mailing list
> nginx-devel at nginx.org
> https://mailman.nginx.org/mailman/listinfo/nginx-devel