[PATCH 2 of 2] QUIC: input packet batching with recvmmsg()
Roman Arutyunyan
arut at nginx.com
Thu Mar 7 17:54:07 UTC 2024
# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1707486707 -28800
# Fri Feb 09 21:51:47 2024 +0800
# Node ID 4584ba4b1d65a90f69201cecf1f1e650c1cbd87b
# Parent 5d28510b62bffba3187d7fe69baccd2d2da41a12
QUIC: input packet batching with recvmmsg().
diff --git a/auto/os/linux b/auto/os/linux
--- a/auto/os/linux
+++ b/auto/os/linux
@@ -303,4 +303,15 @@ ngx_feature_test="struct mmsghdr msg[UIO
. auto/feature
+ngx_feature="recvmmsg()"
+ngx_feature_name="NGX_HAVE_RECVMMSG"
+ngx_feature_run=no
+ngx_feature_incs="#include <sys/socket.h>"
+ngx_feature_path=
+ngx_feature_libs=
+ngx_feature_test="struct mmsghdr msg[64];
+ recvmmsg(0, msg, 64, 0, NULL);"
+. auto/feature
+
+
CC_AUX_FLAGS="$cc_aux_flags -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64"
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -898,7 +898,11 @@ ngx_event_process_init(ngx_cycle_t *cycl
#if (NGX_QUIC)
} else if (ls[i].quic) {
+#if (NGX_HAVE_RECVMMSG)
+ rev->handler = ngx_quic_recvmmsg;
+#else
rev->handler = ngx_quic_recvmsg;
+#endif
#if (NGX_HAVE_SENDMMSG)
wev->handler = ngx_event_sendmmsg;
diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -112,6 +112,9 @@ struct ngx_quic_stream_s {
void ngx_quic_recvmsg(ngx_event_t *ev);
+#if (NGX_HAVE_RECVMMSG)
+void ngx_quic_recvmmsg(ngx_event_t *ev);
+#endif
void ngx_quic_run(ngx_connection_t *c, ngx_quic_conf_t *conf);
ngx_connection_t *ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi);
void ngx_quic_finalize_connection(ngx_connection_t *c, ngx_uint_t err,
diff --git a/src/event/quic/ngx_event_quic_udp.c b/src/event/quic/ngx_event_quic_udp.c
--- a/src/event/quic/ngx_event_quic_udp.c
+++ b/src/event/quic/ngx_event_quic_udp.c
@@ -11,6 +11,7 @@
#include <ngx_event_quic_connection.h>
+static void ngx_quic_handle_msg(ngx_event_t *ev, struct msghdr *msg, size_t n);
static void ngx_quic_close_accepted_connection(ngx_connection_t *c);
static ngx_connection_t *ngx_quic_lookup_connection(ngx_listening_t *ls,
ngx_str_t *key, struct sockaddr *local_sockaddr, socklen_t local_socklen);
@@ -20,20 +21,13 @@ void
ngx_quic_recvmsg(ngx_event_t *ev)
{
ssize_t n;
- ngx_str_t key;
- ngx_buf_t buf;
- ngx_log_t *log;
ngx_err_t err;
- socklen_t socklen, local_socklen;
- ngx_event_t *rev, *wev;
struct iovec iov[1];
struct msghdr msg;
- ngx_sockaddr_t sa, lsa;
- struct sockaddr *sockaddr, *local_sockaddr;
+ ngx_sockaddr_t sa;
ngx_listening_t *ls;
ngx_event_conf_t *ecf;
- ngx_connection_t *c, *lc;
- ngx_quic_socket_t *qsock;
+ ngx_connection_t *lc;
static u_char buffer[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
#if (NGX_HAVE_ADDRINFO_CMSG)
@@ -106,239 +100,10 @@ ngx_quic_recvmsg(ngx_event_t *ev)
}
#endif
- sockaddr = msg.msg_name;
- socklen = msg.msg_namelen;
-
- if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
- socklen = sizeof(ngx_sockaddr_t);
- }
-
-#if (NGX_HAVE_UNIX_DOMAIN)
-
- if (sockaddr->sa_family == AF_UNIX) {
- struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;
-
- if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
- || saun->sun_path[0] == '\0')
- {
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
- "unbound unix socket");
- goto next;
- }
- }
-
-#endif
-
- local_sockaddr = ls->sockaddr;
- local_socklen = ls->socklen;
-
-#if (NGX_HAVE_ADDRINFO_CMSG)
-
- if (ls->wildcard) {
- struct cmsghdr *cmsg;
-
- ngx_memcpy(&lsa, local_sockaddr, local_socklen);
- local_sockaddr = &lsa.sockaddr;
-
- for (cmsg = CMSG_FIRSTHDR(&msg);
- cmsg != NULL;
- cmsg = CMSG_NXTHDR(&msg, cmsg))
- {
- if (ngx_get_srcaddr_cmsg(cmsg, local_sockaddr) == NGX_OK) {
- break;
- }
- }
- }
-
-#endif
-
- if (ngx_quic_get_packet_dcid(ev->log, buffer, n, &key) != NGX_OK) {
- goto next;
- }
-
- c = ngx_quic_lookup_connection(ls, &key, local_sockaddr, local_socklen);
-
- if (c) {
-
-#if (NGX_DEBUG)
- if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
- ngx_log_handler_pt handler;
-
- handler = c->log->handler;
- c->log->handler = NULL;
-
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic recvmsg: fd:%d n:%z", c->fd, n);
-
- c->log->handler = handler;
- }
-#endif
-
- ngx_memzero(&buf, sizeof(ngx_buf_t));
-
- buf.pos = buffer;
- buf.last = buffer + n;
- buf.start = buf.pos;
- buf.end = buffer + sizeof(buffer);
-
- qsock = ngx_quic_get_socket(c);
-
- ngx_memcpy(&qsock->sockaddr, sockaddr, socklen);
- qsock->socklen = socklen;
-
- c->udp->buffer = &buf;
-
- rev = c->read;
- rev->ready = 1;
- rev->active = 0;
-
- rev->handler(rev);
-
- if (c->udp) {
- c->udp->buffer = NULL;
- }
-
- rev->ready = 0;
- rev->active = 1;
-
- goto next;
- }
-
-#if (NGX_STAT_STUB)
- (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
-#endif
-
- ngx_accept_disabled = ngx_cycle->connection_n / 8
- - ngx_cycle->free_connection_n;
-
- c = ngx_get_connection(lc->fd, ev->log);
- if (c == NULL) {
- return;
- }
-
- c->shared = 1;
- c->type = SOCK_DGRAM;
- c->socklen = socklen;
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "quic recvmsg: fd:%d n:%z", lc->fd, n);
-#if (NGX_STAT_STUB)
- (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
-#endif
-
- c->pool = ngx_create_pool(ls->pool_size, ev->log);
- if (c->pool == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- c->sockaddr = ngx_palloc(c->pool, NGX_SOCKADDRLEN);
- if (c->sockaddr == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- ngx_memcpy(c->sockaddr, sockaddr, socklen);
-
- log = ngx_palloc(c->pool, sizeof(ngx_log_t));
- if (log == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- *log = ls->log;
-
- c->log = log;
- c->pool->log = log;
- c->listening = ls;
-
- if (local_sockaddr == &lsa.sockaddr) {
- local_sockaddr = ngx_palloc(c->pool, local_socklen);
- if (local_sockaddr == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- ngx_memcpy(local_sockaddr, &lsa, local_socklen);
- }
-
- c->local_sockaddr = local_sockaddr;
- c->local_socklen = local_socklen;
-
- c->buffer = ngx_create_temp_buf(c->pool, n);
- if (c->buffer == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);
-
- rev = c->read;
- wev = c->write;
-
- rev->active = 1;
- wev->ready = 1;
-
- rev->log = log;
- wev->log = log;
-
- /*
- * TODO: MT: - ngx_atomic_fetch_add()
- * or protection by critical section or light mutex
- *
- * TODO: MP: - allocated in a shared memory
- * - ngx_atomic_fetch_add()
- * or protection by critical section or light mutex
- */
-
- c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
-
- c->start_time = ngx_current_msec;
-
-#if (NGX_STAT_STUB)
- (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
-#endif
-
- if (ls->addr_ntop) {
- c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
- if (c->addr_text.data == NULL) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
-
- c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
- c->addr_text.data,
- ls->addr_text_max_len, 0);
- if (c->addr_text.len == 0) {
- ngx_quic_close_accepted_connection(c);
- return;
- }
- }
-
-#if (NGX_DEBUG)
- {
- ngx_str_t addr;
- u_char text[NGX_SOCKADDR_STRLEN];
-
- ngx_debug_accepted_connection(ecf, c);
-
- if (log->log_level & NGX_LOG_DEBUG_EVENT) {
- addr.data = text;
- addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
- NGX_SOCKADDR_STRLEN, 1);
-
- ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0,
- "*%uA quic recvmsg: %V fd:%d n:%z",
- c->number, &addr, c->fd, n);
- }
-
- }
-#endif
-
- log->data = NULL;
- log->handler = NULL;
-
- ls->handler(c);
-
- next:
+ ngx_quic_handle_msg(ev, &msg, n);
if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
ev->available -= n;
@@ -348,6 +113,386 @@ ngx_quic_recvmsg(ngx_event_t *ev)
}
+#if (NGX_HAVE_RECVMMSG)
+
+#define NGX_QUIC_RECVMMSG_NUM 64
+
+void
+ngx_quic_recvmmsg(ngx_event_t *ev)
+{
+ int nmsg;
+ size_t n;
+ ngx_err_t err;
+ ngx_int_t i;
+ struct iovec iov[NGX_QUIC_RECVMMSG_NUM];
+ struct msghdr *msg;
+ ngx_sockaddr_t sa[NGX_QUIC_RECVMMSG_NUM];
+ ngx_listening_t *ls;
+ ngx_event_conf_t *ecf;
+ ngx_connection_t *lc;
+ static u_char buffer[NGX_QUIC_RECVMMSG_NUM]
+ [NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+ struct mmsghdr msgvec[NGX_QUIC_RECVMMSG_NUM];
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+ u_char msg_control[NGX_QUIC_RECVMMSG_NUM]
+ [CMSG_SPACE(sizeof(ngx_addrinfo_t))];
+#endif
+
+ if (ev->timedout) {
+ if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
+ return;
+ }
+
+ ev->timedout = 0;
+ }
+
+ ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
+
+ if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
+ ev->available = ecf->multi_accept;
+ }
+
+ lc = ev->data;
+ ls = lc->listening;
+ ev->ready = 0;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "quic recvmmsg on %V, ready: %d",
+ &ls->addr_text, ev->available);
+
+ do {
+
+ ngx_memzero(msgvec, sizeof(msgvec));
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+ if (ls->wildcard) {
+ ngx_memzero(msg_control, sizeof(msg_control));
+ }
+#endif
+
+ for (i = 0; i < NGX_QUIC_RECVMMSG_NUM; i++) {
+ msg = &msgvec[i].msg_hdr;
+
+ iov[i].iov_base = (void *) buffer[i];
+ iov[i].iov_len = NGX_QUIC_MAX_UDP_PAYLOAD_SIZE;
+
+ msg->msg_name = &sa[i];
+ msg->msg_namelen = sizeof(ngx_sockaddr_t);
+ msg->msg_iov = &iov[i];
+ msg->msg_iovlen = 1;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+ if (ls->wildcard) {
+ msg->msg_control = &msg_control[i];
+ msg->msg_controllen = sizeof(msg_control);
+ }
+#endif
+ }
+
+ nmsg = recvmmsg(lc->fd, msgvec, NGX_QUIC_RECVMMSG_NUM, 0, NULL);
+
+ if (nmsg == -1) {
+ err = ngx_socket_errno;
+
+ if (err == NGX_EAGAIN) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
+ "quic recvmmsg() not ready");
+ return;
+ }
+
+ ngx_log_error(NGX_LOG_ALERT, ev->log, err,
+ "quic recvmmsg() failed");
+
+ return;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "quic recvmmsg: fd:%d n:%i", lc->fd, nmsg);
+
+ for (i = 0; i < nmsg; i++) {
+ msg = &msgvec[i].msg_hdr;
+ n = msgvec[i].msg_len;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+ if (msg->msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
+ ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
+ "quic recvmmsg() truncated data");
+ continue;
+ }
+#endif
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "quic recvmmsg: n:%z", n);
+
+ ngx_quic_handle_msg(ev, msg, n);
+
+ if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
+ ev->available -= n;
+ }
+ }
+
+ } while (ev->available);
+}
+
+#endif
+
+
+static void
+ngx_quic_handle_msg(ngx_event_t *ev, struct msghdr *msg, size_t n)
+{
+ u_char *buffer;
+ ngx_str_t key;
+ ngx_buf_t buf;
+ ngx_log_t *log;
+ socklen_t socklen, local_socklen;
+ ngx_event_t *rev, *wev;
+ ngx_sockaddr_t lsa;
+ struct sockaddr *sockaddr, *local_sockaddr;
+ ngx_listening_t *ls;
+ ngx_connection_t *c, *lc;
+ ngx_quic_socket_t *qsock;
+
+ lc = ev->data;
+ ls = lc->listening;
+
+ buffer = msg->msg_iov[0].iov_base;
+
+ sockaddr = msg->msg_name;
+ socklen = msg->msg_namelen;
+
+ if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
+ socklen = sizeof(ngx_sockaddr_t);
+ }
+
+#if (NGX_HAVE_UNIX_DOMAIN)
+
+ if (sockaddr->sa_family == AF_UNIX) {
+ struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;
+
+ if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
+ || saun->sun_path[0] == '\0')
+ {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
+ "unbound unix socket");
+ return;
+ }
+ }
+
+#endif
+
+ local_sockaddr = ls->sockaddr;
+ local_socklen = ls->socklen;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+
+ if (ls->wildcard) {
+ struct cmsghdr *cmsg;
+
+ ngx_memcpy(&lsa, local_sockaddr, local_socklen);
+ local_sockaddr = &lsa.sockaddr;
+
+ for (cmsg = CMSG_FIRSTHDR(msg);
+ cmsg != NULL;
+ cmsg = CMSG_NXTHDR(msg, cmsg))
+ {
+ if (ngx_get_srcaddr_cmsg(cmsg, local_sockaddr) == NGX_OK) {
+ break;
+ }
+ }
+ }
+
+#endif
+
+ if (ngx_quic_get_packet_dcid(ev->log, buffer, n, &key) != NGX_OK) {
+ return;
+ }
+
+ c = ngx_quic_lookup_connection(ls, &key, local_sockaddr, local_socklen);
+
+ if (c) {
+
+#if (NGX_DEBUG)
+ if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
+ ngx_log_handler_pt handler;
+
+ handler = c->log->handler;
+ c->log->handler = NULL;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic msg: n:%z", n);
+
+ c->log->handler = handler;
+ }
+#endif
+
+ ngx_memzero(&buf, sizeof(ngx_buf_t));
+
+ buf.pos = buffer;
+ buf.last = buffer + n;
+ buf.start = buf.pos;
+ buf.end = buffer + sizeof(buffer);
+
+ qsock = ngx_quic_get_socket(c);
+
+ ngx_memcpy(&qsock->sockaddr, sockaddr, socklen);
+ qsock->socklen = socklen;
+
+ c->udp->buffer = &buf;
+
+ rev = c->read;
+ rev->ready = 1;
+ rev->active = 0;
+
+ rev->handler(rev);
+
+ if (c->udp) {
+ c->udp->buffer = NULL;
+ }
+
+ rev->ready = 0;
+ rev->active = 1;
+
+ return;
+ }
+
+#if (NGX_STAT_STUB)
+ (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
+#endif
+
+ ngx_accept_disabled = ngx_cycle->connection_n / 8
+ - ngx_cycle->free_connection_n;
+
+ c = ngx_get_connection(lc->fd, ev->log);
+ if (c == NULL) {
+ return;
+ }
+
+ c->shared = 1;
+ c->type = SOCK_DGRAM;
+ c->socklen = socklen;
+
+#if (NGX_STAT_STUB)
+ (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
+#endif
+
+ c->pool = ngx_create_pool(ls->pool_size, ev->log);
+ if (c->pool == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ c->sockaddr = ngx_palloc(c->pool, NGX_SOCKADDRLEN);
+ if (c->sockaddr == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ ngx_memcpy(c->sockaddr, sockaddr, socklen);
+
+ log = ngx_palloc(c->pool, sizeof(ngx_log_t));
+ if (log == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ *log = ls->log;
+
+ c->log = log;
+ c->pool->log = log;
+ c->listening = ls;
+
+ if (local_sockaddr == &lsa.sockaddr) {
+ local_sockaddr = ngx_palloc(c->pool, local_socklen);
+ if (local_sockaddr == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ ngx_memcpy(local_sockaddr, &lsa, local_socklen);
+ }
+
+ c->local_sockaddr = local_sockaddr;
+ c->local_socklen = local_socklen;
+
+ c->buffer = ngx_create_temp_buf(c->pool, n);
+ if (c->buffer == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);
+
+ rev = c->read;
+ wev = c->write;
+
+ rev->active = 1;
+ wev->ready = 1;
+
+ rev->log = log;
+ wev->log = log;
+
+ /*
+ * TODO: MT: - ngx_atomic_fetch_add()
+ * or protection by critical section or light mutex
+ *
+ * TODO: MP: - allocated in a shared memory
+ * - ngx_atomic_fetch_add()
+ * or protection by critical section or light mutex
+ */
+
+ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
+ c->start_time = ngx_current_msec;
+
+#if (NGX_STAT_STUB)
+ (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
+#endif
+
+ if (ls->addr_ntop) {
+ c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
+ if (c->addr_text.data == NULL) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+
+ c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
+ c->addr_text.data,
+ ls->addr_text_max_len, 0);
+ if (c->addr_text.len == 0) {
+ ngx_quic_close_accepted_connection(c);
+ return;
+ }
+ }
+
+#if (NGX_DEBUG)
+ {
+ ngx_str_t addr;
+ ngx_event_conf_t *ecf;
+ u_char text[NGX_SOCKADDR_STRLEN];
+
+ ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
+
+ ngx_debug_accepted_connection(ecf, c);
+
+ if (log->log_level & NGX_LOG_DEBUG_EVENT) {
+ addr.data = text;
+ addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
+ NGX_SOCKADDR_STRLEN, 1);
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
+ "*%uA quic msg: %V n:%z", c->number, &addr, n);
+ }
+
+ }
+#endif
+
+ log->data = NULL;
+ log->handler = NULL;
+
+ ls->handler(c);
+}
+
+
static void
ngx_quic_close_accepted_connection(ngx_connection_t *c)
{
More information about the nginx-devel
mailing list