[nginx] Threads: offloading of temp files writing to thread pools.

Maxim Dounin mdounin at mdounin.ru
Fri Mar 18 03:49:27 UTC 2016


details:   http://hg.nginx.org/nginx/rev/6e10518f95d8
branches:  
changeset: 6442:6e10518f95d8
user:      Maxim Dounin <mdounin at mdounin.ru>
date:      Fri Mar 18 06:44:03 2016 +0300
description:
Threads: offloading of temp files writing to thread pools.

The ngx_thread_write_chain_to_file() function introduced, which
uses ngx_file_t thread_handler, thread_ctx and thread_task fields.
The task context structure (ngx_thread_file_ctx_t) is the same for
both reading and writing, and can be safely shared as long as
operations are serialized.

The task->handler field is now always set (and not only when task is
allocated), as the same task can be used with different handlers.

The thread_write flag is introduced in the ngx_temp_file_t structure
to explicitly enable use of ngx_thread_write_chain_to_file() in
ngx_write_chain_to_temp_file() when supported by caller.

In collaboration with Valentin Bartenev.

diffstat:

 src/core/ngx_file.c     |    9 ++
 src/core/ngx_file.h     |    1 +
 src/os/unix/ngx_files.c |  166 +++++++++++++++++++++++++++++++++++++++++++----
 src/os/unix/ngx_files.h |    2 +
 4 files changed, 164 insertions(+), 14 deletions(-)

diffs (280 lines):

diff --git a/src/core/ngx_file.c b/src/core/ngx_file.c
--- a/src/core/ngx_file.c
+++ b/src/core/ngx_file.c
@@ -124,6 +124,15 @@ ngx_write_chain_to_temp_file(ngx_temp_fi
         }
     }
 
+#if (NGX_THREADS && NGX_HAVE_PWRITEV)
+
+    if (tf->thread_write) {
+        return ngx_thread_write_chain_to_file(&tf->file, chain, tf->offset,
+                                              tf->pool);
+    }
+
+#endif
+
     return ngx_write_chain_to_file(&tf->file, chain, tf->offset, tf->pool);
 }
 
diff --git a/src/core/ngx_file.h b/src/core/ngx_file.h
--- a/src/core/ngx_file.h
+++ b/src/core/ngx_file.h
@@ -78,6 +78,7 @@ typedef struct {
     unsigned                   log_level:8;
     unsigned                   persistent:1;
     unsigned                   clean:1;
+    unsigned                   thread_write:1;
 } ngx_temp_file_t;
 
 
diff --git a/src/os/unix/ngx_files.c b/src/os/unix/ngx_files.c
--- a/src/os/unix/ngx_files.c
+++ b/src/os/unix/ngx_files.c
@@ -12,6 +12,7 @@
 #if (NGX_THREADS)
 #include <ngx_thread_pool.h>
 static void ngx_thread_read_handler(void *data, ngx_log_t *log);
+static void ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log);
 #endif
 
 static ngx_chain_t *ngx_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *cl);
@@ -77,14 +78,17 @@ ngx_read_file(ngx_file_t *file, u_char *
 #if (NGX_THREADS)
 
 typedef struct {
-    ngx_fd_t     fd;
-    u_char      *buf;
-    size_t       size;
-    off_t        offset;
+    ngx_fd_t       fd;
+    ngx_uint_t     write;   /* unsigned  write:1; */
 
-    size_t       read;
-    ngx_err_t    err;
-} ngx_thread_read_ctx_t;
+    u_char        *buf;
+    size_t         size;
+    ngx_chain_t   *chain;
+    off_t          offset;
+
+    size_t         nbytes;
+    ngx_err_t      err;
+} ngx_thread_file_ctx_t;
 
 
 ssize_t
@@ -92,7 +96,7 @@ ngx_thread_read(ngx_file_t *file, u_char
     ngx_pool_t *pool)
 {
     ngx_thread_task_t      *task;
-    ngx_thread_read_ctx_t  *ctx;
+    ngx_thread_file_ctx_t  *ctx;
 
     ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0,
                    "thread read: %d, %p, %uz, %O",
@@ -101,13 +105,11 @@ ngx_thread_read(ngx_file_t *file, u_char
     task = file->thread_task;
 
     if (task == NULL) {
-        task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_read_ctx_t));
+        task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_file_ctx_t));
         if (task == NULL) {
             return NGX_ERROR;
         }
 
-        task->handler = ngx_thread_read_handler;
-
         file->thread_task = task;
     }
 
@@ -116,15 +118,25 @@ ngx_thread_read(ngx_file_t *file, u_char
     if (task->event.complete) {
         task->event.complete = 0;
 
+        if (ctx->write) {
+            ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+                          "invalid thread call, read instead of write");
+            return NGX_ERROR;
+        }
+
         if (ctx->err) {
             ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
                           "pread() \"%s\" failed", file->name.data);
             return NGX_ERROR;
         }
 
-        return ctx->read;
+        return ctx->nbytes;
     }
 
+    task->handler = ngx_thread_read_handler;
+
+    ctx->write = 0;
+
     ctx->fd = file->fd;
     ctx->buf = buf;
     ctx->size = size;
@@ -143,7 +155,7 @@ ngx_thread_read(ngx_file_t *file, u_char
 static void
 ngx_thread_read_handler(void *data, ngx_log_t *log)
 {
-    ngx_thread_read_ctx_t *ctx = data;
+    ngx_thread_file_ctx_t *ctx = data;
 
     ssize_t  n;
 
@@ -155,7 +167,7 @@ ngx_thread_read_handler(void *data, ngx_
         ctx->err = ngx_errno;
 
     } else {
-        ctx->read = n;
+        ctx->nbytes = n;
         ctx->err = 0;
     }
 
@@ -454,6 +466,132 @@ eintr:
 }
 
 
+#if (NGX_THREADS)
+
+ssize_t
+ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl, off_t offset,
+    ngx_pool_t *pool)
+{
+    ngx_thread_task_t      *task;
+    ngx_thread_file_ctx_t  *ctx;
+
+    ngx_log_debug3(NGX_LOG_DEBUG_CORE, file->log, 0,
+                   "thread write chain: %d, %p, %O",
+                   file->fd, cl, offset);
+
+    task = file->thread_task;
+
+    if (task == NULL) {
+        task = ngx_thread_task_alloc(pool,
+                                     sizeof(ngx_thread_file_ctx_t));
+        if (task == NULL) {
+            return NGX_ERROR;
+        }
+
+        file->thread_task = task;
+    }
+
+    ctx = task->ctx;
+
+    if (task->event.complete) {
+        task->event.complete = 0;
+
+        if (!ctx->write) {
+            ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+                          "invalid thread call, write instead of read");
+            return NGX_ERROR;
+        }
+
+        if (ctx->err || ctx->nbytes == 0) {
+            ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
+                          "pwritev() \"%s\" failed", file->name.data);
+            return NGX_ERROR;
+        }
+
+        file->offset += ctx->nbytes;
+        return ctx->nbytes;
+    }
+
+    task->handler = ngx_thread_write_chain_to_file_handler;
+
+    ctx->write = 1;
+
+    ctx->fd = file->fd;
+    ctx->chain = cl;
+    ctx->offset = offset;
+
+    if (file->thread_handler(task, file) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    return NGX_AGAIN;
+}
+
+
+static void
+ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log)
+{
+    ngx_thread_file_ctx_t *ctx = data;
+
+#if (NGX_HAVE_PWRITEV)
+
+    off_t          offset;
+    ssize_t        n;
+    ngx_err_t      err;
+    ngx_chain_t   *cl;
+    ngx_iovec_t    vec;
+    struct iovec   iovs[NGX_IOVS_PREALLOCATE];
+
+    vec.iovs = iovs;
+    vec.nalloc = NGX_IOVS_PREALLOCATE;
+
+    cl = ctx->chain;
+    offset = ctx->offset;
+
+    ctx->nbytes = 0;
+    ctx->err = 0;
+
+    do {
+        /* create the iovec and coalesce the neighbouring bufs */
+        cl = ngx_chain_to_iovec(&vec, cl);
+
+eintr:
+
+        n = pwritev(ctx->fd, iovs, vec.count, offset);
+
+        if (n == -1) {
+            err = ngx_errno;
+
+            if (err == NGX_EINTR) {
+                ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, err,
+                               "pwritev() was interrupted");
+                goto eintr;
+            }
+
+            ctx->err = err;
+            return;
+        }
+
+        if ((size_t) n != vec.size) {
+            ctx->nbytes = 0;
+            return;
+        }
+
+        ctx->nbytes += n;
+        offset += n;
+    } while (cl);
+
+#else
+
+    ctx->err = NGX_ENOSYS;
+    return;
+
+#endif
+}
+
+#endif /* NGX_THREADS */
+
+
 ngx_int_t
 ngx_set_file_time(u_char *name, ngx_fd_t fd, time_t s)
 {
diff --git a/src/os/unix/ngx_files.h b/src/os/unix/ngx_files.h
--- a/src/os/unix/ngx_files.h
+++ b/src/os/unix/ngx_files.h
@@ -387,6 +387,8 @@ extern ngx_uint_t  ngx_file_aio;
 #if (NGX_THREADS)
 ssize_t ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size,
     off_t offset, ngx_pool_t *pool);
+ssize_t ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl,
+    off_t offset, ngx_pool_t *pool);
 #endif
 
 



More information about the nginx-devel mailing list