[PATCH 1 of 2] QUIC: output packet batching with sendmmsg()
Roman Arutyunyan
arut at nginx.com
Thu Mar 7 17:54:06 UTC 2024
# HG changeset patch
# User Roman Arutyunyan <arut at nginx.com>
# Date 1709833123 -28800
# Fri Mar 08 01:38:43 2024 +0800
# Node ID 5d28510b62bffba3187d7fe69baccd2d2da41a12
# Parent 2ed3f57dca0a664340bca2236c7d614902db4180
QUIC: output packet batching with sendmmsg().
diff --git a/auto/os/linux b/auto/os/linux
--- a/auto/os/linux
+++ b/auto/os/linux
@@ -291,4 +291,16 @@ ngx_feature_test="socklen_t optlen = siz
. auto/feature
+ngx_feature="sendmmsg()"
+ngx_feature_name="NGX_HAVE_SENDMMSG" # macro that guards the sendmmsg() batching code in the C sources
+ngx_feature_run=no # presumably compile/link only, the probe is not executed - confirm against auto/feature
+ngx_feature_incs="#include <sys/socket.h>
+ #include <sys/uio.h>"
+ngx_feature_path=
+ngx_feature_libs=
+ngx_feature_test="struct mmsghdr msg[UIO_MAXIOV];
+ sendmmsg(0, msg, UIO_MAXIOV, 0);"
+. auto/feature
+
+
CC_AUX_FLAGS="$cc_aux_flags -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64"
diff --git a/auto/sources b/auto/sources
--- a/auto/sources
+++ b/auto/sources
@@ -171,6 +171,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
src/os/unix/ngx_writev_chain.c \
src/os/unix/ngx_udp_send.c \
src/os/unix/ngx_udp_sendmsg_chain.c \
+ src/os/unix/ngx_udp_sendmmsg.c \
src/os/unix/ngx_channel.c \
src/os/unix/ngx_shmem.c \
src/os/unix/ngx_process.c \
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -823,9 +823,11 @@ ngx_event_process_init(ngx_cycle_t *cycl
ls[i].connection = c;
rev = c->read;
+ wev = c->write;
rev->log = c->log;
rev->accept = 1;
+ wev->log = c->log;
#if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls[i].deferred_accept;
@@ -897,6 +899,14 @@ ngx_event_process_init(ngx_cycle_t *cycl
#if (NGX_QUIC)
} else if (ls[i].quic) {
rev->handler = ngx_quic_recvmsg;
+#if (NGX_HAVE_SENDMMSG)
+ wev->handler = ngx_event_sendmmsg;
+
+ c->data = ngx_pcalloc(cycle->pool, sizeof(ngx_sendmmsg_batch_t));
+ if (c->data == NULL) {
+ return NGX_ERROR;
+ }
+#endif
#endif
} else {
rev->handler = ngx_event_recvmsg;
diff --git a/src/event/ngx_event_udp.h b/src/event/ngx_event_udp.h
--- a/src/event/ngx_event_udp.h
+++ b/src/event/ngx_event_udp.h
@@ -22,6 +22,8 @@
#endif
+#define NGX_SENDMMSG_BUFFER 4194304 /* 4M */
+
struct ngx_udp_connection_s {
ngx_rbtree_node_t node;
@@ -47,6 +49,23 @@ typedef union {
#endif
} ngx_addrinfo_t;
+
+#if (NGX_HAVE_SENDMMSG)
+
+typedef struct { /* batch of outgoing datagrams queued for a single sendmmsg() call */
+ u_char buffer[NGX_SENDMMSG_BUFFER]; /* datagram payloads, each followed by its iovec, address and control data */
+ struct mmsghdr msgvec[UIO_MAXIOV]; /* queued message headers; the first vlen entries are in use */
+ size_t size; /* number of bytes of buffer currently occupied */
+ ngx_uint_t vlen; /* number of messages currently queued in msgvec */
+} ngx_sendmmsg_batch_t;
+
+
+ssize_t ngx_sendmmsg(ngx_connection_t *c, struct msghdr *msg, int flags); /* queues msg in the listening connection's batch, or sends it directly when no batch is attached */
+u_char *ngx_sendmmsg_buffer(ngx_connection_t *c, size_t size); /* returns the current free position in the batch buffer, or NULL when batching is unavailable */
+void ngx_event_sendmmsg(ngx_event_t *ev); /* write event handler: flushes the pending batch */
+
+#endif
+
size_t ngx_set_srcaddr_cmsg(struct cmsghdr *cmsg,
struct sockaddr *local_sockaddr);
ngx_int_t ngx_get_srcaddr_cmsg(struct cmsghdr *cmsg,
diff --git a/src/event/quic/ngx_event_quic_migration.c b/src/event/quic/ngx_event_quic_migration.c
--- a/src/event/quic/ngx_event_quic_migration.c
+++ b/src/event/quic/ngx_event_quic_migration.c
@@ -908,10 +908,12 @@ ngx_quic_expire_path_mtu_discovery(ngx_c
static ngx_int_t
ngx_quic_send_path_mtu_probe(ngx_connection_t *c, ngx_quic_path_t *path)
{
+ void *bt;
size_t mtu;
uint64_t pnum;
ngx_int_t rc;
ngx_uint_t log_error;
+ ngx_connection_t *lc;
ngx_quic_frame_t *frame;
ngx_quic_send_ctx_t *ctx;
ngx_quic_connection_t *qc;
@@ -933,6 +935,10 @@ ngx_quic_send_path_mtu_probe(ngx_connect
"mtu:%uz pnum:%uL tries:%ui",
path->seqnum, path->mtud, ctx->pnum, path->tries);
+ lc = c->listening->connection;
+ bt = lc->data;
+ lc->data = NULL;
+
log_error = c->log_error;
c->log_error = NGX_ERROR_IGNORE_EMSGSIZE;
@@ -943,6 +949,7 @@ ngx_quic_send_path_mtu_probe(ngx_connect
path->mtu = mtu;
c->log_error = log_error;
+ lc->data = bt;
if (rc == NGX_OK) {
path->mtu_pnum[path->tries] = pnum;
diff --git a/src/event/quic/ngx_event_quic_output.c b/src/event/quic/ngx_event_quic_output.c
--- a/src/event/quic/ngx_event_quic_output.c
+++ b/src/event/quic/ngx_event_quic_output.c
@@ -61,6 +61,7 @@ static void ngx_quic_init_packet(ngx_con
static ngx_uint_t ngx_quic_get_padding_level(ngx_connection_t *c);
static ssize_t ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len,
struct sockaddr *sockaddr, socklen_t socklen);
+static u_char *ngx_quic_send_buffer(ngx_connection_t *c);
static void ngx_quic_set_packet_number(ngx_quic_header_t *pkt,
ngx_quic_send_ctx_t *ctx);
@@ -114,14 +115,13 @@ ngx_quic_create_datagrams(ngx_connection
{
size_t len, min;
ssize_t n;
- u_char *p;
+ u_char *dst, *p;
uint64_t preserved_pnum[NGX_QUIC_SEND_CTX_LAST];
ngx_uint_t i, pad;
ngx_quic_path_t *path;
ngx_quic_send_ctx_t *ctx;
ngx_quic_congestion_t *cg;
ngx_quic_connection_t *qc;
- static u_char dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
qc = ngx_quic_get_connection(c);
cg = &qc->congestion;
@@ -129,6 +129,7 @@ ngx_quic_create_datagrams(ngx_connection
while (cg->in_flight < cg->window) {
+ dst = ngx_quic_send_buffer(c);
p = dst;
len = ngx_quic_path_limit(c, path, path->mtu);
@@ -688,12 +689,12 @@ static ssize_t
ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len,
struct sockaddr *sockaddr, socklen_t socklen)
{
- ssize_t n;
- struct iovec iov;
- struct msghdr msg;
+ ssize_t n;
+ struct iovec iov;
+ struct msghdr msg;
#if (NGX_HAVE_ADDRINFO_CMSG)
- struct cmsghdr *cmsg;
- char msg_control[CMSG_SPACE(sizeof(ngx_addrinfo_t))];
+ struct cmsghdr *cmsg;
+ char msg_control[CMSG_SPACE(sizeof(ngx_addrinfo_t))];
#endif
ngx_memzero(&msg, sizeof(struct msghdr));
@@ -720,7 +721,11 @@ ngx_quic_send(ngx_connection_t *c, u_cha
}
#endif
+#if (NGX_HAVE_SENDMMSG)
+ n = ngx_sendmmsg(c, &msg, 0);
+#else
n = ngx_sendmsg(c, &msg, 0);
+#endif
if (n < 0) {
return n;
}
@@ -731,6 +736,26 @@ ngx_quic_send(ngx_connection_t *c, u_cha
}
+/*
+ * Returns the buffer in which the next outgoing datagram should be built.
+ *
+ * When sendmmsg() batching is compiled in and ngx_sendmmsg_buffer() hands
+ * out space, that space is returned.  Otherwise a function-local static
+ * buffer of NGX_QUIC_MAX_UDP_PAYLOAD_SIZE bytes is used.
+ */
+static u_char *
+ngx_quic_send_buffer(ngx_connection_t *c)
+{
+#if (NGX_HAVE_SENDMMSG)
+    u_char         *batch;
+#endif
+    static u_char   fallback[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+
+#if (NGX_HAVE_SENDMMSG)
+
+    batch = ngx_sendmmsg_buffer(c, NGX_QUIC_MAX_UDP_PAYLOAD_SIZE);
+    if (batch != NULL) {
+        return batch;
+    }
+
+#endif
+
+    return fallback;
+}
+
+
static void
ngx_quic_set_packet_number(ngx_quic_header_t *pkt, ngx_quic_send_ctx_t *ctx)
{
@@ -764,9 +789,9 @@ ngx_quic_set_packet_number(ngx_quic_head
ngx_int_t
ngx_quic_negotiate_version(ngx_connection_t *c, ngx_quic_header_t *inpkt)
{
- size_t len;
- ngx_quic_header_t pkt;
- static u_char buf[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+ u_char *buf;
+ size_t len;
+ ngx_quic_header_t pkt;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"sending version negotiation packet");
@@ -776,6 +801,8 @@ ngx_quic_negotiate_version(ngx_connectio
pkt.dcid = inpkt->scid;
pkt.scid = inpkt->dcid;
+ buf = ngx_quic_send_buffer(c);
+
len = ngx_quic_create_version_negotiation(&pkt, buf);
#ifdef NGX_QUIC_DEBUG_PACKETS
@@ -793,10 +820,9 @@ ngx_int_t
ngx_quic_send_stateless_reset(ngx_connection_t *c, ngx_quic_conf_t *conf,
ngx_quic_header_t *pkt)
{
- u_char *token;
+ u_char *token, *buf;
size_t len, max;
uint16_t rndbytes;
- u_char buf[NGX_QUIC_MAX_SR_PACKET];
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic handle stateless reset output");
@@ -819,6 +845,8 @@ ngx_quic_send_stateless_reset(ngx_connec
+ NGX_QUIC_MIN_SR_PACKET;
}
+ buf = ngx_quic_send_buffer(c);
+
if (RAND_bytes(buf, len - NGX_QUIC_SR_TOKEN_LEN) != 1) {
return NGX_ERROR;
}
@@ -892,9 +920,7 @@ ngx_quic_send_early_cc(ngx_connection_t
ngx_quic_keys_t keys;
ngx_quic_frame_t frame;
ngx_quic_header_t pkt;
-
- static u_char src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
- static u_char dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+ static u_char src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
ngx_memzero(&frame, sizeof(ngx_quic_frame_t));
ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
@@ -945,7 +971,7 @@ ngx_quic_send_early_cc(ngx_connection_t
pkt.payload.data = src;
pkt.payload.len = len;
- res.data = dst;
+ res.data = ngx_quic_send_buffer(c);
ngx_quic_log_packet(c->log, &pkt);
@@ -974,7 +1000,6 @@ ngx_quic_send_retry(ngx_connection_t *c,
ngx_str_t res, token;
ngx_quic_header_t pkt;
- u_char buf[NGX_QUIC_RETRY_BUFFER_SIZE];
u_char dcid[NGX_QUIC_SERVER_CID_LEN];
u_char tbuf[NGX_QUIC_TOKEN_BUF_SIZE];
@@ -1008,7 +1033,7 @@ ngx_quic_send_retry(ngx_connection_t *c,
pkt.token = token;
- res.data = buf;
+ res.data = ngx_quic_send_buffer(c);
if (ngx_quic_encrypt(&pkt, &res) != NGX_OK) {
return NGX_ERROR;
@@ -1193,9 +1218,7 @@ ngx_quic_frame_sendto(ngx_connection_t *
ngx_quic_send_ctx_t *ctx;
ngx_quic_congestion_t *cg;
ngx_quic_connection_t *qc;
-
static u_char src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
- static u_char dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
qc = ngx_quic_get_connection(c);
cg = &qc->congestion;
@@ -1254,7 +1277,7 @@ ngx_quic_frame_sendto(ngx_connection_t *
pkt.payload.data = src;
pkt.payload.len = len;
- res.data = dst;
+ res.data = ngx_quic_send_buffer(c);
ngx_quic_log_packet(c->log, &pkt);
diff --git a/src/os/unix/ngx_udp_sendmmsg.c b/src/os/unix/ngx_udp_sendmmsg.c
new file mode 100644
--- /dev/null
+++ b/src/os/unix/ngx_udp_sendmmsg.c
@@ -0,0 +1,185 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+
+#if (NGX_HAVE_SENDMMSG)
+
+static void ngx_sendmmsg_flush(ngx_connection_t *lc);
+
+
+/*
+ * Queues a datagram for batched transmission on the listening socket.
+ *
+ * If the connection has no listening socket, or the listening connection
+ * has no batch attached (lc->data == NULL), the message is sent right away
+ * with ngx_sendmsg() and its result is returned.
+ *
+ * Otherwise the payload, a single iovec, the destination address and the
+ * control data are packed into the batch buffer and a message header is
+ * appended to the batch.  The batch is flushed immediately when it holds
+ * UIO_MAXIOV messages or when the new message does not fit into the buffer;
+ * in all other cases the listening connection's write event is posted so
+ * that the batch is flushed later.
+ *
+ * Note: "flags" is only passed on the direct ngx_sendmsg() path; batches
+ * are flushed with flags 0.  For a batched message the function returns
+ * the total payload size, and the result of the eventual flush is not
+ * reported to the caller.
+ */
+ssize_t
+ngx_sendmmsg(ngx_connection_t *c, struct msghdr *msg, int flags)
+{
+    u_char                *p;
+    size_t                 size, aux_size;
+    ngx_uint_t             i;
+    struct iovec          *iov;
+    struct msghdr         *nmsg;
+    ngx_connection_t      *lc;
+    ngx_sendmmsg_batch_t  *sb;
+
+    if (c->listening == NULL || c->listening->connection->data == NULL) {
+        return ngx_sendmsg(c, msg, flags);
+    }
+
+    lc = c->listening->connection;
+    sb = lc->data;
+
+    for (i = 0, size = 0; i < msg->msg_iovlen; i++) {
+        size += msg->msg_iov[i].iov_len;
+    }
+
+    c->sent += size;
+
+    nmsg = &sb->msgvec[sb->vlen++].msg_hdr;
+
+    /* worst-case space for the iovec, address and control data */
+
+    aux_size = NGX_ALIGNMENT + sizeof(struct iovec)
+               + NGX_ALIGNMENT + msg->msg_namelen
+               + NGX_ALIGNMENT + msg->msg_controllen;
+
+    if (sb->size + size + aux_size > NGX_SENDMMSG_BUFFER) {
+
+        /*
+         * no room to pack the message: reference the caller's data
+         * as is and flush before returning, while it is still valid
+         */
+
+        *nmsg = *msg;
+        goto flush;
+    }
+
+    ngx_memzero(nmsg, sizeof(struct msghdr));
+
+    p = sb->buffer + sb->size;
+
+    for (i = 0; i < msg->msg_iovlen; i++) {
+
+        /*
+         * the payload may already be in place if it was built in the
+         * space returned by ngx_sendmmsg_buffer(); otherwise it may
+         * still reside elsewhere in the batch buffer, so the regions
+         * can overlap and memmove semantics are required
+         */
+
+        if (msg->msg_iov[i].iov_len && msg->msg_iov[i].iov_base != p) {
+            ngx_memmove(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
+        }
+
+        p += msg->msg_iov[i].iov_len;
+    }
+
+    p = ngx_align_ptr(p, NGX_ALIGNMENT);
+    iov = (struct iovec *) p;
+    nmsg->msg_iov = iov;
+    nmsg->msg_iovlen = 1;
+    iov->iov_base = sb->buffer + sb->size;
+    iov->iov_len = size;
+    p += sizeof(struct iovec);
+
+    /* msg_name may be NULL when msg_namelen is zero: do not copy from it */
+
+    if (msg->msg_namelen) {
+        p = ngx_align_ptr(p, NGX_ALIGNMENT);
+        nmsg->msg_name = p;
+        nmsg->msg_namelen = msg->msg_namelen;
+        p = ngx_cpymem(p, msg->msg_name, msg->msg_namelen);
+    }
+
+    if (msg->msg_controllen) {
+        p = ngx_align_ptr(p, NGX_ALIGNMENT);
+        nmsg->msg_control = p;
+        nmsg->msg_controllen = msg->msg_controllen;
+        p = ngx_cpymem(p, msg->msg_control, msg->msg_controllen);
+    }
+
+    sb->size = p - sb->buffer;
+
+    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "sendmmsg batch n:%ui s:%uz a:%uz t:%uz",
+                   sb->vlen, size, aux_size, sb->size);
+
+    if (sb->vlen == UIO_MAXIOV) {
+        goto flush;
+    }
+
+    ngx_post_event(lc->write, &ngx_posted_events);
+
+    return size;
+
+flush:
+
+    ngx_sendmmsg_flush(lc);
+
+    return size;
+}
+
+
+/*
+ * Sends all messages queued in the batch attached to the listening
+ * connection and resets the batch.
+ *
+ * sendmmsg() may send only a part of the batch; in that case the call is
+ * repeated for the remaining messages.  An interrupted call (EINTR) is
+ * retried as well.  On EAGAIN or any other error the remaining messages
+ * are dropped; errors other than EAGAIN are reported with
+ * ngx_connection_error().  The batch is always empty on return.
+ */
+static void
+ngx_sendmmsg_flush(ngx_connection_t *lc)
+{
+    int                    n;
+    ngx_err_t              err;
+    ngx_uint_t             sent;
+    ngx_sendmmsg_batch_t  *sb;
+
+    sb = lc->data;
+
+    if (sb == NULL || sb->vlen == 0) {
+        return;
+    }
+
+    sent = 0;
+
+    while (sent < sb->vlen) {
+
+        n = sendmmsg(lc->fd, &sb->msgvec[sent], sb->vlen - sent, 0);
+
+        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, lc->log, 0,
+                       "sendmmsg: %d of %ui s:%uz",
+                       n, sb->vlen - sent, sb->size);
+
+        if (n > 0) {
+            /* partial send: continue with the remaining messages */
+            sent += (ngx_uint_t) n;
+            continue;
+        }
+
+        if (n == 0) {
+            /* no progress and no error: do not spin */
+            break;
+        }
+
+        err = ngx_errno;
+
+        if (err == NGX_EINTR) {
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, lc->log, err,
+                           "sendmmsg() was interrupted");
+            continue;
+        }
+
+        if (err == NGX_EAGAIN) {
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, lc->log, err,
+                           "sendmmsg() not ready");
+
+        } else {
+            ngx_connection_error(lc, err, "sendmmsg() failed");
+        }
+
+        break;
+    }
+
+    sb->size = 0;
+    sb->vlen = 0;
+}
+
+
+/*
+ * Write event handler: the event's data is the listening connection
+ * whose pending sendmmsg() batch is flushed.
+ */
+void
+ngx_event_sendmmsg(ngx_event_t *ev)
+{
+    ngx_sendmmsg_flush(ev->data);
+}
+
+
+/*
+ * Returns the current free position in the batch buffer of the
+ * connection's listening socket, so that a datagram of up to "size"
+ * bytes can be built there.
+ *
+ * NULL is returned when the connection has no listening socket, when no
+ * batch is attached to the listening connection, or when "size" exceeds
+ * NGX_SENDMMSG_BUFFER.  If the requested size does not fit into the
+ * remaining space, the pending batch is flushed first.
+ */
+u_char *
+ngx_sendmmsg_buffer(ngx_connection_t *c, size_t size)
+{
+    ngx_connection_t      *listen_conn;
+    ngx_sendmmsg_batch_t  *batch;
+
+    if (c->listening == NULL) {
+        return NULL;
+    }
+
+    listen_conn = c->listening->connection;
+    batch = listen_conn->data;
+
+    if (batch == NULL || size > NGX_SENDMMSG_BUFFER) {
+        return NULL;
+    }
+
+    if (size + batch->size > NGX_SENDMMSG_BUFFER) {
+        /* not enough room left: empty the batch to start from offset 0 */
+        ngx_sendmmsg_flush(listen_conn);
+    }
+
+    return batch->buffer + batch->size;
+}
+
+#endif /* NGX_HAVE_SENDMMSG */
More information about the nginx-devel mailing list