[nginx] Threads: offloading of temp files writing to thread pools.
Maxim Dounin
mdounin at mdounin.ru
Fri Mar 18 03:49:27 UTC 2016
details: http://hg.nginx.org/nginx/rev/6e10518f95d8
branches:
changeset: 6442:6e10518f95d8
user: Maxim Dounin <mdounin at mdounin.ru>
date: Fri Mar 18 06:44:03 2016 +0300
description:
Threads: offloading of temp files writing to thread pools.
The ngx_thread_write_chain_to_file() function is introduced, which
uses the ngx_file_t thread_handler, thread_ctx and thread_task fields.
The task context structure (ngx_thread_file_ctx_t) is the same for
both reading and writing, and can be safely shared as long as
operations are serialized.
The task->handler field is now always set (and not only when the task
is allocated), as the same task can be used with different handlers.
The thread_write flag is introduced in the ngx_temp_file_t structure
to explicitly enable the use of ngx_thread_write_chain_to_file() in
ngx_write_chain_to_temp_file() when supported by the caller.
In collaboration with Valentin Bartenev.
diffstat:
src/core/ngx_file.c | 9 ++
src/core/ngx_file.h | 1 +
src/os/unix/ngx_files.c | 166 +++++++++++++++++++++++++++++++++++++++++++----
src/os/unix/ngx_files.h | 2 +
4 files changed, 164 insertions(+), 14 deletions(-)
diffs (280 lines):
diff --git a/src/core/ngx_file.c b/src/core/ngx_file.c
--- a/src/core/ngx_file.c
+++ b/src/core/ngx_file.c
@@ -124,6 +124,15 @@ ngx_write_chain_to_temp_file(ngx_temp_fi
}
}
+#if (NGX_THREADS && NGX_HAVE_PWRITEV)
+
+ if (tf->thread_write) {
+ return ngx_thread_write_chain_to_file(&tf->file, chain, tf->offset,
+ tf->pool);
+ }
+
+#endif
+
return ngx_write_chain_to_file(&tf->file, chain, tf->offset, tf->pool);
}
diff --git a/src/core/ngx_file.h b/src/core/ngx_file.h
--- a/src/core/ngx_file.h
+++ b/src/core/ngx_file.h
@@ -78,6 +78,7 @@ typedef struct {
unsigned log_level:8;
unsigned persistent:1;
unsigned clean:1;
+ unsigned thread_write:1;
} ngx_temp_file_t;
diff --git a/src/os/unix/ngx_files.c b/src/os/unix/ngx_files.c
--- a/src/os/unix/ngx_files.c
+++ b/src/os/unix/ngx_files.c
@@ -12,6 +12,7 @@
#if (NGX_THREADS)
#include <ngx_thread_pool.h>
static void ngx_thread_read_handler(void *data, ngx_log_t *log);
+static void ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log);
#endif
static ngx_chain_t *ngx_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *cl);
@@ -77,14 +78,17 @@ ngx_read_file(ngx_file_t *file, u_char *
#if (NGX_THREADS)
typedef struct {
- ngx_fd_t fd;
- u_char *buf;
- size_t size;
- off_t offset;
+ ngx_fd_t fd;
+ ngx_uint_t write; /* unsigned write:1; */
- size_t read;
- ngx_err_t err;
-} ngx_thread_read_ctx_t;
+ u_char *buf;
+ size_t size;
+ ngx_chain_t *chain;
+ off_t offset;
+
+ size_t nbytes;
+ ngx_err_t err;
+} ngx_thread_file_ctx_t;
ssize_t
@@ -92,7 +96,7 @@ ngx_thread_read(ngx_file_t *file, u_char
ngx_pool_t *pool)
{
ngx_thread_task_t *task;
- ngx_thread_read_ctx_t *ctx;
+ ngx_thread_file_ctx_t *ctx;
ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0,
"thread read: %d, %p, %uz, %O",
@@ -101,13 +105,11 @@ ngx_thread_read(ngx_file_t *file, u_char
task = file->thread_task;
if (task == NULL) {
- task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_read_ctx_t));
+ task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_file_ctx_t));
if (task == NULL) {
return NGX_ERROR;
}
- task->handler = ngx_thread_read_handler;
-
file->thread_task = task;
}
@@ -116,15 +118,25 @@ ngx_thread_read(ngx_file_t *file, u_char
if (task->event.complete) {
task->event.complete = 0;
+ if (ctx->write) {
+ ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+ "invalid thread call, read instead of write");
+ return NGX_ERROR;
+ }
+
if (ctx->err) {
ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
"pread() \"%s\" failed", file->name.data);
return NGX_ERROR;
}
- return ctx->read;
+ return ctx->nbytes;
}
+ task->handler = ngx_thread_read_handler;
+
+ ctx->write = 0;
+
ctx->fd = file->fd;
ctx->buf = buf;
ctx->size = size;
@@ -143,7 +155,7 @@ ngx_thread_read(ngx_file_t *file, u_char
static void
ngx_thread_read_handler(void *data, ngx_log_t *log)
{
- ngx_thread_read_ctx_t *ctx = data;
+ ngx_thread_file_ctx_t *ctx = data;
ssize_t n;
@@ -155,7 +167,7 @@ ngx_thread_read_handler(void *data, ngx_
ctx->err = ngx_errno;
} else {
- ctx->read = n;
+ ctx->nbytes = n;
ctx->err = 0;
}
@@ -454,6 +466,132 @@ eintr:
}
+#if (NGX_THREADS)
+
+ssize_t
+ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl, off_t offset,
+ ngx_pool_t *pool)
+{
+ ngx_thread_task_t *task;
+ ngx_thread_file_ctx_t *ctx;
+
+ ngx_log_debug3(NGX_LOG_DEBUG_CORE, file->log, 0,
+ "thread write chain: %d, %p, %O",
+ file->fd, cl, offset);
+
+ task = file->thread_task;
+
+ if (task == NULL) {
+ task = ngx_thread_task_alloc(pool,
+ sizeof(ngx_thread_file_ctx_t));
+ if (task == NULL) {
+ return NGX_ERROR;
+ }
+
+ file->thread_task = task;
+ }
+
+ ctx = task->ctx;
+
+ if (task->event.complete) {
+ task->event.complete = 0;
+
+ if (!ctx->write) {
+ ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+ "invalid thread call, write instead of read");
+ return NGX_ERROR;
+ }
+
+ if (ctx->err || ctx->nbytes == 0) {
+ ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
+ "pwritev() \"%s\" failed", file->name.data);
+ return NGX_ERROR;
+ }
+
+ file->offset += ctx->nbytes;
+ return ctx->nbytes;
+ }
+
+ task->handler = ngx_thread_write_chain_to_file_handler;
+
+ ctx->write = 1;
+
+ ctx->fd = file->fd;
+ ctx->chain = cl;
+ ctx->offset = offset;
+
+ if (file->thread_handler(task, file) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ return NGX_AGAIN;
+}
+
+
+static void
+ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log)
+{
+ ngx_thread_file_ctx_t *ctx = data;
+
+#if (NGX_HAVE_PWRITEV)
+
+ off_t offset;
+ ssize_t n;
+ ngx_err_t err;
+ ngx_chain_t *cl;
+ ngx_iovec_t vec;
+ struct iovec iovs[NGX_IOVS_PREALLOCATE];
+
+ vec.iovs = iovs;
+ vec.nalloc = NGX_IOVS_PREALLOCATE;
+
+ cl = ctx->chain;
+ offset = ctx->offset;
+
+ ctx->nbytes = 0;
+ ctx->err = 0;
+
+ do {
+ /* create the iovec and coalesce the neighbouring bufs */
+ cl = ngx_chain_to_iovec(&vec, cl);
+
+eintr:
+
+ n = pwritev(ctx->fd, iovs, vec.count, offset);
+
+ if (n == -1) {
+ err = ngx_errno;
+
+ if (err == NGX_EINTR) {
+ ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, err,
+ "pwritev() was interrupted");
+ goto eintr;
+ }
+
+ ctx->err = err;
+ return;
+ }
+
+ if ((size_t) n != vec.size) {
+ ctx->nbytes = 0;
+ return;
+ }
+
+ ctx->nbytes += n;
+ offset += n;
+ } while (cl);
+
+#else
+
+ ctx->err = NGX_ENOSYS;
+ return;
+
+#endif
+}
+
+#endif /* NGX_THREADS */
+
+
ngx_int_t
ngx_set_file_time(u_char *name, ngx_fd_t fd, time_t s)
{
diff --git a/src/os/unix/ngx_files.h b/src/os/unix/ngx_files.h
--- a/src/os/unix/ngx_files.h
+++ b/src/os/unix/ngx_files.h
@@ -387,6 +387,8 @@ extern ngx_uint_t ngx_file_aio;
#if (NGX_THREADS)
ssize_t ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size,
off_t offset, ngx_pool_t *pool);
+ssize_t ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl,
+ off_t offset, ngx_pool_t *pool);
#endif
More information about the nginx-devel
mailing list