Here's the final set of patches towards the removal of sendpage. All the
drivers that use sendpage() get switched over to using sendmsg() with
MSG_SPLICE_PAGES.
skb_splice_from_iter() is given the facility to copy slab data into
fragments - or, in future, to coalesce it with other unspliced buffers in
the target skbuff.  This means that the caller can work the same way
whether or not MSG_SPLICE_PAGES is supplied and whether or not the protocol
just ignores it.  If MSG_SPLICE_PAGES is not supplied, or is ignored, the
data gets copied as normal rather than being spliced.
For the moment, skb_splice_from_iter() is equipped with its own fragment
allocator - one that allocates from percpu pages to cope with parallel
callers, but that can also drop the percpu lock around calls into the page
allocator.
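As an illustration of the calling convention this enables - a sketch only,
with a made-up helper name and no error handling - a driver can hand a
slab-allocated header plus a page of payload to a single sendmsg() and let
the core work out what gets spliced and what gets copied:

#include <linux/bvec.h>
#include <linux/net.h>
#include <linux/uio.h>

/* Hypothetical helper: send a slab-resident header and one page of payload
 * in a single sendmsg().  If the protocol honours MSG_SPLICE_PAGES, the
 * page is spliced and the header is copied into a fragment; if the flag is
 * ignored, everything is copied as normal.
 */
static int xmit_hdr_and_page(struct socket *sock, void *hdr, size_t hdr_len,
			     struct page *page, size_t offset, size_t len)
{
	struct bio_vec bv[2];
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_NOSIGNAL,
	};

	bvec_set_virt(&bv[0], hdr, hdr_len);	  /* slab data: copied */
	bvec_set_page(&bv[1], page, len, offset); /* page data: spliced */
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bv, 2, hdr_len + len);
	return sock_sendmsg(sock, &msg);
}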
The following changes are made:
(1) Introduce an SMP-safe shared fragment allocator and make
skb_splice_from_iter() use it. The allocator is exported so that
ocfs2 can use it.
This version no longer alters the existing page_frag_cache allocator.
(2) Expose information from the allocator in /proc/. This is useful for
debugging it, but could be dropped.
(3) Make the protocol drivers behave according to MSG_MORE, not
MSG_SENDPAGE_NOTLAST.  The latter is restricted to turning on MSG_MORE
in the sendpage() wrappers (see the sketch after this list).
(4) Make siw, ceph/rds, skb_send_sock, dlm, nvme, smc, ocfs2, drbd and
iscsi use sendmsg(), not sendpage(), and make them specify MSG_MORE
instead of MSG_SENDPAGE_NOTLAST.
ocfs2 now allocates fragments for a couple of cases where it would
otherwise pass in a pointer to shared data that doesn't appear to have
sufficient locking.
(5) Make drbd coalesce its entire message into a single sendmsg().
(6) Kill off sendpage and clean up MSG_SENDPAGE_NOTLAST.
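The conversion in (3) boils down to something like the following sketch (a
made-up wrapper for illustration only; the real per-protocol wrappers
differ in detail):

#include <linux/bvec.h>
#include <linux/net.h>
#include <linux/uio.h>

/* Sketch: a sendpage()-style wrapper folds MSG_SENDPAGE_NOTLAST into
 * MSG_MORE and pushes the page with sendmsg(MSG_SPLICE_PAGES); the protocol
 * itself then only needs to test MSG_MORE.
 */
static ssize_t example_sendpage(struct socket *sock, struct page *page,
				int offset, size_t size, int flags)
{
	struct bio_vec bvec;
	struct msghdr msg = { .msg_flags = flags | MSG_SPLICE_PAGES, };

	if (flags & MSG_SENDPAGE_NOTLAST)
		msg.msg_flags |= MSG_MORE;

	bvec_set_page(&bvec, page, size, offset);
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
	return sock_sendmsg(sock, &msg);
}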
I've pushed the patches here also:
https://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git/log/?h=sendpage-3-frag
David
Link: https://git.kernel.org/pub/scm/linux/kernel/git/netdev/net-next.git/commit/?id=51c78a4d532efe9543a4df019ff405f05c6157f6 # part 1
David Howells (17):
net: Copy slab data for sendmsg(MSG_SPLICE_PAGES)
net: Display info about MSG_SPLICE_PAGES memory handling in proc
tcp_bpf, smc, tls, espintcp: Reduce MSG_SENDPAGE_NOTLAST usage
siw: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage to transmit
ceph: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
net: Use sendmsg(MSG_SPLICE_PAGES) not sendpage in skb_send_sock()
ceph: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage()
rds: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
dlm: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
nvme: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
smc: Drop smc_sendpage() in favour of smc_sendmsg() + MSG_SPLICE_PAGES
ocfs2: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage()
drbd: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage()
drbd: Send an entire bio in a single sendmsg
iscsi: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
sock: Remove ->sendpage*() in favour of sendmsg(MSG_SPLICE_PAGES)
net: Kill MSG_SENDPAGE_NOTLAST
Documentation/bpf/map_sockmap.rst | 10 +-
Documentation/filesystems/locking.rst | 2 -
Documentation/filesystems/vfs.rst | 1 -
Documentation/networking/scaling.rst | 4 +-
crypto/af_alg.c | 28 --
crypto/algif_aead.c | 22 +-
crypto/algif_rng.c | 2 -
crypto/algif_skcipher.c | 14 -
drivers/block/drbd/drbd_main.c | 86 ++----
drivers/infiniband/sw/siw/siw_qp_tx.c | 230 +++-------------
.../chelsio/inline_crypto/chtls/chtls.h | 2 -
.../chelsio/inline_crypto/chtls/chtls_io.c | 14 -
.../chelsio/inline_crypto/chtls/chtls_main.c | 1 -
drivers/nvme/host/tcp.c | 44 +--
drivers/nvme/target/tcp.c | 46 ++--
drivers/scsi/iscsi_tcp.c | 26 +-
drivers/scsi/iscsi_tcp.h | 2 +-
drivers/target/iscsi/iscsi_target_util.c | 14 +-
fs/dlm/lowcomms.c | 10 +-
fs/nfsd/vfs.c | 2 +-
fs/ocfs2/cluster/tcp.c | 107 ++++----
include/crypto/if_alg.h | 2 -
include/linux/net.h | 8 -
include/linux/skbuff.h | 5 +
include/linux/socket.h | 4 +-
include/net/inet_common.h | 2 -
include/net/sock.h | 6 -
include/net/tcp.h | 4 -
net/appletalk/ddp.c | 1 -
net/atm/pvc.c | 1 -
net/atm/svc.c | 1 -
net/ax25/af_ax25.c | 1 -
net/caif/caif_socket.c | 2 -
net/can/bcm.c | 1 -
net/can/isotp.c | 1 -
net/can/j1939/socket.c | 1 -
net/can/raw.c | 1 -
net/ceph/messenger_v1.c | 58 ++--
net/ceph/messenger_v2.c | 91 ++-----
net/core/skbuff.c | 257 ++++++++++++++++--
net/core/sock.c | 35 +--
net/dccp/ipv4.c | 1 -
net/dccp/ipv6.c | 1 -
net/ieee802154/socket.c | 2 -
net/ipv4/af_inet.c | 21 --
net/ipv4/tcp.c | 43 +--
net/ipv4/tcp_bpf.c | 30 +-
net/ipv4/tcp_ipv4.c | 1 -
net/ipv4/udp.c | 15 -
net/ipv4/udp_impl.h | 2 -
net/ipv4/udplite.c | 1 -
net/ipv6/af_inet6.c | 3 -
net/ipv6/raw.c | 1 -
net/ipv6/tcp_ipv6.c | 1 -
net/kcm/kcmsock.c | 20 --
net/key/af_key.c | 1 -
net/l2tp/l2tp_ip.c | 1 -
net/l2tp/l2tp_ip6.c | 1 -
net/llc/af_llc.c | 1 -
net/mctp/af_mctp.c | 1 -
net/mptcp/protocol.c | 2 -
net/netlink/af_netlink.c | 1 -
net/netrom/af_netrom.c | 1 -
net/packet/af_packet.c | 2 -
net/phonet/socket.c | 2 -
net/qrtr/af_qrtr.c | 1 -
net/rds/af_rds.c | 1 -
net/rds/tcp_send.c | 74 ++---
net/rose/af_rose.c | 1 -
net/rxrpc/af_rxrpc.c | 1 -
net/sctp/protocol.c | 1 -
net/smc/af_smc.c | 29 --
net/smc/smc_stats.c | 2 +-
net/smc/smc_stats.h | 1 -
net/smc/smc_tx.c | 20 +-
net/smc/smc_tx.h | 2 -
net/socket.c | 48 ----
net/tipc/socket.c | 3 -
net/tls/tls.h | 6 -
net/tls/tls_device.c | 24 +-
net/tls/tls_main.c | 9 +-
net/tls/tls_sw.c | 37 +--
net/unix/af_unix.c | 19 --
net/vmw_vsock/af_vsock.c | 3 -
net/x25/af_x25.c | 1 -
net/xdp/xsk.c | 1 -
net/xfrm/espintcp.c | 10 +-
.../perf/trace/beauty/include/linux/socket.h | 1 -
tools/perf/trace/beauty/msg_flags.c | 3 -
89 files changed, 539 insertions(+), 1063 deletions(-)
If sendmsg() is passed MSG_SPLICE_PAGES and is given a buffer that contains
some data that's resident in the slab, copy it rather than returning EIO.
This can be made use of by a number of drivers in the kernel, including:
iwarp, ceph/rds, dlm, nvme, ocfs2, drbd.  It could also be used by iscsi,
rxrpc, sunrpc, cifs and probably others.
skb_splice_from_iter() is given its own fragment allocator as
page_frag_alloc_align() can't be used because it does no locking to prevent
parallel callers from racing.  alloc_skb_frag() uses a separate folio for
each cpu and pins itself to the cpu whilst allocating, re-enabling cpu
migration around the folio allocation itself.
This could instead allocate a whole page for each fragment to be copied, as
alloc_skb_with_frags() would do, but that would waste a lot of space (most
of the fragments look like they're going to be small).
This allows an entire message that consists of, say, a protocol header or
two, a number of pages of data and a protocol footer to be sent using a
single call to sock_sendmsg().
The callers could be made to copy the data into fragments before calling
sendmsg(), but that then penalises them if MSG_SPLICE_PAGES gets ignored.
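As a usage sketch (hypothetical caller, GFP_KERNEL context assumed;
skb->len/data_len/truesize accounting omitted for brevity), a protocol that
needs a small slab-resident header to outlive the call could park it in a
fragment like this:

#include <linux/skbuff.h>
#include <linux/mm.h>

/* Sketch: copy a small slab-resident header into a page fragment and hang
 * it off an skb as an ordinary page frag that can later be released with
 * put_page().
 */
static int attach_copied_header(struct sk_buff *skb, const void *hdr,
				size_t len)
{
	void *p = copy_skb_frag(hdr, len, GFP_KERNEL);

	if (!p)
		return -ENOMEM;

	return skb_append_pagefrags(skb, virt_to_page(p), offset_in_page(p),
				    len, MAX_SKB_FRAGS);
}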
Signed-off-by: David Howells <[email protected]>
cc: Alexander Duyck <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: David Ahern <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: Menglong Dong <[email protected]>
cc: [email protected]
---
include/linux/skbuff.h | 5 ++
net/core/skbuff.c | 172 ++++++++++++++++++++++++++++++++++++++++-
2 files changed, 174 insertions(+), 3 deletions(-)
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 91ed66952580..0ba776cd9be8 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -5037,6 +5037,11 @@ static inline void skb_mark_for_recycle(struct sk_buff *skb)
#endif
}
+void *alloc_skb_frag(size_t fragsz, gfp_t gfp);
+void *copy_skb_frag(const void *s, size_t len, gfp_t gfp);
+ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
+ ssize_t maxsize, gfp_t gfp);
+
ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
ssize_t maxsize, gfp_t gfp);
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index fee2b1c105fe..9bd8d6bf6c21 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -6755,6 +6755,146 @@ nodefer: __kfree_skb(skb);
smp_call_function_single_async(cpu, &sd->defer_csd);
}
+struct skb_splice_frag_cache {
+ struct folio *folio;
+ void *virt;
+ unsigned int offset;
+ /* we maintain a pagecount bias, so that we dont dirty cache line
+ * containing page->_refcount every time we allocate a fragment.
+ */
+ unsigned int pagecnt_bias;
+ bool pfmemalloc;
+};
+
+static DEFINE_PER_CPU(struct skb_splice_frag_cache, skb_splice_frag_cache);
+
+/**
+ * alloc_skb_frag - Allocate a page fragment for using in a socket
+ * @fragsz: The size of fragment required
+ * @gfp: Allocation flags
+ */
+void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
+{
+ struct skb_splice_frag_cache *cache;
+ struct folio *folio, *spare = NULL;
+ size_t offset, fsize;
+ void *p;
+
+ if (WARN_ON_ONCE(fragsz == 0))
+ fragsz = 1;
+
+ cache = get_cpu_ptr(&skb_splice_frag_cache);
+reload:
+ folio = cache->folio;
+ offset = cache->offset;
+try_again:
+ if (fragsz > offset)
+ goto insufficient_space;
+
+ /* Make the allocation. */
+ cache->pagecnt_bias--;
+ offset = ALIGN_DOWN(offset - fragsz, SMP_CACHE_BYTES);
+ cache->offset = offset;
+ p = cache->virt + offset;
+ put_cpu_ptr(skb_splice_frag_cache);
+ if (spare)
+ folio_put(spare);
+ return p;
+
+insufficient_space:
+ /* See if we can refurbish the current folio. */
+ if (!folio || !folio_ref_sub_and_test(folio, cache->pagecnt_bias))
+ goto get_new_folio;
+ if (unlikely(cache->pfmemalloc)) {
+ __folio_put(folio);
+ goto get_new_folio;
+ }
+
+ fsize = folio_size(folio);
+ if (unlikely(fragsz > fsize))
+ goto frag_too_big;
+
+ /* OK, page count is 0, we can safely set it */
+ folio_set_count(folio, PAGE_FRAG_CACHE_MAX_SIZE + 1);
+
+ /* Reset page count bias and offset to start of new frag */
+ cache->pagecnt_bias = PAGE_FRAG_CACHE_MAX_SIZE + 1;
+ offset = fsize;
+ goto try_again;
+
+get_new_folio:
+ if (!spare) {
+ cache->folio = NULL;
+ put_cpu_ptr(&skb_splice_frag_cache);
+
+#if PAGE_SIZE < PAGE_FRAG_CACHE_MAX_SIZE
+ spare = folio_alloc(gfp | __GFP_NOWARN | __GFP_NORETRY |
+ __GFP_NOMEMALLOC,
+ PAGE_FRAG_CACHE_MAX_ORDER);
+ if (!spare)
+#endif
+ spare = folio_alloc(gfp, 0);
+ if (!spare)
+ return NULL;
+
+ cache = get_cpu_ptr(&skb_splice_frag_cache);
+ /* We may now be on a different cpu and/or someone else may
+ * have refilled it
+ */
+ cache->pfmemalloc = folio_is_pfmemalloc(spare);
+ if (cache->folio)
+ goto reload;
+ }
+
+ cache->folio = spare;
+ cache->virt = folio_address(spare);
+ folio = spare;
+ spare = NULL;
+
+ /* Even if we own the page, we do not use atomic_set(). This would
+ * break get_page_unless_zero() users.
+ */
+ folio_ref_add(folio, PAGE_FRAG_CACHE_MAX_SIZE);
+
+ /* Reset page count bias and offset to start of new frag */
+ cache->pagecnt_bias = PAGE_FRAG_CACHE_MAX_SIZE + 1;
+ offset = folio_size(folio);
+ goto try_again;
+
+frag_too_big:
+ /*
+ * The caller is trying to allocate a fragment with fragsz > PAGE_SIZE
+ * but the cache isn't big enough to satisfy the request, this may
+ * happen in low memory conditions. We don't release the cache page
+ * because it could make memory pressure worse so we simply return NULL
+ * here.
+ */
+ cache->offset = offset;
+ put_cpu_ptr(&skb_splice_frag_cache);
+ if (spare)
+ folio_put(spare);
+ return NULL;
+}
+EXPORT_SYMBOL(alloc_skb_frag);
+
+/**
+ * copy_skb_frag - Copy data into a page fragment.
+ * @s: The data to copy
+ * @len: The size of the data
+ * @gfp: Allocation flags
+ */
+void *copy_skb_frag(const void *s, size_t len, gfp_t gfp)
+{
+ void *p;
+
+ p = alloc_skb_frag(len, gfp);
+ if (!p)
+ return NULL;
+
+ return memcpy(p, s, len);
+}
+EXPORT_SYMBOL(copy_skb_frag);
+
static void skb_splice_csum_page(struct sk_buff *skb, struct page *page,
size_t offset, size_t len)
{
@@ -6808,17 +6948,43 @@ ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
break;
}
+ if (space == 0 &&
+ !skb_can_coalesce(skb, skb_shinfo(skb)->nr_frags,
+ pages[0], off)) {
+ iov_iter_revert(iter, len);
+ break;
+ }
+
i = 0;
do {
struct page *page = pages[i++];
size_t part = min_t(size_t, PAGE_SIZE - off, len);
-
- ret = -EIO;
- if (WARN_ON_ONCE(!sendpage_ok(page)))
+ bool put = false;
+
+ if (PageSlab(page)) {
+ const void *p;
+ void *q;
+
+ p = kmap_local_page(page);
+ q = copy_skb_frag(p + off, part, gfp);
+ kunmap_local(p);
+ if (!q) {
+ iov_iter_revert(iter, len);
+ ret = -ENOMEM;
+ goto out;
+ }
+ page = virt_to_page(q);
+ off = offset_in_page(q);
+ put = true;
+ } else if (WARN_ON_ONCE(!sendpage_ok(page))) {
+ ret = -EIO;
goto out;
+ }
ret = skb_append_pagefrags(skb, page, off, part,
frag_limit);
+ if (put)
+ put_page(page);
if (ret < 0) {
iov_iter_revert(iter, len);
goto out;
When transmitting data, call down into TCP using a single sendmsg with
MSG_SPLICE_PAGES to indicate that content should be spliced rather than
performing several sendmsg and sendpage calls to transmit header and data
pages.
To make this work, the data is assembled in a bio_vec array and attached to
a BVEC-type iterator.  The header is referenced directly by a bio_vec; as it
resides in slab memory, the MSG_SPLICE_PAGES machinery copies it into a page
fragment that can be freed with put_page().
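Condensed, the shape of the conversion is roughly the following sketch
(simplified: the real code below also prepends the header bio_vec and keeps
RDS's partial-send and offset handling):

#include <linux/scatterlist.h>
#include <linux/slab.h>
#include <linux/net.h>
#include <linux/uio.h>

/* Sketch: turn a scatterlist into a bio_vec array and push the lot to TCP
 * with one sendmsg(MSG_SPLICE_PAGES) call.
 */
static int send_sg_spliced(struct socket *sock, struct scatterlist *sgl,
			   unsigned int nents)
{
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
	};
	struct scatterlist *sg;
	struct bio_vec *bvec;
	size_t size = 0;
	unsigned int i;
	int ret;

	bvec = kcalloc(nents, sizeof(*bvec), GFP_KERNEL);
	if (!bvec)
		return -ENOMEM;

	for_each_sg(sgl, sg, nents, i) {
		bvec_set_page(&bvec[i], sg_page(sg), sg->length, sg->offset);
		size += sg->length;
	}

	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bvec, nents, size);
	ret = sock_sendmsg(sock, &msg);
	kfree(bvec);
	return ret;
}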
Signed-off-by: David Howells <[email protected]>
cc: Santosh Shilimkar <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: [email protected]
cc: [email protected]
cc: [email protected]
---
net/rds/tcp_send.c | 74 +++++++++++++++++-----------------------------
1 file changed, 27 insertions(+), 47 deletions(-)
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 8c4d1d6e9249..550390d5ff2b 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -52,29 +52,23 @@ void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
tcp_sock_set_cork(tc->t_sock->sk, false);
}
-/* the core send_sem serializes this with other xmit and shutdown */
-static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
-{
- struct kvec vec = {
- .iov_base = data,
- .iov_len = len,
- };
- struct msghdr msg = {
- .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
- };
-
- return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
-}
-
/* the core send_sem serializes this with other xmit and shutdown */
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off)
{
struct rds_conn_path *cp = rm->m_inc.i_conn_path;
struct rds_tcp_connection *tc = cp->cp_transport_data;
+ struct msghdr msg = {
+ .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
+ };
+ struct bio_vec *bvec;
+ unsigned int i, size = 0, ix = 0;
int done = 0;
- int ret = 0;
- int more;
+ int ret = -ENOMEM;
+
+ bvec = kmalloc_array(1 + sg, sizeof(struct bio_vec), GFP_KERNEL);
+ if (!bvec)
+ goto out;
if (hdr_off == 0) {
/*
@@ -101,41 +95,26 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
/* see rds_tcp_write_space() */
set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);
- ret = rds_tcp_sendmsg(tc->t_sock,
- (void *)&rm->m_inc.i_hdr + hdr_off,
- sizeof(rm->m_inc.i_hdr) - hdr_off);
- if (ret < 0)
- goto out;
- done += ret;
- if (hdr_off + done != sizeof(struct rds_header))
- goto out;
+ bvec_set_virt(&bvec[ix], (void *)&rm->m_inc.i_hdr + hdr_off,
+ sizeof(rm->m_inc.i_hdr) - hdr_off);
+ size += bvec[ix].bv_len;
+ ix++;
}
- more = rm->data.op_nents > 1 ? (MSG_MORE | MSG_SENDPAGE_NOTLAST) : 0;
- while (sg < rm->data.op_nents) {
- int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-
- ret = tc->t_sock->ops->sendpage(tc->t_sock,
- sg_page(&rm->data.op_sg[sg]),
- rm->data.op_sg[sg].offset + off,
- rm->data.op_sg[sg].length - off,
- flags);
- rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->data.op_sg[sg]),
- rm->data.op_sg[sg].offset + off, rm->data.op_sg[sg].length - off,
- ret);
- if (ret <= 0)
- break;
-
- off += ret;
- done += ret;
- if (off == rm->data.op_sg[sg].length) {
- off = 0;
- sg++;
- }
- if (sg == rm->data.op_nents - 1)
- more = 0;
+ for (i = sg; i < rm->data.op_nents; i++) {
+ bvec_set_page(&bvec[ix],
+ sg_page(&rm->data.op_sg[i]),
+ rm->data.op_sg[i].length - off,
+ rm->data.op_sg[i].offset + off);
+ off = 0;
+ size += bvec[ix].bv_len;
+ ix++;
}
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bvec, ix, size);
+ ret = sock_sendmsg(tc->t_sock, &msg);
+ rdsdebug("tcp sendmsg-splice %u,%u ret %d\n", ix, size, ret);
+
out:
if (ret <= 0) {
/* write_space will hit after EAGAIN, all else fatal */
@@ -158,6 +137,7 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
}
if (done == 0)
done = ret;
+ kfree(bvec);
return done;
}
When transmitting data, call down into TCP using a single sendmsg with
MSG_SPLICE_PAGES to indicate that content should be spliced rather than
performing several sendmsg and sendpage calls to transmit header, data
pages and trailer.
To make this work, the data is assembled in a bio_vec array and attached to
a BVEC-type iterator. The header and trailer (if present) are copied into
page fragments that can be freed with put_page().
Signed-off-by: David Howells <[email protected]>
cc: Bernard Metzler <[email protected]>
cc: Tom Talpey <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: [email protected]
cc: [email protected]
---
drivers/infiniband/sw/siw/siw_qp_tx.c | 230 ++++----------------------
1 file changed, 35 insertions(+), 195 deletions(-)
diff --git a/drivers/infiniband/sw/siw/siw_qp_tx.c b/drivers/infiniband/sw/siw/siw_qp_tx.c
index ffb16beb6c30..a15b3fedf484 100644
--- a/drivers/infiniband/sw/siw/siw_qp_tx.c
+++ b/drivers/infiniband/sw/siw/siw_qp_tx.c
@@ -311,114 +311,8 @@ static int siw_tx_ctrl(struct siw_iwarp_tx *c_tx, struct socket *s,
return rv;
}
-/*
- * 0copy TCP transmit interface: Use MSG_SPLICE_PAGES.
- *
- * Using sendpage to push page by page appears to be less efficient
- * than using sendmsg, even if data are copied.
- *
- * A general performance limitation might be the extra four bytes
- * trailer checksum segment to be pushed after user data.
- */
-static int siw_tcp_sendpages(struct socket *s, struct page **page, int offset,
- size_t size)
-{
- struct bio_vec bvec;
- struct msghdr msg = {
- .msg_flags = (MSG_MORE | MSG_DONTWAIT | MSG_SENDPAGE_NOTLAST |
- MSG_SPLICE_PAGES),
- };
- struct sock *sk = s->sk;
- int i = 0, rv = 0, sent = 0;
-
- while (size) {
- size_t bytes = min_t(size_t, PAGE_SIZE - offset, size);
-
- if (size + offset <= PAGE_SIZE)
- msg.msg_flags &= ~MSG_SENDPAGE_NOTLAST;
-
- tcp_rate_check_app_limited(sk);
- bvec_set_page(&bvec, page[i], bytes, offset);
- iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
-
-try_page_again:
- lock_sock(sk);
- rv = tcp_sendmsg_locked(sk, &msg, size);
- release_sock(sk);
-
- if (rv > 0) {
- size -= rv;
- sent += rv;
- if (rv != bytes) {
- offset += rv;
- bytes -= rv;
- goto try_page_again;
- }
- offset = 0;
- } else {
- if (rv == -EAGAIN || rv == 0)
- break;
- return rv;
- }
- i++;
- }
- return sent;
-}
-
-/*
- * siw_0copy_tx()
- *
- * Pushes list of pages to TCP socket. If pages from multiple
- * SGE's, all referenced pages of each SGE are pushed in one
- * shot.
- */
-static int siw_0copy_tx(struct socket *s, struct page **page,
- struct siw_sge *sge, unsigned int offset,
- unsigned int size)
-{
- int i = 0, sent = 0, rv;
- int sge_bytes = min(sge->length - offset, size);
-
- offset = (sge->laddr + offset) & ~PAGE_MASK;
-
- while (sent != size) {
- rv = siw_tcp_sendpages(s, &page[i], offset, sge_bytes);
- if (rv >= 0) {
- sent += rv;
- if (size == sent || sge_bytes > rv)
- break;
-
- i += PAGE_ALIGN(sge_bytes + offset) >> PAGE_SHIFT;
- sge++;
- sge_bytes = min(sge->length, size - sent);
- offset = sge->laddr & ~PAGE_MASK;
- } else {
- sent = rv;
- break;
- }
- }
- return sent;
-}
-
#define MAX_TRAILER (MPA_CRC_SIZE + 4)
-static void siw_unmap_pages(struct kvec *iov, unsigned long kmap_mask, int len)
-{
- int i;
-
- /*
- * Work backwards through the array to honor the kmap_local_page()
- * ordering requirements.
- */
- for (i = (len-1); i >= 0; i--) {
- if (kmap_mask & BIT(i)) {
- unsigned long addr = (unsigned long)iov[i].iov_base;
-
- kunmap_local((void *)(addr & PAGE_MASK));
- }
- }
-}
-
/*
* siw_tx_hdt() tries to push a complete packet to TCP where all
* packet fragments are referenced by the elements of one iovec.
@@ -438,30 +332,21 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
{
struct siw_wqe *wqe = &c_tx->wqe_active;
struct siw_sge *sge = &wqe->sqe.sge[c_tx->sge_idx];
- struct kvec iov[MAX_ARRAY];
- struct page *page_array[MAX_ARRAY];
+ struct bio_vec bvec[MAX_ARRAY];
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_EOR };
+ void *trl;
int seg = 0, do_crc = c_tx->do_crc, is_kva = 0, rv;
unsigned int data_len = c_tx->bytes_unsent, hdr_len = 0, trl_len = 0,
sge_off = c_tx->sge_off, sge_idx = c_tx->sge_idx,
pbl_idx = c_tx->pbl_idx;
- unsigned long kmap_mask = 0L;
if (c_tx->state == SIW_SEND_HDR) {
- if (c_tx->use_sendpage) {
- rv = siw_tx_ctrl(c_tx, s, MSG_DONTWAIT | MSG_MORE);
- if (rv)
- goto done;
+ void *hdr = &c_tx->pkt.ctrl + c_tx->ctrl_sent;
- c_tx->state = SIW_SEND_DATA;
- } else {
- iov[0].iov_base =
- (char *)&c_tx->pkt.ctrl + c_tx->ctrl_sent;
- iov[0].iov_len = hdr_len =
- c_tx->ctrl_len - c_tx->ctrl_sent;
- seg = 1;
- }
+ hdr_len = c_tx->ctrl_len - c_tx->ctrl_sent;
+ bvec_set_virt(&bvec[0], hdr, hdr_len);
+ seg = 1;
}
wqe->processed += data_len;
@@ -477,28 +362,9 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
} else {
is_kva = 1;
}
- if (is_kva && !c_tx->use_sendpage) {
- /*
- * tx from kernel virtual address: either inline data
- * or memory region with assigned kernel buffer
- */
- iov[seg].iov_base =
- ib_virt_dma_to_ptr(sge->laddr + sge_off);
- iov[seg].iov_len = sge_len;
-
- if (do_crc)
- crypto_shash_update(c_tx->mpa_crc_hd,
- iov[seg].iov_base,
- sge_len);
- sge_off += sge_len;
- data_len -= sge_len;
- seg++;
- goto sge_done;
- }
while (sge_len) {
size_t plen = min((int)PAGE_SIZE - fp_off, sge_len);
- void *kaddr;
if (!is_kva) {
struct page *p;
@@ -511,33 +377,12 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
p = siw_get_upage(mem->umem,
sge->laddr + sge_off);
if (unlikely(!p)) {
- siw_unmap_pages(iov, kmap_mask, seg);
wqe->processed -= c_tx->bytes_unsent;
rv = -EFAULT;
goto done_crc;
}
- page_array[seg] = p;
-
- if (!c_tx->use_sendpage) {
- void *kaddr = kmap_local_page(p);
-
- /* Remember for later kunmap() */
- kmap_mask |= BIT(seg);
- iov[seg].iov_base = kaddr + fp_off;
- iov[seg].iov_len = plen;
-
- if (do_crc)
- crypto_shash_update(
- c_tx->mpa_crc_hd,
- iov[seg].iov_base,
- plen);
- } else if (do_crc) {
- kaddr = kmap_local_page(p);
- crypto_shash_update(c_tx->mpa_crc_hd,
- kaddr + fp_off,
- plen);
- kunmap_local(kaddr);
- }
+
+ bvec_set_page(&bvec[seg], p, plen, fp_off);
} else {
/*
* Cast to an uintptr_t to preserve all 64 bits
@@ -545,12 +390,16 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
*/
u64 va = sge->laddr + sge_off;
- page_array[seg] = ib_virt_dma_to_page(va);
- if (do_crc)
- crypto_shash_update(
- c_tx->mpa_crc_hd,
- ib_virt_dma_to_ptr(va),
- plen);
+ bvec_set_virt(&bvec[seg], ib_virt_dma_to_ptr(va), plen);
+ }
+
+ if (do_crc) {
+ void *kaddr =
+ kmap_local_page(bvec[seg].bv_page);
+ crypto_shash_update(c_tx->mpa_crc_hd,
+ kaddr + bvec[seg].bv_offset,
+ bvec[seg].bv_len);
+ kunmap_local(kaddr);
}
sge_len -= plen;
@@ -560,13 +409,12 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
if (++seg >= (int)MAX_ARRAY) {
siw_dbg_qp(tx_qp(c_tx), "to many fragments\n");
- siw_unmap_pages(iov, kmap_mask, seg-1);
wqe->processed -= c_tx->bytes_unsent;
rv = -EMSGSIZE;
goto done_crc;
}
}
-sge_done:
+
/* Update SGE variables at end of SGE */
if (sge_off == sge->length &&
(data_len != 0 || wqe->processed < wqe->bytes)) {
@@ -575,15 +423,8 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
sge_off = 0;
}
}
- /* trailer */
- if (likely(c_tx->state != SIW_SEND_TRAILER)) {
- iov[seg].iov_base = &c_tx->trailer.pad[4 - c_tx->pad];
- iov[seg].iov_len = trl_len = MAX_TRAILER - (4 - c_tx->pad);
- } else {
- iov[seg].iov_base = &c_tx->trailer.pad[c_tx->ctrl_sent];
- iov[seg].iov_len = trl_len = MAX_TRAILER - c_tx->ctrl_sent;
- }
+ /* Set the CRC in the trailer */
if (c_tx->pad) {
*(u32 *)c_tx->trailer.pad = 0;
if (do_crc)
@@ -596,23 +437,23 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
else if (do_crc)
crypto_shash_final(c_tx->mpa_crc_hd, (u8 *)&c_tx->trailer.crc);
- data_len = c_tx->bytes_unsent;
-
- if (c_tx->use_sendpage) {
- rv = siw_0copy_tx(s, page_array, &wqe->sqe.sge[c_tx->sge_idx],
- c_tx->sge_off, data_len);
- if (rv == data_len) {
- rv = kernel_sendmsg(s, &msg, &iov[seg], 1, trl_len);
- if (rv > 0)
- rv += data_len;
- else
- rv = data_len;
- }
+ /* Copy the trailer and add it to the output list */
+ if (likely(c_tx->state != SIW_SEND_TRAILER)) {
+ trl = &c_tx->trailer.pad[4 - c_tx->pad];
+ trl_len = MAX_TRAILER - (4 - c_tx->pad);
} else {
- rv = kernel_sendmsg(s, &msg, iov, seg + 1,
- hdr_len + data_len + trl_len);
- siw_unmap_pages(iov, kmap_mask, seg);
+ trl = &c_tx->trailer.pad[c_tx->ctrl_sent];
+ trl_len = MAX_TRAILER - c_tx->ctrl_sent;
}
+ bvec_set_virt(&bvec[seg], trl, trl_len);
+
+ data_len = c_tx->bytes_unsent;
+
+ if (c_tx->use_sendpage)
+ msg.msg_flags |= MSG_SPLICE_PAGES;
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bvec, seg + 1,
+ hdr_len + data_len + trl_len);
+ rv = sock_sendmsg(s, &msg);
if (rv < (int)hdr_len) {
/* Not even complete hdr pushed or negative rv */
wqe->processed -= data_len;
@@ -673,7 +514,6 @@ static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
}
done_crc:
c_tx->do_crc = 0;
-done:
return rv;
}
Use sendmsg() with MSG_SPLICE_PAGES rather than sendpage in
skb_send_sock(). This causes pages to be spliced from the source iterator
if possible.
This allows ->sendpage() to be replaced by something that can handle
multiple multipage folios in a single transaction.
Note that this could perhaps be improved to fill out a bvec array with all
the frags and then make a single sendmsg call, possibly sticking the header
on the front also.
Signed-off-by: David Howells <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: [email protected]
---
net/core/skbuff.c | 49 ++++++++++++++++++++++++++---------------------
1 file changed, 27 insertions(+), 22 deletions(-)
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index c388a73e5d4e..e518bb57183a 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -2990,32 +2990,32 @@ int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
}
EXPORT_SYMBOL_GPL(skb_splice_bits);
-static int sendmsg_unlocked(struct sock *sk, struct msghdr *msg,
- struct kvec *vec, size_t num, size_t size)
+static int sendmsg_locked(struct sock *sk, struct msghdr *msg)
{
struct socket *sock = sk->sk_socket;
+ size_t size = msg_data_left(msg);
if (!sock)
return -EINVAL;
- return kernel_sendmsg(sock, msg, vec, num, size);
+
+ if (!sock->ops->sendmsg_locked)
+ return sock_no_sendmsg_locked(sk, msg, size);
+
+ return sock->ops->sendmsg_locked(sk, msg, size);
}
-static int sendpage_unlocked(struct sock *sk, struct page *page, int offset,
- size_t size, int flags)
+static int sendmsg_unlocked(struct sock *sk, struct msghdr *msg)
{
struct socket *sock = sk->sk_socket;
if (!sock)
return -EINVAL;
- return kernel_sendpage(sock, page, offset, size, flags);
+ return sock_sendmsg(sock, msg);
}
-typedef int (*sendmsg_func)(struct sock *sk, struct msghdr *msg,
- struct kvec *vec, size_t num, size_t size);
-typedef int (*sendpage_func)(struct sock *sk, struct page *page, int offset,
- size_t size, int flags);
+typedef int (*sendmsg_func)(struct sock *sk, struct msghdr *msg);
static int __skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset,
- int len, sendmsg_func sendmsg, sendpage_func sendpage)
+ int len, sendmsg_func sendmsg)
{
unsigned int orig_len = len;
struct sk_buff *head = skb;
@@ -3035,8 +3035,9 @@ static int __skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset,
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
- ret = INDIRECT_CALL_2(sendmsg, kernel_sendmsg_locked,
- sendmsg_unlocked, sk, &msg, &kv, 1, slen);
+ iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &kv, 1, slen);
+ ret = INDIRECT_CALL_2(sendmsg, sendmsg_locked,
+ sendmsg_unlocked, sk, &msg);
if (ret <= 0)
goto error;
@@ -3067,11 +3068,17 @@ static int __skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset,
slen = min_t(size_t, len, skb_frag_size(frag) - offset);
while (slen) {
- ret = INDIRECT_CALL_2(sendpage, kernel_sendpage_locked,
- sendpage_unlocked, sk,
- skb_frag_page(frag),
- skb_frag_off(frag) + offset,
- slen, MSG_DONTWAIT);
+ struct bio_vec bvec;
+ struct msghdr msg = {
+ .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT,
+ };
+
+ bvec_set_page(&bvec, skb_frag_page(frag), slen,
+ skb_frag_off(frag) + offset);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, slen);
+
+ ret = INDIRECT_CALL_2(sendmsg, sendmsg_locked,
+ sendmsg_unlocked, sk, &msg);
if (ret <= 0)
goto error;
@@ -3108,16 +3115,14 @@ static int __skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset,
int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
int len)
{
- return __skb_send_sock(sk, skb, offset, len, kernel_sendmsg_locked,
- kernel_sendpage_locked);
+ return __skb_send_sock(sk, skb, offset, len, sendmsg_locked);
}
EXPORT_SYMBOL_GPL(skb_send_sock_locked);
/* Send skb data on a socket. Socket must be unlocked. */
int skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset, int len)
{
- return __skb_send_sock(sk, skb, offset, len, sendmsg_unlocked,
- sendpage_unlocked);
+ return __skb_send_sock(sk, skb, offset, len, sendmsg_unlocked);
}
/**
Display information about the memory handling MSG_SPLICE_PAGES does to copy
slabbed data into page fragments.
For each CPU that has a cached folio, it displays the folio's pfn, the
current allocation offset within the folio and the size of the folio.
It also displays the number of pages refurbished and the number of pages
replaced.
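For example, the file might read something like this (numbers purely
illustrative; the format follows the seq_printf() calls below - the
refurbish/replace counters first, then one line per CPU giving the folio
pfn and offset/size):

refurb=3 repl=15
[0] 10b43c 4096/32768
[1] 10c2d8 20480/32768
[2] 10a910 0/32768
[3] 10bd04 31744/32768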
Signed-off-by: David Howells <[email protected]>
cc: Alexander Duyck <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: David Ahern <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: Menglong Dong <[email protected]>
cc: [email protected]
---
net/core/skbuff.c | 42 +++++++++++++++++++++++++++++++++++++++---
1 file changed, 39 insertions(+), 3 deletions(-)
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index 9bd8d6bf6c21..c388a73e5d4e 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -83,6 +83,7 @@
#include <linux/user_namespace.h>
#include <linux/indirect_call_wrapper.h>
#include <linux/textsearch.h>
+#include <linux/proc_fs.h>
#include "dev.h"
#include "sock_destructor.h"
@@ -6758,6 +6759,7 @@ nodefer: __kfree_skb(skb);
struct skb_splice_frag_cache {
struct folio *folio;
void *virt;
+ unsigned int fsize;
unsigned int offset;
/* we maintain a pagecount bias, so that we dont dirty cache line
* containing page->_refcount every time we allocate a fragment.
@@ -6767,6 +6769,26 @@ struct skb_splice_frag_cache {
};
static DEFINE_PER_CPU(struct skb_splice_frag_cache, skb_splice_frag_cache);
+static atomic_t skb_splice_frag_replaced, skb_splice_frag_refurbished;
+
+static int skb_splice_show(struct seq_file *m, void *data)
+{
+ int cpu;
+
+ seq_printf(m, "refurb=%u repl=%u\n",
+ atomic_read(&skb_splice_frag_refurbished),
+ atomic_read(&skb_splice_frag_replaced));
+
+ for_each_possible_cpu(cpu) {
+ const struct skb_splice_frag_cache *cache =
+ per_cpu_ptr(&skb_splice_frag_cache, cpu);
+
+ seq_printf(m, "[%u] %lx %u/%u\n",
+ cpu, folio_pfn(cache->folio),
+ cache->offset, cache->fsize);
+ }
+ return 0;
+}
/**
* alloc_skb_frag - Allocate a page fragment for using in a socket
@@ -6803,17 +6825,21 @@ void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
insufficient_space:
/* See if we can refurbish the current folio. */
- if (!folio || !folio_ref_sub_and_test(folio, cache->pagecnt_bias))
+ if (!folio)
goto get_new_folio;
+ if (!folio_ref_sub_and_test(folio, cache->pagecnt_bias))
+ goto replace_folio;
if (unlikely(cache->pfmemalloc)) {
__folio_put(folio);
- goto get_new_folio;
+ goto replace_folio;
}
fsize = folio_size(folio);
if (unlikely(fragsz > fsize))
goto frag_too_big;
+ atomic_inc(&skb_splice_frag_refurbished);
+
/* OK, page count is 0, we can safely set it */
folio_set_count(folio, PAGE_FRAG_CACHE_MAX_SIZE + 1);
@@ -6822,6 +6848,8 @@ void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
offset = fsize;
goto try_again;
+replace_folio:
+ atomic_inc(&skb_splice_frag_replaced);
get_new_folio:
if (!spare) {
cache->folio = NULL;
@@ -6848,6 +6876,7 @@ void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
cache->folio = spare;
cache->virt = folio_address(spare);
+ cache->fsize = folio_size(spare);
folio = spare;
spare = NULL;
@@ -6858,7 +6887,7 @@ void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
/* Reset page count bias and offset to start of new frag */
cache->pagecnt_bias = PAGE_FRAG_CACHE_MAX_SIZE + 1;
- offset = folio_size(folio);
+ offset = cache->fsize;
goto try_again;
frag_too_big:
@@ -7008,3 +7037,10 @@ ssize_t skb_splice_from_iter(struct sk_buff *skb, struct iov_iter *iter,
return spliced ?: ret;
}
EXPORT_SYMBOL(skb_splice_from_iter);
+
+static int skb_splice_init(void)
+{
+ proc_create_single("pagefrags", S_IFREG | 0444, NULL, &skb_splice_show);
+ return 0;
+}
+late_initcall(skb_splice_init);
Use sendmsg() and MSG_SPLICE_PAGES rather than sendpage in ceph when
transmitting data. For the moment, this can only transmit one page at a
time because of the architecture of net/ceph/, but if
write_partial_message_data() can be given a bvec[] at a time by the
iteration code, this would allow pages to be sent in a batch.
Signed-off-by: David Howells <[email protected]>
cc: Ilya Dryomov <[email protected]>
cc: Xiubo Li <[email protected]>
cc: Jeff Layton <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: [email protected]
cc: [email protected]
---
net/ceph/messenger_v1.c | 58 ++++++++++++++---------------------------
1 file changed, 19 insertions(+), 39 deletions(-)
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index d664cb1593a7..f082e5c780a3 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -74,37 +74,6 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
return r;
}
-/*
- * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
- */
-static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
- int offset, size_t size, int more)
-{
- ssize_t (*sendpage)(struct socket *sock, struct page *page,
- int offset, size_t size, int flags);
- int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
- int ret;
-
- /*
- * sendpage cannot properly handle pages with page_count == 0,
- * we need to fall back to sendmsg if that's the case.
- *
- * Same goes for slab pages: skb_can_coalesce() allows
- * coalescing neighboring slab objects into a single frag which
- * triggers one of hardened usercopy checks.
- */
- if (sendpage_ok(page))
- sendpage = sock->ops->sendpage;
- else
- sendpage = sock_no_sendpage;
-
- ret = sendpage(sock, page, offset, size, flags);
- if (ret == -EAGAIN)
- ret = 0;
-
- return ret;
-}
-
static void con_out_kvec_reset(struct ceph_connection *con)
{
BUG_ON(con->v1.out_skip);
@@ -464,7 +433,6 @@ static int write_partial_message_data(struct ceph_connection *con)
struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
- int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
u32 crc;
dout("%s %p msg %p\n", __func__, con, msg);
@@ -482,6 +450,10 @@ static int write_partial_message_data(struct ceph_connection *con)
*/
crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
while (cursor->total_resid) {
+ struct bio_vec bvec;
+ struct msghdr msghdr = {
+ .msg_flags = MSG_SPLICE_PAGES,
+ };
struct page *page;
size_t page_offset;
size_t length;
@@ -494,9 +466,12 @@ static int write_partial_message_data(struct ceph_connection *con)
page = ceph_msg_data_next(cursor, &page_offset, &length);
if (length == cursor->total_resid)
- more = MSG_MORE;
- ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
- more);
+ msghdr.msg_flags |= MSG_MORE;
+
+ bvec_set_page(&bvec, page, length, page_offset);
+ iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, length);
+
+ ret = sock_sendmsg(con->sock, &msghdr);
if (ret <= 0) {
if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
@@ -526,7 +501,10 @@ static int write_partial_message_data(struct ceph_connection *con)
*/
static int write_partial_skip(struct ceph_connection *con)
{
- int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
+ struct bio_vec bvec;
+ struct msghdr msghdr = {
+ .msg_flags = MSG_SPLICE_PAGES | MSG_MORE,
+ };
int ret;
dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
@@ -534,9 +512,11 @@ static int write_partial_skip(struct ceph_connection *con)
size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
if (size == con->v1.out_skip)
- more = MSG_MORE;
- ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
- more);
+ msghdr.msg_flags &= ~MSG_MORE;
+ bvec_set_page(&bvec, ZERO_PAGE(0), size, 0);
+ iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size);
+
+ ret = sock_sendmsg(con->sock, &msghdr);
if (ret <= 0)
goto out;
con->v1.out_skip -= ret;
Since _drbd_send_page() now uses sendmsg() to send the pages rather than
sendpage(), pass the entire bio in one go using a bvec iterator instead of
doing it piecemeal.
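In essence the data path becomes the following (a minimal sketch; the real
_drbd_send_pages() below also honours drbd_disable_sendpage, marks the
connection congested and resubmits partial sends):

#include <linux/bio.h>
#include <linux/net.h>
#include <linux/uio.h>

/* Sketch: wrap the whole bio's vector in a BVEC iterator and hand it to a
 * single sendmsg(MSG_SPLICE_PAGES) call.
 */
static int send_whole_bio(struct socket *sock, struct bio *bio)
{
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_NOSIGNAL,
	};

	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bio->bi_io_vec,
		      bio->bi_vcnt, bio->bi_iter.bi_size);
	return sock_sendmsg(sock, &msg);
}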
Signed-off-by: David Howells <[email protected]>
cc: Philipp Reisner <[email protected]>
cc: Lars Ellenberg <[email protected]>
cc: "Christoph Böhmwalder" <[email protected]>
cc: Jens Axboe <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: [email protected]
cc: [email protected]
cc: [email protected]
---
drivers/block/drbd/drbd_main.c | 77 +++++++++++-----------------------
1 file changed, 25 insertions(+), 52 deletions(-)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index c9d84183844c..f7ebdb5ab45e 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -1520,28 +1520,15 @@ static void drbd_update_congested(struct drbd_connection *connection)
* As a workaround, we disable sendpage on pages
* with page_count == 0 or PageSlab.
*/
-static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
- int offset, size_t size, unsigned msg_flags)
-{
- struct socket *socket;
- void *addr;
- int err;
-
- socket = peer_device->connection->data.socket;
- addr = kmap(page) + offset;
- err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
- kunmap(page);
- if (!err)
- peer_device->device->send_cnt += size >> 9;
- return err;
-}
-
-static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
- int offset, size_t size, unsigned msg_flags)
+static int _drbd_send_pages(struct drbd_peer_device *peer_device,
+ struct iov_iter *iter, unsigned msg_flags)
{
struct socket *socket = peer_device->connection->data.socket;
- struct bio_vec bvec;
- struct msghdr msg = { .msg_flags = msg_flags, };
+ struct msghdr msg = {
+ .msg_flags = msg_flags | MSG_NOSIGNAL,
+ .msg_iter = *iter,
+ };
+ size_t size = iov_iter_count(iter);
int err = -EIO;
/* e.g. XFS meta- & log-data is in slab pages, which have a
@@ -1550,11 +1537,8 @@ static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *pa
* put_page(); and would cause either a VM_BUG directly, or
* __page_cache_release a page that would actually still be referenced
* by someone, leading to some obscure delayed Oops somewhere else. */
- if (!drbd_disable_sendpage && sendpage_ok(page))
- msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;
-
- bvec_set_page(&bvec, page, offset, size);
- iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
+ if (drbd_disable_sendpage)
+ msg.msg_flags &= ~(MSG_NOSIGNAL | MSG_SPLICE_PAGES);
drbd_update_congested(peer_device->connection);
do {
@@ -1585,39 +1569,22 @@ static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *pa
static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
{
- struct bio_vec bvec;
- struct bvec_iter iter;
+ struct iov_iter iter;
- /* hint all but last page with MSG_MORE */
- bio_for_each_segment(bvec, bio, iter) {
- int err;
+ iov_iter_bvec(&iter, ITER_SOURCE, bio->bi_io_vec, bio->bi_vcnt,
+ bio->bi_iter.bi_size);
- err = _drbd_no_send_page(peer_device, bvec.bv_page,
- bvec.bv_offset, bvec.bv_len,
- bio_iter_last(bvec, iter)
- ? 0 : MSG_MORE);
- if (err)
- return err;
- }
- return 0;
+ return _drbd_send_pages(peer_device, &iter, 0);
}
static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
{
- struct bio_vec bvec;
- struct bvec_iter iter;
+ struct iov_iter iter;
- /* hint all but last page with MSG_MORE */
- bio_for_each_segment(bvec, bio, iter) {
- int err;
+ iov_iter_bvec(&iter, ITER_SOURCE, bio->bi_io_vec, bio->bi_vcnt,
+ bio->bi_iter.bi_size);
- err = _drbd_send_page(peer_device, bvec.bv_page,
- bvec.bv_offset, bvec.bv_len,
- bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
- if (err)
- return err;
- }
- return 0;
+ return _drbd_send_pages(peer_device, &iter, MSG_SPLICE_PAGES);
}
static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
@@ -1629,10 +1596,16 @@ static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
/* hint all but last page with MSG_MORE */
page_chain_for_each(page) {
+ struct iov_iter iter;
+ struct bio_vec bvec;
unsigned l = min_t(unsigned, len, PAGE_SIZE);
- err = _drbd_send_page(peer_device, page, 0, l,
- page_chain_next(page) ? MSG_MORE : 0);
+ bvec_set_page(&bvec, page, 0, l);
+ iov_iter_bvec(&iter, ITER_SOURCE, &bvec, 1, l);
+
+ err = _drbd_send_pages(peer_device, &iter,
+ MSG_SPLICE_PAGES |
+ (page_chain_next(page) ? MSG_MORE : 0));
if (err)
return err;
len -= l;
Use sendmsg() with MSG_SPLICE_PAGES rather than sendpage. This allows
multiple pages and multipage folios to be passed through.
TODO: iscsit_fe_sendpage_sg() should perhaps set up a bio_vec array for the
entire set of pages it's going to transfer, plus two more for the header and
trailer, use page fragments to hold the header and trailer, and then call
sendmsg() once for the entire message.
Signed-off-by: David Howells <[email protected]>
cc: "Martin K. Petersen" <[email protected]>
cc: "David S. Miller" <[email protected]>
cc: Eric Dumazet <[email protected]>
cc: Jakub Kicinski <[email protected]>
cc: Paolo Abeni <[email protected]>
cc: Jens Axboe <[email protected]>
cc: Matthew Wilcox <[email protected]>
cc: [email protected]
cc: [email protected]
cc: [email protected]
---
drivers/scsi/iscsi_tcp.c | 26 +++++++++---------------
drivers/scsi/iscsi_tcp.h | 2 +-
drivers/target/iscsi/iscsi_target_util.c | 14 +++++++------
3 files changed, 19 insertions(+), 23 deletions(-)
diff --git a/drivers/scsi/iscsi_tcp.c b/drivers/scsi/iscsi_tcp.c
index 9637d4bc2bc9..9ab8555180a3 100644
--- a/drivers/scsi/iscsi_tcp.c
+++ b/drivers/scsi/iscsi_tcp.c
@@ -301,35 +301,32 @@ static int iscsi_sw_tcp_xmit_segment(struct iscsi_tcp_conn *tcp_conn,
while (!iscsi_tcp_segment_done(tcp_conn, segment, 0, r)) {
struct scatterlist *sg;
+ struct msghdr msg = {};
+ struct bio_vec bv;
unsigned int offset, copy;
- int flags = 0;
r = 0;
offset = segment->copied;
copy = segment->size - offset;
if (segment->total_copied + segment->size < segment->total_size)
- flags |= MSG_MORE | MSG_SENDPAGE_NOTLAST;
+ msg.msg_flags |= MSG_MORE;
if (tcp_sw_conn->queue_recv)
- flags |= MSG_DONTWAIT;
+ msg.msg_flags |= MSG_DONTWAIT;
- /* Use sendpage if we can; else fall back to sendmsg */
if (!segment->data) {
+ if (!tcp_conn->iscsi_conn->datadgst_en)
+ msg.msg_flags |= MSG_SPLICE_PAGES;
sg = segment->sg;
offset += segment->sg_offset + sg->offset;
- r = tcp_sw_conn->sendpage(sk, sg_page(sg), offset,
- copy, flags);
+ bvec_set_page(&bv, sg_page(sg), copy, offset);
} else {
- struct msghdr msg = { .msg_flags = flags };
- struct kvec iov = {
- .iov_base = segment->data + offset,
- .iov_len = copy
- };
-
- r = kernel_sendmsg(sk, &msg, &iov, 1, copy);
+ bvec_set_virt(&bv, segment->data + offset, copy);
}
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bv, 1, copy);
+ r = sock_sendmsg(sk, &msg);
if (r < 0) {
iscsi_tcp_segment_unmap(segment);
return r;
@@ -746,7 +743,6 @@ iscsi_sw_tcp_conn_bind(struct iscsi_cls_session *cls_session,
sock_no_linger(sk);
iscsi_sw_tcp_conn_set_callbacks(conn);
- tcp_sw_conn->sendpage = tcp_sw_conn->sock->ops->sendpage;
/*
* set receive state machine into initial state
*/
@@ -777,8 +773,6 @@ static int iscsi_sw_tcp_conn_set_param(struct iscsi_cls_conn *cls_conn,
return -ENOTCONN;
}
iscsi_set_param(cls_conn, param, buf, buflen);
- tcp_sw_conn->sendpage = conn->datadgst_en ?
- sock_no_sendpage : tcp_sw_conn->sock->ops->sendpage;
mutex_unlock(&tcp_sw_conn->sock_lock);
break;
case ISCSI_PARAM_MAX_R2T:
diff --git a/drivers/scsi/iscsi_tcp.h b/drivers/scsi/iscsi_tcp.h
index 68e14a344904..d6ec08d7eb63 100644
--- a/drivers/scsi/iscsi_tcp.h
+++ b/drivers/scsi/iscsi_tcp.h
@@ -48,7 +48,7 @@ struct iscsi_sw_tcp_conn {
uint32_t sendpage_failures_cnt;
uint32_t discontiguous_hdr_cnt;
- ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
+ bool can_splice_to_tcp;
};
struct iscsi_sw_tcp_host {
diff --git a/drivers/target/iscsi/iscsi_target_util.c b/drivers/target/iscsi/iscsi_target_util.c
index b14835fcb033..8bab1898f1d0 100644
--- a/drivers/target/iscsi/iscsi_target_util.c
+++ b/drivers/target/iscsi/iscsi_target_util.c
@@ -1129,6 +1129,8 @@ int iscsit_fe_sendpage_sg(
struct iscsit_conn *conn)
{
struct scatterlist *sg = cmd->first_data_sg;
+ struct bio_vec bvec;
+ struct msghdr msghdr = { .msg_flags = MSG_SPLICE_PAGES, };
struct kvec iov;
u32 tx_hdr_size, data_len;
u32 offset = cmd->first_data_sg_off;
@@ -1172,17 +1174,17 @@ int iscsit_fe_sendpage_sg(
u32 space = (sg->length - offset);
u32 sub_len = min_t(u32, data_len, space);
send_pg:
- tx_sent = conn->sock->ops->sendpage(conn->sock,
- sg_page(sg), sg->offset + offset, sub_len, 0);
+ bvec_set_page(&bvec, sg_page(sg), sub_len, sg->offset + offset);
+ iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, sub_len);
+
+ tx_sent = conn->sock->ops->sendmsg(conn->sock, &msghdr, sub_len);
if (tx_sent != sub_len) {
if (tx_sent == -EAGAIN) {
- pr_err("tcp_sendpage() returned"
- " -EAGAIN\n");
+ pr_err("sendmsg/splice returned -EAGAIN\n");
goto send_pg;
}
- pr_err("tcp_sendpage() failure: %d\n",
- tx_sent);
+ pr_err("sendmsg/splice failure: %d\n", tx_sent);
return -1;
}
On Fri, Jun 16, 2023 at 05:12:44PM +0100, David Howells wrote:
...
> +/**
> + * alloc_skb_frag - Allocate a page fragment for using in a socket
> + * @fragsz: The size of fragment required
> + * @gfp: Allocation flags
> + */
> +void *alloc_skb_frag(size_t fragsz, gfp_t gfp)
> +{
> + struct skb_splice_frag_cache *cache;
> + struct folio *folio, *spare = NULL;
> + size_t offset, fsize;
> + void *p;
> +
> + if (WARN_ON_ONCE(fragsz == 0))
> + fragsz = 1;
> +
> + cache = get_cpu_ptr(&skb_splice_frag_cache);
> +reload:
> + folio = cache->folio;
> + offset = cache->offset;
> +try_again:
> + if (fragsz > offset)
> + goto insufficient_space;
> +
> + /* Make the allocation. */
> + cache->pagecnt_bias--;
> + offset = ALIGN_DOWN(offset - fragsz, SMP_CACHE_BYTES);
> + cache->offset = offset;
> + p = cache->virt + offset;
> + put_cpu_ptr(skb_splice_frag_cache);
Hi David,
I don't think it makes any difference at run-time.
But to keep Sparse happy, perhaps this ought to be put_cpu_var()
...
Simon Horman <[email protected]> wrote:
> > + cache = get_cpu_ptr(&skb_splice_frag_cache);
...
> > + put_cpu_ptr(skb_splice_frag_cache);
>
> Hi David,
>
> I don't think it makes any difference at run-time.
> But to keep Sparse happy, perhaps this ought to be put_cpu_var()
Actually, the problem is a missing "&". I think I should use put_cpu_ptr() to
match get_cpu_ptr(). It doesn't crash because the argument is ignored.
David
On Sat, Jun 17, 2023 at 07:43:15AM +0100, David Howells wrote:
> Simon Horman <[email protected]> wrote:
>
> > > + cache = get_cpu_ptr(&skb_splice_frag_cache);
> ...
> > > + put_cpu_ptr(skb_splice_frag_cache);
> >
> > Hi David,
> >
> > I don't think it makes any difference at run-time.
> > But to keep Sparse happy, perhaps this ought to be put_cpu_var()
>
> Actually, the problem is a missing "&". I think I should use put_cpu_ptr() to
> match get_cpu_ptr(). It doesn't crash because the argument is ignored.
Thanks David, I agree that is a better idea.
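For reference, the corrected pairing would then read (fragment-style
sketch, reusing the struct and per-cpu variable from the patch above):

#include <linux/percpu.h>

static DEFINE_PER_CPU(struct skb_splice_frag_cache, skb_splice_frag_cache);

static void *frag_cache_use_example(void)
{
	struct skb_splice_frag_cache *cache;
	void *p = NULL;

	/* get_cpu_ptr() disables preemption and returns this cpu's instance;
	 * put_cpu_ptr() takes the same &variable to re-enable it - the '&'
	 * is what was missing in the posted patch.
	 */
	cache = get_cpu_ptr(&skb_splice_frag_cache);
	/* ... carve a fragment out of cache->folio here ... */
	put_cpu_ptr(&skb_splice_frag_cache);
	return p;
}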