2017-06-23 21:17:01

by Chuck Lever III

Subject: [PATCH v3 00/18] Server-side NFS/RDMA changes proposed for v4.13

Hi Bruce-

This series overhauls the "call receive" side of the RPC-over-RDMA
transport to use the new rdma_rw API. A slight performance increase
(better throughput, lower latency) has been observed. Benefits include:

o Fewer ib_post_send calls per RPC Call

o Code duplicated by other ULPs is replaced with core functions

o Position-Zero Read chunks are handled in one function

o More thorough transport header sanity checking


IMO these are ready for you to consider for v4.13. Have a pleasant
weekend.


Available in the "nfsd-rdma-for-4.13" topic branch of this git repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git


Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-for-4.13


Remaining rdma_rw API changes are here:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-rw-api


Changes since v2:
* Rebased on v4.12-rc6 to pick up an RDMA CM fix
* Fix op_ctxt leak when client forces a disconnect
* Updated krb5i RQ_SPLICE_OK fix based on review comments
* Dropped enable-force-mr patch


Changes since v1:
* Fixed bugzilla.linux-nfs 307
* Cleaned up struct svc_rdma_chunk_ctxt

---

Chuck Lever (18):
sunrpc: Disable splice for krb5i
svcrdma: Squelch disconnection messages
svcrdma: Avoid Send Queue overflow
svcrdma: Remove svc_rdma_marshal.c
svcrdma: Improve Read chunk sanity checking
svcrdma: Improve Write chunk sanity checking
svcrdma: Improve Reply chunk sanity checking
svcrdma: Don't account for Receive queue "starvation"
sunrpc: Allocate one more page per svc_rqst
svcrdma: Add recvfrom helpers to svc_rdma_rw.c
svcrdma: Use generic RDMA R/W API in RPC Call path
svcrdma: Properly compute .len and .buflen for received RPC Calls
svcrdma: Remove unused Read completion handlers
svcrdma: Remove frmr cache
svcrdma: Clean-up svc_rdma_unmap_dma
svcrdma: Clean up after converting svc_rdma_recvfrom to rdma_rw API
svcrdma: use offset_in_page() macro
svcrdma: Remove svc_rdma_chunk_ctxt::cc_dir field


include/linux/sunrpc/svc.h | 3
include/linux/sunrpc/svc_rdma.h | 46 --
net/sunrpc/auth_gss/svcauth_gss.c | 8
net/sunrpc/svc.c | 2
net/sunrpc/svc_xprt.c | 8
net/sunrpc/xprtrdma/Makefile | 4
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 168 -------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 734 ++++++++++++------------------
net/sunrpc/xprtrdma/svc_rdma_rw.c | 449 ++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 15 -
net/sunrpc/xprtrdma/svc_rdma_transport.c | 250 +---------
11 files changed, 776 insertions(+), 911 deletions(-)
delete mode 100644 net/sunrpc/xprtrdma/svc_rdma_marshal.c

--
Chuck Lever


2017-06-23 21:17:09

by Chuck Lever III

Subject: [PATCH v3 01/18] sunrpc: Disable splice for krb5i

When running a multi-threaded 8KB fio test (70/30 mix) with krb5i,
three or four of the twelve jobs fail. The failure is an EIO on a
read.

Troubleshooting confirmed the EIO results when the client fails to
verify the MIC of an NFS READ reply. Bruce suggested the problem
could be due to the data payload changing between the time the
reply's MIC was computed on the server and the time the reply was
actually sent.

krb5p gets around this problem by disabling RQ_SPLICE_OK. Use the
same mechanism for krb5i RPCs.

"iozone -i0 -i1 -s128m -y1k -az -I", export is tmpfs, mount is
sec=krb5i,vers=3,proto=rdma. The important numbers are the
read / reread column.

Here's without the RQ_SPLICE_OK patch:

kB reclen write rewrite read reread
131072 1 7546 7929 8396 8267
131072 2 14375 14600 15843 15639
131072 4 19280 19248 21303 21410
131072 8 32350 31772 35199 34883
131072 16 36748 37477 49365 51706
131072 32 55669 56059 57475 57389
131072 64 74599 75190 74903 75550
131072 128 99810 101446 102828 102724
131072 256 122042 122612 124806 125026
131072 512 137614 138004 141412 141267
131072 1024 146601 148774 151356 151409
131072 2048 180684 181727 293140 292840
131072 4096 206907 207658 552964 549029
131072 8192 223982 224360 454493 473469
131072 16384 228927 228390 654734 632607

And here's with it:

kB reclen write rewrite read reread
131072 1 7700 7365 7958 8011
131072 2 13211 13303 14937 14414
131072 4 19001 19265 20544 20657
131072 8 30883 31097 34255 33566
131072 16 36868 34908 51499 49944
131072 32 56428 55535 58710 56952
131072 64 73507 74676 75619 74378
131072 128 100324 101442 103276 102736
131072 256 122517 122995 124639 124150
131072 512 137317 139007 140530 140830
131072 1024 146807 148923 151246 151072
131072 2048 179656 180732 292631 292034
131072 4096 206216 208583 543355 541951
131072 8192 223738 224273 494201 489372
131072 16384 229313 229840 691719 668427

I would say that there is not much difference in this test.

For good measure, here's the same test with sec=krb5p:

kB reclen write rewrite read reread
131072 1 5982 5881 6137 6218
131072 2 10216 10252 10850 10932
131072 4 12236 12575 15375 15526
131072 8 15461 15462 23821 22351
131072 16 25677 25811 27529 27640
131072 32 31903 32354 34063 33857
131072 64 42989 43188 45635 45561
131072 128 52848 53210 56144 56141
131072 256 59123 59214 62691 62933
131072 512 63140 63277 66887 67025
131072 1024 65255 65299 69213 69140
131072 2048 76454 76555 133767 133862
131072 4096 84726 84883 251925 250702
131072 8192 89491 89482 270821 276085
131072 16384 91572 91597 361768 336868

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=307
Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Jeff Layton <[email protected]>
---
net/sunrpc/auth_gss/svcauth_gss.c | 8 ++++++++
net/sunrpc/svc.c | 2 +-
2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index a54a7a3..7b1ee5a 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -838,6 +838,14 @@ u32 svcauth_gss_flavor(struct auth_domain *dom)
struct xdr_netobj mic;
struct xdr_buf integ_buf;

+ /* NFS READ normally uses splice to send data in-place. However
+ * the data in cache can change after the reply's MIC is computed
+ * but before the RPC reply is sent. To prevent the client from
+ * rejecting the server-computed MIC in this somewhat rare case,
+ * do not use splice with the GSS integrity service.
+ */
+ clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
+
/* Did we already verify the signature on the original pass through? */
if (rqstp->rq_deferred)
return 0;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index bc0f5a0..3dd8e86 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1166,7 +1166,7 @@ static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ..
if (argv->iov_len < 6*4)
goto err_short_len;

- /* Will be turned off only in gss privacy case: */
+ /* Will be turned off by GSS integrity and privacy services */
set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
/* Will be turned off only when NFSv4 Sessions are used */
set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);


2017-06-23 21:17:17

by Chuck Lever III

Subject: [PATCH v3 02/18] svcrdma: Squelch disconnection messages

The server displays "svcrdma: failed to post Send WR (-107)" in the
kernel log when the client disconnects. This could flood the server's
log, so remove the message.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1736337..5ba6d91 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -313,13 +313,17 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
dma_addr = ib_dma_map_page(dev, virt_to_page(base),
offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
- return -EIO;
+ goto out_maperr;

ctxt->sge[sge_no].addr = dma_addr;
ctxt->sge[sge_no].length = len;
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
svc_rdma_count_mappings(rdma, ctxt);
return 0;
+
+out_maperr:
+ pr_err("svcrdma: failed to map buffer\n");
+ return -EIO;
}

static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -334,13 +338,17 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,

dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
- return -EIO;
+ goto out_maperr;

ctxt->sge[sge_no].addr = dma_addr;
ctxt->sge[sge_no].length = len;
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
svc_rdma_count_mappings(rdma, ctxt);
return 0;
+
+out_maperr:
+ pr_err("svcrdma: failed to map page\n");
+ return -EIO;
}

/**
@@ -547,7 +555,6 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
return 0;

err:
- pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
return ret;


2017-06-23 21:17:29

by Chuck Lever III

Subject: [PATCH v3 03/18] svcrdma: Avoid Send Queue overflow

Sanity check: catch the case where more Work Requests are being
posted to the Send Queue than there are Send Queue Entries (SQEs).

This might happen if a client sends a chunk with more segments than
there are SQEs for the transport. The server can't send that reply,
so the transport will deadlock unless the client drops the RPC.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_rw.c | 5 +++++
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 2 +-
2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 0cf6202..3b35364 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -232,6 +232,9 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
struct ib_cqe *cqe;
int ret;

+ if (cc->cc_sqecount > rdma->sc_sq_depth)
+ return -EINVAL;
+
first_wr = NULL;
cqe = &cc->cc_cqe;
list_for_each(tmp, &cc->cc_rwctxts) {
@@ -425,6 +428,7 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
+ * %-EINVAL if client provided too many segments,
* %-ENOMEM if rdma_rw context pool was exhausted,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
@@ -465,6 +469,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Reply chunk,
+ * %-EINVAL if client provided too many segments,
* %-ENOMEM if rdma_rw context pool was exhausted,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 5ba6d91..19fd01e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -684,7 +684,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
return 0;

err2:
- if (ret != -E2BIG)
+ if (ret != -E2BIG && ret != -EINVAL)
goto err1;

ret = svc_rdma_post_recv(rdma, GFP_KERNEL);


2017-06-23 21:17:37

by Chuck Lever III

Subject: [PATCH v3 04/18] svcrdma: Remove svc_rdma_marshal.c

svc_rdma_marshal.c has one remaining exported function --
svc_rdma_xdr_decode_req -- and it has a single call site. Take
the same approach as the sendto path, and move this function
into the source file where it is called.

This is a refactoring change only.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 3 -
net/sunrpc/xprtrdma/Makefile | 4 -
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 168 -------------------------------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 126 +++++++++++++++++++++++
4 files changed, 128 insertions(+), 173 deletions(-)
delete mode 100644 net/sunrpc/xprtrdma/svc_rdma_marshal.c

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f3787d8..3ca9916 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -185,9 +185,6 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
__be32 *rdma_resp,
struct xdr_buf *rcvbuf);

-/* svc_rdma_marshal.c */
-extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
-
/* svc_rdma_recvfrom.c */
extern int svc_rdma_recvfrom(struct svc_rqst *);
extern int rdma_read_chunk_lcl(struct svcxprt_rdma *, struct svc_rqst *,
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index c1ae814..b8213dd 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -3,6 +3,6 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
rpcrdma-y := transport.o rpc_rdma.o verbs.o \
fmr_ops.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
- svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
- svc_rdma_rw.o module.o
+ svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
+ module.o
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
deleted file mode 100644
index bdcf7d8..0000000
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (c) 2016 Oracle. All rights reserved.
- * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
- *
- * This software is available to you under a choice of one of two
- * licenses. You may choose to be licensed under the terms of the GNU
- * General Public License (GPL) Version 2, available from the file
- * COPYING in the main directory of this source tree, or the BSD-type
- * license below:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided
- * with the distribution.
- *
- * Neither the name of the Network Appliance, Inc. nor the names of
- * its contributors may be used to endorse or promote products
- * derived from this software without specific prior written
- * permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Author: Tom Tucker <[email protected]>
- */
-
-#include <linux/sunrpc/xdr.h>
-#include <linux/sunrpc/debug.h>
-#include <asm/unaligned.h>
-#include <linux/sunrpc/rpc_rdma.h>
-#include <linux/sunrpc/svc_rdma.h>
-
-#define RPCDBG_FACILITY RPCDBG_SVCXPRT
-
-static __be32 *xdr_check_read_list(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- while (*p++ != xdr_zero) {
- next = p + rpcrdma_readchunk_maxsz - 1;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- while (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
-{
- __be32 *next;
-
- if (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
- return NULL;
- p = next;
- }
- return p;
-}
-
-/**
- * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header
- * @rq_arg: Receive buffer
- *
- * On entry, xdr->head[0].iov_base points to first byte in the
- * RPC-over-RDMA header.
- *
- * On successful exit, head[0] points to first byte past the
- * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
- * The length of the RPC-over-RDMA header is returned.
- */
-int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
-{
- __be32 *p, *end, *rdma_argp;
- unsigned int hdr_len;
-
- /* Verify that there's enough bytes for header + something */
- if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
- goto out_short;
-
- rdma_argp = rq_arg->head[0].iov_base;
- if (*(rdma_argp + 1) != rpcrdma_version)
- goto out_version;
-
- switch (*(rdma_argp + 3)) {
- case rdma_msg:
- case rdma_nomsg:
- break;
-
- case rdma_done:
- goto out_drop;
-
- case rdma_error:
- goto out_drop;
-
- default:
- goto out_proc;
- }
-
- end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
- p = xdr_check_read_list(rdma_argp + 4, end);
- if (!p)
- goto out_inval;
- p = xdr_check_write_list(p, end);
- if (!p)
- goto out_inval;
- p = xdr_check_reply_chunk(p, end);
- if (!p)
- goto out_inval;
- if (p > end)
- goto out_inval;
-
- rq_arg->head[0].iov_base = p;
- hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
- rq_arg->head[0].iov_len -= hdr_len;
- return hdr_len;
-
-out_short:
- dprintk("svcrdma: header too short = %d\n", rq_arg->len);
- return -EINVAL;
-
-out_version:
- dprintk("svcrdma: bad xprt version: %u\n",
- be32_to_cpup(rdma_argp + 1));
- return -EPROTONOSUPPORT;
-
-out_drop:
- dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
- return 0;
-
-out_proc:
- dprintk("svcrdma: bad rdma procedure (%u)\n",
- be32_to_cpup(rdma_argp + 3));
- return -EINVAL;
-
-out_inval:
- dprintk("svcrdma: failed to parse transport header\n");
- return -EINVAL;
-}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 27a99bf..55ad335 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2016, 2017 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
@@ -40,6 +41,7 @@
* Author: Tom Tucker <[email protected]>
*/

+#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
@@ -115,6 +117,130 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}

+static __be32 *xdr_check_read_list(__be32 *p, __be32 *end)
+{
+ __be32 *next;
+
+ while (*p++ != xdr_zero) {
+ next = p + rpcrdma_readchunk_maxsz - 1;
+ if (next > end)
+ return NULL;
+ p = next;
+ }
+ return p;
+}
+
+static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
+{
+ __be32 *next;
+
+ while (*p++ != xdr_zero) {
+ next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+ if (next > end)
+ return NULL;
+ p = next;
+ }
+ return p;
+}
+
+static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
+{
+ __be32 *next;
+
+ if (*p++ != xdr_zero) {
+ next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+ if (next > end)
+ return NULL;
+ p = next;
+ }
+ return p;
+}
+
+/* On entry, xdr->head[0].iov_base points to first byte in the
+ * RPC-over-RDMA header.
+ *
+ * On successful exit, head[0] points to first byte past the
+ * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
+ * The length of the RPC-over-RDMA header is returned.
+ *
+ * Assumptions:
+ * - The transport header is entirely contained in the head iovec.
+ */
+static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
+{
+ __be32 *p, *end, *rdma_argp;
+ unsigned int hdr_len;
+ char *proc;
+
+ /* Verify that there's enough bytes for header + something */
+ if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
+ goto out_short;
+
+ rdma_argp = rq_arg->head[0].iov_base;
+ if (*(rdma_argp + 1) != rpcrdma_version)
+ goto out_version;
+
+ switch (*(rdma_argp + 3)) {
+ case rdma_msg:
+ proc = "RDMA_MSG";
+ break;
+ case rdma_nomsg:
+ proc = "RDMA_NOMSG";
+ break;
+
+ case rdma_done:
+ goto out_drop;
+
+ case rdma_error:
+ goto out_drop;
+
+ default:
+ goto out_proc;
+ }
+
+ end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
+ p = xdr_check_read_list(rdma_argp + 4, end);
+ if (!p)
+ goto out_inval;
+ p = xdr_check_write_list(p, end);
+ if (!p)
+ goto out_inval;
+ p = xdr_check_reply_chunk(p, end);
+ if (!p)
+ goto out_inval;
+ if (p > end)
+ goto out_inval;
+
+ rq_arg->head[0].iov_base = p;
+ hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
+ rq_arg->head[0].iov_len -= hdr_len;
+ dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
+ proc, be32_to_cpup(rdma_argp), hdr_len);
+ return hdr_len;
+
+out_short:
+ dprintk("svcrdma: header too short = %d\n", rq_arg->len);
+ return -EINVAL;
+
+out_version:
+ dprintk("svcrdma: bad xprt version: %u\n",
+ be32_to_cpup(rdma_argp + 1));
+ return -EPROTONOSUPPORT;
+
+out_drop:
+ dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
+ return 0;
+
+out_proc:
+ dprintk("svcrdma: bad rdma procedure (%u)\n",
+ be32_to_cpup(rdma_argp + 3));
+ return -EINVAL;
+
+out_inval:
+ dprintk("svcrdma: failed to parse transport header\n");
+ return -EINVAL;
+}
+
/* Issue an RDMA_READ using the local lkey to map the data sink */
int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
struct svc_rqst *rqstp,


2017-06-23 21:17:53

by Chuck Lever III

Subject: [PATCH v3 06/18] svcrdma: Improve Write chunk sanity checking

Identify malformed transport headers and unsupported chunk
combinations as early as possible.

- Reject RPC-over-RDMA messages that contain more than one Write
chunk, since this implementation does not support more than one per
message.

- Ensure that segment lengths are not crazy.

- Ensure that the chunk's segment count is not crazy.

With a 1KB inline threshold, the largest number of Write segments
that can be conveyed is about 60 (for an RDMA_NOMSG Reply message).
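
A sketch of where the 60 comes from: each segment in a Write chunk
is four XDR words (handle, length, and a 64-bit offset), or 16 bytes
on the wire, so a 1KB inline buffer holds at most 1024 / 16 = 64
segments, less the fixed transport header fields and the chunk list
discriminators.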

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 52 ++++++++++++++++++++++++++++---
1 file changed, 47 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 885ad95..cf8be18 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -117,6 +117,11 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}

+/* This accommodates the largest possible Write chunk,
+ * in one segment.
+ */
+#define MAX_BYTES_WRITE_SEG ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
+
/* This accommodates the largest possible Position-Zero
* Read chunk or Reply chunk, in one segment.
*/
@@ -162,15 +167,52 @@ static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
return p;
}

-static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
+/* The segment count is limited to how many segments can
+ * fit in the transport header without overflowing the
+ * buffer. That's about 60 Write segments for a 1KB inline
+ * threshold.
+ */
+static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
+ u32 maxlen)
{
- __be32 *next;
+ u32 i, segcount;
+
+ segcount = be32_to_cpup(p++);
+ for (i = 0; i < segcount; i++) {
+ p++; /* handle */
+ if (be32_to_cpup(p++) > maxlen)
+ return NULL;
+ p += 2; /* offset */
+
+ if (p > end)
+ return NULL;
+ }
+
+ return p;
+}

+/* Sanity check the Write list.
+ *
+ * Implementation limits:
+ * - This implementation supports only one Write chunk.
+ *
+ * Sanity checks:
+ * - Write list does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * Returns pointer to the following Reply chunk.
+ */
+static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
+{
+ u32 chcount;
+
+ chcount = 0;
while (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
+ p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
+ if (!p)
+ return NULL;
+ if (chcount++ > 1)
return NULL;
- p = next;
}
return p;
}


2017-06-23 21:17:45

by Chuck Lever III

Subject: [PATCH v3 05/18] svcrdma: Improve Read chunk sanity checking

Identify malformed transport headers and unsupported chunk
combinations as early as possible.

- Reject RPC-over-RDMA messages that contain more than one Read chunk,
since this implementation currently does not support more than one
per RPC transaction.

- Ensure that segment lengths are not crazy.

- Remove the segment count check. With a 1KB inline threshold, the
largest number of Read segments that can be conveyed is about 40
(for an RDMA_NOMSG Call message; see the arithmetic below). This is
nowhere near RPCSVC_MAXPAGES. As far as I can tell, that was just a
sanity check and does not enforce an implementation limit.
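
The arithmetic behind that 40: each Read list entry is six XDR words
(discriminator, position, handle, length, and a 64-bit offset), or
24 bytes on the wire, so a 1KB inline buffer holds at most
1024 / 24 = 42 entries, less the fixed transport header fields.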

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 55 +++++++++++++++++++++----------
1 file changed, 37 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 55ad335..885ad95 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -117,15 +117,47 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}

-static __be32 *xdr_check_read_list(__be32 *p, __be32 *end)
+/* This accommodates the largest possible Position-Zero
+ * Read chunk or Reply chunk, in one segment.
+ */
+#define MAX_BYTES_SPECIAL_SEG ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
+
+/* Sanity check the Read list.
+ *
+ * Implementation limits:
+ * - This implementation supports only one Read chunk.
+ *
+ * Sanity checks:
+ * - Read list does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * The segment count is limited to how many segments can
+ * fit in the transport header without overflowing the
+ * buffer. That's about 40 Read segments for a 1KB inline
+ * threshold.
+ *
+ * Returns pointer to the following Write list.
+ */
+static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
{
- __be32 *next;
+ u32 position;
+ bool first;

+ first = true;
while (*p++ != xdr_zero) {
- next = p + rpcrdma_readchunk_maxsz - 1;
- if (next > end)
+ if (first) {
+ position = be32_to_cpup(p++);
+ first = false;
+ } else if (be32_to_cpup(p++) != position) {
+ return NULL;
+ }
+ p++; /* handle */
+ if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
+ return NULL;
+ p += 2; /* offset */
+
+ if (p > end)
return NULL;
- p = next;
}
return p;
}
@@ -478,16 +510,6 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
return ret;
}

-static unsigned int
-rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
-{
- unsigned int count;
-
- for (count = 0; ch->rc_discrim != xdr_zero; ch++)
- count++;
- return count;
-}
-
/* If there was additional inline content, append it to the end of arg.pages.
* Tail copy has to be done after the reader function has determined how many
* pages are needed for RDMA READ.
@@ -567,9 +589,6 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
if (!ch)
return 0;

- if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
- return -EINVAL;
-
/* The request is completed when the RDMA_READs complete. The
* head context keeps all the pages that comprise the
* request.


2017-06-23 21:18:10

by Chuck Lever III

Subject: [PATCH v3 08/18] svcrdma: Don't account for Receive queue "starvation"

>From what I can tell, calling ->recvfrom when there is no work to do
is a normal part of operation. This is the only way svc_recv can
tell when there is no more data ready to receive on the transport.

Neither the TCP nor the UDP transport implementations have a
"starve" metric.

The cost of receive starvation accounting is bumping an atomic while
holding a spin lock, which results in extra (IMO unnecessary) bus
traffic between CPU sockets.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 21 ++++++---------------
1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index b480893..1452bd0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -844,9 +844,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_xprt *xprt = rqstp->rq_xprt;
struct svcxprt_rdma *rdma_xprt =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
- struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct svc_rdma_op_ctxt *ctxt;
struct rpcrdma_msg *rmsgp;
- int ret = 0;
+ int ret;

dprintk("svcrdma: rqstp=%p\n", rqstp);

@@ -863,21 +863,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
} else {
- atomic_inc(&rdma_stat_rq_starve);
+ /* No new incoming requests, terminate the loop */
clear_bit(XPT_DATA, &xprt->xpt_flags);
- ctxt = NULL;
+ spin_unlock(&rdma_xprt->sc_rq_dto_lock);
+ return 0;
}
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
- if (!ctxt) {
- /* This is the EAGAIN path. The svc_recv routine will
- * return -EAGAIN, the nfsd thread will go to call into
- * svc_recv again and we shouldn't be on the active
- * transport list
- */
- if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
- goto defer;
- goto out;
- }
+
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
ctxt, rdma_xprt, rqstp);
atomic_inc(&rdma_stat_recv);
@@ -920,7 +912,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+ rqstp->rq_arg.page_len
+ rqstp->rq_arg.tail[0].iov_len;
svc_rdma_put_context(ctxt, 0);
- out:
dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
ret, rqstp->rq_arg.len,


2017-06-23 21:18:23

by Chuck Lever III

Subject: [PATCH v3 09/18] sunrpc: Allocate one more page per svc_rqst

svcrdma needs 259 pages for 1MB NFSv4.0 READ requests:

- 1 page for the transport header and head iovec
- 256 pages for the data payload
- 1 page for the trailing GETATTR request (since NFSD XDR decoding
does not look for a tail iovec, the GETATTR is stuck at the end
of the rqstp->rq_arg.pages list)
- 1 page for building the reply xdr_buf

Note that RPCSVC_MAXPAGES is defined as:

((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE + 2 + 1)

The "+PAGE_SIZE-1" has no effect here: RPCSVC_MAXPAYLOAD is a
multiple of PAGE_SIZE, so the division always rounds the result
down to

(RPCSVC_MAXPAYLOAD / PAGE_SIZE + 2 + 1)

Let's remove the "-1" to get 260 pages maximum.
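
With 4KB pages and the usual 1MB RPCSVC_MAXPAYLOAD (a sketch that
assumes those common values), the two formulas work out to:

old: (1048576 + 4095) / 4096 + 2 + 1 = 256 + 3 = 259 pages
new: (1048576 + 4096) / 4096 + 2 + 1 = 257 + 3 = 260 pages

so the "use as many pages as possible" fallback in svc_alloc_arg(),
which caps at RPCSVC_MAXPAGES - 1, can still supply the 259 pages
svcrdma needs.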

Then svc_alloc_arg() needs to ask for pages according to this
same formula; otherwise it will never allocate enough.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc.h | 3 +--
net/sunrpc/svc_xprt.c | 8 +++++---
2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 11cef5a..35c274f 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -176,8 +176,7 @@ static inline void svc_get(struct svc_serv *serv)
* We using ->sendfile to return read data, we might need one extra page
* if the request is not page-aligned. So add another '1'.
*/
-#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE \
- + 2 + 1)
+#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD + PAGE_SIZE) / PAGE_SIZE + 2 + 1)

static inline u32 svc_getnl(struct kvec *iov)
{
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 7bfe1fb..9bd484d 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
int i;

/* now allocate needed pages. If we get a failure, sleep briefly */
- pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
- WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES);
- if (pages >= RPCSVC_MAXPAGES)
+ pages = (serv->sv_max_mesg + (2 * PAGE_SIZE)) >> PAGE_SHIFT;
+ if (pages >= RPCSVC_MAXPAGES) {
+ pr_warn_once("svc: warning: pages=%u >= RPCSVC_MAXPAGES=%lu\n",
+ pages, RPCSVC_MAXPAGES);
/* use as many pages as possible */
pages = RPCSVC_MAXPAGES - 1;
+ }
for (i = 0; i < pages ; i++)
while (rqstp->rq_pages[i] == NULL) {
struct page *p = alloc_page(GFP_KERNEL);


2017-06-23 21:18:01

by Chuck Lever III

Subject: [PATCH v3 07/18] svcrdma: Improve Reply chunk sanity checking

Identify malformed transport headers and unsupported chunk
combinations as early as possible.

- Ensure that segment lengths are not crazy.

- Ensure that the Reply chunk's segment count is not crazy.

With a 1KB inline threshold, the largest number of Write segments
that can be conveyed is about 60 (for an RDMA_NOMSG Reply message).

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index cf8be18..b480893 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -217,15 +217,20 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
return p;
}

-static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
+/* Sanity check the Reply chunk.
+ *
+ * Sanity checks:
+ * - Reply chunk does not overflow buffer.
+ * - Segment size limited by largest NFS data payload.
+ *
+ * Returns pointer to the following RPC header.
+ */
+static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
{
- __be32 *next;
-
if (*p++ != xdr_zero) {
- next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
- if (next > end)
+ p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
+ if (!p)
return NULL;
- p = next;
}
return p;
}


2017-06-23 21:18:27

by Chuck Lever III

Subject: [PATCH v3 10/18] svcrdma: Add recvfrom helpers to svc_rdma_rw.c

svc_rdma_rw.c already contains helpers for the sendto path.
Introduce helpers for the recvfrom path.

The plan is to replace the local NFSD bespoke code that constructs
and posts RDMA Read Work Requests with calls to the rdma_rw API.
That API is shared with other RDMA-enabled ULPs and manages the
gory details of buffer registration and posting Work Requests.

This new code also puts all RDMA_NOMSG-specific logic in one place.

Lastly, the use of rqstp->rq_arg.pages is deprecated in favor of
using rqstp->rq_pages directly, for clarity.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 3
net/sunrpc/xprtrdma/svc_rdma_rw.c | 425 +++++++++++++++++++++++++++++++++++++
2 files changed, 427 insertions(+), 1 deletion(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 3ca9916..cf5d541 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -196,6 +196,9 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,

/* svc_rdma_rw.c */
extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head, __be32 *p);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
__be32 *wr_ch, struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 3b35364..77d38a4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -12,6 +12,9 @@

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
+
/* Each R/W context contains state for one chain of RDMA Read or
* Write Work Requests.
*
@@ -177,6 +180,7 @@ struct svc_rdma_write_info {
info->wi_nsegs = be32_to_cpup(++chunk);
info->wi_segs = ++chunk;
svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+ info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info;
}

@@ -216,6 +220,76 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
svc_rdma_write_info_free(info);
}

+/* State for pulling a Read chunk.
+ */
+struct svc_rdma_read_info {
+ struct svc_rdma_op_ctxt *ri_readctxt;
+ unsigned int ri_position;
+ unsigned int ri_pageno;
+ unsigned int ri_pageoff;
+ unsigned int ri_chunklen;
+
+ struct svc_rdma_chunk_ctxt ri_cc;
+};
+
+static struct svc_rdma_read_info *
+svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_read_info *info;
+
+ info = kmalloc(sizeof(*info), GFP_KERNEL);
+ if (!info)
+ return info;
+
+ svc_rdma_cc_init(rdma, &info->ri_cc, DMA_FROM_DEVICE);
+ info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
+ return info;
+}
+
+static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
+{
+ svc_rdma_cc_release(&info->ri_cc);
+ kfree(info);
+}
+
+/**
+ * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ */
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_chunk_ctxt *cc =
+ container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+ struct svcxprt_rdma *rdma = cc->cc_rdma;
+ struct svc_rdma_read_info *info =
+ container_of(cc, struct svc_rdma_read_info, ri_cc);
+
+ atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+ wake_up(&rdma->sc_send_wait);
+
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ svc_rdma_put_context(info->ri_readctxt, 1);
+ } else {
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&info->ri_readctxt->list,
+ &rdma->sc_read_complete_q);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ }
+
+ svc_rdma_read_info_free(info);
+}
+
/* This function sleeps when the transport's Send Queue is congested.
*
* Assumptions:
@@ -335,7 +409,6 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
__be32 *seg;
int ret;

- cc->cc_cqe.done = svc_rdma_write_done;
seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
do {
unsigned int write_len;
@@ -515,3 +588,353 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
svc_rdma_write_info_free(info);
return ret;
}
+
+static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
+ struct svc_rqst *rqstp,
+ u32 rkey, u32 len, u64 offset)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+ struct svc_rdma_rw_ctxt *ctxt;
+ unsigned int sge_no, seg_len;
+ struct scatterlist *sg;
+ int ret;
+
+ sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
+ ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
+ if (!ctxt)
+ goto out_noctx;
+ ctxt->rw_nents = sge_no;
+
+ dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
+ len, offset, rkey, sge_no);
+
+ sg = ctxt->rw_sg_table.sgl;
+ for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
+ seg_len = min_t(unsigned int, len,
+ PAGE_SIZE - info->ri_pageoff);
+
+ head->arg.pages[info->ri_pageno] =
+ rqstp->rq_pages[info->ri_pageno];
+ if (!info->ri_pageoff)
+ head->count++;
+
+ sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
+ seg_len, info->ri_pageoff);
+ sg = sg_next(sg);
+
+ info->ri_pageoff += seg_len;
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ len -= seg_len;
+
+ /* Safety check */
+ if (len &&
+ &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
+ goto out_overrun;
+ }
+
+ ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
+ cc->cc_rdma->sc_port_num,
+ ctxt->rw_sg_table.sgl, ctxt->rw_nents,
+ 0, offset, rkey, DMA_FROM_DEVICE);
+ if (ret < 0)
+ goto out_initerr;
+
+ list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+ cc->cc_sqecount += ret;
+ return 0;
+
+out_noctx:
+ dprintk("svcrdma: no R/W ctxs available\n");
+ return -ENOMEM;
+
+out_overrun:
+ dprintk("svcrdma: request overruns rq_pages\n");
+ return -EINVAL;
+
+out_initerr:
+ svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
+ pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+ return -EIO;
+}
+
+static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ int ret;
+
+ info->ri_chunklen = 0;
+ while (*p++ != xdr_zero) {
+ u32 rs_handle, rs_length;
+ u64 rs_offset;
+
+ if (be32_to_cpup(p++) != info->ri_position)
+ break;
+ rs_handle = be32_to_cpup(p++);
+ rs_length = be32_to_cpup(p++);
+ p = xdr_decode_hyper(p, &rs_offset);
+
+ ret = svc_rdma_build_read_segment(info, rqstp,
+ rs_handle, rs_length,
+ rs_offset);
+ if (ret < 0)
+ break;
+
+ info->ri_chunklen += rs_length;
+ }
+
+ return ret;
+}
+
+/* If there is inline content following the Read chunk, append it to
+ * the page list immediately following the data payload. This has to
+ * be done after the reader function has determined how many pages
+ * were consumed for RDMA Read.
+ *
+ * On entry, ri_pageno and ri_pageoff point directly to the end of the
+ * page list. On exit, both have been updated to the new "next byte".
+ *
+ * Assumptions:
+ * - Inline content fits entirely in rq_pages[0]
+ * - Trailing content is only a handful of bytes
+ */
+static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ unsigned int tail_length, remaining;
+ u8 *srcp, *destp;
+
+ /* Assert that all inline content fits in page 0. This is an
+ * implementation limit, not a protocol limit.
+ */
+ if (head->arg.head[0].iov_len > PAGE_SIZE) {
+ pr_warn_once("svcrdma: too much trailing inline content\n");
+ return -EINVAL;
+ }
+
+ srcp = head->arg.head[0].iov_base;
+ srcp += info->ri_position;
+ tail_length = head->arg.head[0].iov_len - info->ri_position;
+ remaining = tail_length;
+
+ /* If there is room on the last page in the page list, try to
+ * fit the trailing content there.
+ */
+ if (info->ri_pageoff > 0) {
+ unsigned int len;
+
+ len = min_t(unsigned int, remaining,
+ PAGE_SIZE - info->ri_pageoff);
+ destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+ destp += info->ri_pageoff;
+
+ memcpy(destp, srcp, len);
+ srcp += len;
+ destp += len;
+ info->ri_pageoff += len;
+ remaining -= len;
+
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ }
+
+ /* Otherwise, a fresh page is needed. */
+ if (remaining) {
+ head->arg.pages[info->ri_pageno] =
+ rqstp->rq_pages[info->ri_pageno];
+ head->count++;
+
+ destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+ memcpy(destp, srcp, remaining);
+ info->ri_pageoff += remaining;
+ }
+
+ head->arg.page_len += tail_length;
+ head->arg.len += tail_length;
+ head->arg.buflen += tail_length;
+ return 0;
+}
+
+/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
+ * data lands in the page list of head->arg.pages.
+ *
+ * Currently NFSD does not look at the head->arg.tail[0] iovec.
+ * Therefore, XDR round-up of the Read chunk and trailing
+ * inline content must both be added at the end of the pagelist.
+ */
+static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ int ret;
+
+ dprintk("svcrdma: Reading Read chunk at position %u\n",
+ info->ri_position);
+
+ info->ri_pageno = head->hdr_count;
+ info->ri_pageoff = 0;
+
+ ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ if (ret < 0)
+ goto out;
+
+ /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
+ */
+ if (info->ri_chunklen & 3) {
+ u32 padlen = 4 - (info->ri_chunklen & 3);
+
+ info->ri_chunklen += padlen;
+
+ /* NB: data payload always starts on XDR alignment,
+ * thus the pad can never contain a page boundary.
+ */
+ info->ri_pageoff += padlen;
+ if (info->ri_pageoff == PAGE_SIZE) {
+ info->ri_pageno++;
+ info->ri_pageoff = 0;
+ }
+ }
+
+ head->arg.page_len = info->ri_chunklen;
+ head->arg.len += info->ri_chunklen;
+ head->arg.buflen += info->ri_chunklen;
+
+ if (info->ri_position < head->arg.head[0].iov_len) {
+ ret = svc_rdma_copy_tail(rqstp, info);
+ if (ret < 0)
+ goto out;
+ }
+ head->arg.head[0].iov_len = info->ri_position;
+
+out:
+ return ret;
+}
+
+/* Construct RDMA Reads to pull over a Position Zero Read chunk.
+ * The start of the data lands in the first page just after
+ * the Transport header, and the rest lands in the page list of
+ * head->arg.pages.
+ *
+ * Assumptions:
+ * - A PZRC has an XDR-aligned length (no implicit round-up).
+ * - There can be no trailing inline content (IOW, we assume
+ * a PZRC is never sent in an RDMA_MSG message, though it's
+ * allowed by spec).
+ */
+static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
+ struct svc_rdma_read_info *info,
+ __be32 *p)
+{
+ struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+ int ret;
+
+ dprintk("svcrdma: Reading Position Zero Read chunk\n");
+
+ info->ri_pageno = head->hdr_count - 1;
+ info->ri_pageoff = offset_in_page(head->byte_len);
+
+ ret = svc_rdma_build_read_chunk(rqstp, info, p);
+ if (ret < 0)
+ goto out;
+
+ head->arg.len += info->ri_chunklen;
+ head->arg.buflen += info->ri_chunklen;
+
+ if (head->arg.len <= head->sge[0].length) {
+ /* Transport header and RPC message fit entirely
+ * in page where head iovec resides.
+ */
+ head->arg.head[0].iov_len = info->ri_chunklen;
+ } else {
+ /* Transport header and part of RPC message reside
+ * in the head iovec's page.
+ */
+ head->arg.head[0].iov_len =
+ head->sge[0].length - head->byte_len;
+ head->arg.page_len =
+ info->ri_chunklen - head->arg.head[0].iov_len;
+ }
+
+out:
+ return ret;
+}
+
+/**
+ * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * @rdma: controlling RDMA transport
+ * @rqstp: set of pages to use as Read sink buffers
+ * @head: pages under I/O collect here
+ * @p: pointer to start of Read chunk
+ *
+ * Returns:
+ * %0 if all needed RDMA Reads were posted successfully,
+ * %-EINVAL if client provided too many segments,
+ * %-ENOMEM if rdma_rw context pool was exhausted,
+ * %-ENOTCONN if posting failed (connection is lost),
+ * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ * - All Read segments in @p have the same Position value.
+ */
+int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head, __be32 *p)
+{
+ struct svc_rdma_read_info *info;
+ struct page **page;
+ int ret;
+
+ /* The request (with page list) is constructed in
+ * head->arg. Pages involved with RDMA Read I/O are
+ * transferred there.
+ */
+ head->hdr_count = head->count;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = head->pages;
+ head->arg.page_base = 0;
+ head->arg.page_len = 0;
+ head->arg.len = rqstp->rq_arg.len;
+ head->arg.buflen = rqstp->rq_arg.buflen;
+
+ info = svc_rdma_read_info_alloc(rdma);
+ if (!info)
+ return -ENOMEM;
+ info->ri_readctxt = head;
+
+ info->ri_position = be32_to_cpup(p + 1);
+ if (info->ri_position)
+ ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
+ else
+ ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+
+ /* Mark the start of the pages that can be used for the reply */
+ if (info->ri_pageoff > 0)
+ info->ri_pageno++;
+ rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+ if (ret < 0)
+ goto out;
+
+ ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
+
+out:
+ /* Read sink pages have been moved from rqstp->rq_pages to
+ * head->arg.pages. Force svc_recv to refill those slots
+ * in rq_pages.
+ */
+ for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
+ *page = NULL;
+
+ if (ret < 0)
+ svc_rdma_read_info_free(info);
+ return ret;
+}


2017-06-23 21:18:42

by Chuck Lever III

Subject: [PATCH v3 12/18] svcrdma: Properly compute .len and .buflen for received RPC Calls

When an RPC-over-RDMA request is received, the Receive buffer
contains a Transport Header possibly followed by an RPC message.

Even though rq_arg.head[0] (as passed to NFSD) does not contain the
Transport Header, rq_arg.len currently includes the size of the
Transport Header.

That violates the intent of the xdr_buf API contract. .buflen should
include everything, but .len should be exactly the length of the RPC
message in the buffer.

The rq_arg fields are summed together at the end of
svc_rdma_recvfrom to obtain the correct return value. rq_arg.len
really ought to contain the correct number of bytes already, but it
currently doesn't due to the above misbehavior.

Let's instead ensure that .buflen includes the length of the
transport header, and that .len is always equal to head.iov_len +
.page_len + tail.iov_len.
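
In other words, the intended invariants after this patch (as I read
them) are:

   .len    == head[0].iov_len + .page_len + tail[0].iov_len
   .buflen == .len + (length of the transport header)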

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 14 ++++----------
net/sunrpc/xprtrdma/svc_rdma_rw.c | 2 +-
2 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 5fbcb73..ad4bd62 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -143,7 +143,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
put_page(rqstp->rq_pages[sge_no]);
rqstp->rq_pages[sge_no] = page;
bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
- rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
sge_no++;
}
rqstp->rq_respages = &rqstp->rq_pages[sge_no];
@@ -338,6 +337,7 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
rq_arg->head[0].iov_base = p;
hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
rq_arg->head[0].iov_len -= hdr_len;
+ rq_arg->len -= hdr_len;
dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
proc, be32_to_cpup(rdma_argp), hdr_len);
return hdr_len;
@@ -564,18 +564,12 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto out_readchunk;

complete:
- ret = rqstp->rq_arg.head[0].iov_len
- + rqstp->rq_arg.page_len
- + rqstp->rq_arg.tail[0].iov_len;
svc_rdma_put_context(ctxt, 0);
- dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
- "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
- ret, rqstp->rq_arg.len,
- rqstp->rq_arg.head[0].iov_base,
- rqstp->rq_arg.head[0].iov_len);
+ dprintk("svcrdma: recvfrom: xprt=%p, rqstp=%p, rq_arg.len=%u\n",
+ rdma_xprt, rqstp, rqstp->rq_arg.len);
rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, xprt);
- return ret;
+ return rqstp->rq_arg.len;

out_readchunk:
ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 77d38a4..9d7a151 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -848,7 +848,7 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
head->arg.len += info->ri_chunklen;
head->arg.buflen += info->ri_chunklen;

- if (head->arg.len <= head->sge[0].length) {
+ if (head->arg.buflen <= head->sge[0].length) {
/* Transport header and RPC message fit entirely
* in page where head iovec resides.
*/


2017-06-23 21:18:35

by Chuck Lever III

Subject: [PATCH v3 11/18] svcrdma: Use generic RDMA R/W API in RPC Call path

The current svcrdma recvfrom code path has a lot of detail about
registration mode and the type of port (iWARP, IB, etc).

Instead, use the RDMA core's generic R/W API, which is shared with
other RDMA-enabled ULPs and manages the gory details of buffer
registration and the posting of RDMA Read Work Requests.

Since the Read list marshaling code is being replaced, I took the
opportunity to swap the C structure-based XDR encoding for more
portable code that uses pointer arithmetic.
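
For example, where the old code overlaid struct rpcrdma_read_chunk
on the receive buffer, the new code walks the header a word at a
time. This is how svc_rdma_build_read_chunk in this series decodes
one Read segment:

	u32 rs_handle, rs_length;
	u64 rs_offset;

	rs_handle = be32_to_cpup(p++);
	rs_length = be32_to_cpup(p++);
	p = xdr_decode_hyper(p, &rs_offset);

be32_to_cpup() and xdr_decode_hyper() keep the decoding correct
regardless of host endianness and structure layout.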

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 14 -
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 547 ++++++------------------------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 13 -
3 files changed, 106 insertions(+), 468 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index cf5d541..b1ba19b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -82,10 +82,7 @@ struct svc_rdma_op_ctxt {
int hdr_count;
struct xdr_buf arg;
struct ib_cqe cqe;
- struct ib_cqe reg_cqe;
- struct ib_cqe inv_cqe;
u32 byte_len;
- u32 position;
struct svcxprt_rdma *xprt;
unsigned long flags;
enum dma_data_direction direction;
@@ -116,7 +113,6 @@ struct svcxprt_rdma {
struct list_head sc_accept_q; /* Conn. waiting accept */
int sc_ord; /* RDMA read limit */
int sc_max_sge;
- int sc_max_sge_rd; /* max sge for read target */
bool sc_snd_w_inv; /* OK to use Send With Invalidate */

atomic_t sc_sq_avail; /* SQEs ready to be consumed */
@@ -141,10 +137,6 @@ struct svcxprt_rdma {
struct ib_qp *sc_qp;
struct ib_cq *sc_rq_cq;
struct ib_cq *sc_sq_cq;
- int (*sc_reader)(struct svcxprt_rdma *,
- struct svc_rqst *,
- struct svc_rdma_op_ctxt *,
- int *, u32 *, u32, u32, u64, bool);
u32 sc_dev_caps; /* distilled device caps */
unsigned int sc_frmr_pg_list_len;
struct list_head sc_frmr_q;
@@ -187,12 +179,6 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,

/* svc_rdma_recvfrom.c */
extern int svc_rdma_recvfrom(struct svc_rqst *);
-extern int rdma_read_chunk_lcl(struct svcxprt_rdma *, struct svc_rqst *,
- struct svc_rdma_op_ctxt *, int *, u32 *,
- u32, u32, u64, bool);
-extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
- struct svc_rdma_op_ctxt *, int *, u32 *,
- u32, u32, u64, bool);

/* svc_rdma_rw.c */
extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 1452bd0..5fbcb73 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -41,13 +41,66 @@
* Author: Tom Tucker <[email protected]>
*/

-#include <linux/sunrpc/xdr.h>
-#include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
-#include <linux/spinlock.h>
+/* Operation
+ *
+ * The main entry point is svc_rdma_recvfrom. This is called from
+ * svc_recv when the transport indicates there is incoming data to
+ * be read. "Data Ready" is signaled when an RDMA Receive completes,
+ * or when a set of RDMA Reads complete.
+ *
+ * An svc_rqst is passed in. This structure contains an array of
+ * free pages (rq_pages) that will contain the incoming RPC message.
+ *
+ * Short messages are moved directly into svc_rqst::rq_arg, and
+ * the RPC Call is ready to be processed by the Upper Layer.
+ * svc_rdma_recvfrom returns the length of the RPC Call message,
+ * completing the reception of the RPC Call.
+ *
+ * However, when an incoming message has Read chunks,
+ * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
+ * data payload from the client. svc_rdma_recvfrom sets up the
+ * RDMA Reads using pages in svc_rqst::rq_pages, which are
+ * transferred to an svc_rdma_op_ctxt for the duration of the
+ * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
+ * is still not yet ready.
+ *
+ * When the Read chunk payloads have become available on the
+ * server, "Data Ready" is raised again, and svc_recv calls
+ * svc_rdma_recvfrom again. This second call may use a different
+ * svc_rqst than the first one, thus any information that needs
+ * to be preserved across these two calls is kept in an
+ * svc_rdma_op_ctxt.
+ *
+ * The second call to svc_rdma_recvfrom performs final assembly
+ * of the RPC Call message, using the RDMA Read sink pages kept in
+ * the svc_rdma_op_ctxt. The xdr_buf is copied from the
+ * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
+ * the length of the completed RPC Call message.
+ *
+ * Page Management
+ *
+ * Pages under I/O must be transferred from the first svc_rqst to an
+ * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
+ *
+ * The first svc_rqst supplies pages for RDMA Reads. These are moved
+ * from rqstp::rq_pages into ctxt::pages. The consumed elements of
+ * the rq_pages array are set to NULL and refilled when the first
+ * svc_rdma_recvfrom call returns.
+ *
+ * During the second svc_rdma_recvfrom call, RDMA Read sink pages
+ * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
+ * (see rdma_read_complete() below).
+ */
+
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
+
+#include <linux/spinlock.h>
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT
@@ -61,7 +114,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *ctxt,
u32 byte_count)
{
- struct rpcrdma_msg *rmsgp;
struct page *page;
u32 bc;
int sge_no;
@@ -85,13 +137,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.page_len = bc;
rqstp->rq_arg.page_base = 0;

- /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
- if (rmsgp->rm_type == rdma_nomsg)
- rqstp->rq_arg.pages = &rqstp->rq_pages[0];
- else
- rqstp->rq_arg.pages = &rqstp->rq_pages[1];
-
sge_no = 1;
while (bc && sge_no < ctxt->count) {
page = ctxt->pages[sge_no];
@@ -320,395 +365,6 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
return -EINVAL;
}

-/* Issue an RDMA_READ using the local lkey to map the data sink */
-int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- bool last)
-{
- struct ib_rdma_wr read_wr;
- int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
- struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
- int ret, read, pno;
- u32 pg_off = *page_offset;
- u32 pg_no = *page_no;
-
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->read_hdr = head;
- pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd);
- read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
- rs_length);
-
- for (pno = 0; pno < pages_needed; pno++) {
- int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
-
- head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
- head->arg.page_len += len;
-
- head->arg.len += len;
- if (!pg_off)
- head->count++;
- rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
- ctxt->sge[pno].addr =
- ib_dma_map_page(xprt->sc_cm_id->device,
- head->arg.pages[pg_no], pg_off,
- PAGE_SIZE - pg_off,
- DMA_FROM_DEVICE);
- ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
- ctxt->sge[pno].addr);
- if (ret)
- goto err;
- svc_rdma_count_mappings(xprt, ctxt);
-
- ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey;
- ctxt->sge[pno].length = len;
- ctxt->count++;
-
- /* adjust offset and wrap to next page if needed */
- pg_off += len;
- if (pg_off == PAGE_SIZE) {
- pg_off = 0;
- pg_no++;
- }
- rs_length -= len;
- }
-
- if (last && rs_length == 0)
- set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- else
- clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-
- memset(&read_wr, 0, sizeof(read_wr));
- ctxt->cqe.done = svc_rdma_wc_read;
- read_wr.wr.wr_cqe = &ctxt->cqe;
- read_wr.wr.opcode = IB_WR_RDMA_READ;
- read_wr.wr.send_flags = IB_SEND_SIGNALED;
- read_wr.rkey = rs_handle;
- read_wr.remote_addr = rs_offset;
- read_wr.wr.sg_list = ctxt->sge;
- read_wr.wr.num_sge = pages_needed;
-
- ret = svc_rdma_send(xprt, &read_wr.wr);
- if (ret) {
- pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- goto err;
- }
-
- /* return current location in page array */
- *page_no = pg_no;
- *page_offset = pg_off;
- ret = read;
- atomic_inc(&rdma_stat_read);
- return ret;
- err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 0);
- return ret;
-}
-
-/* Issue an RDMA_READ using an FRMR to map the data sink */
-int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- bool last)
-{
- struct ib_rdma_wr read_wr;
- struct ib_send_wr inv_wr;
- struct ib_reg_wr reg_wr;
- u8 key;
- int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
- struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
- struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
- int ret, read, pno, dma_nents, n;
- u32 pg_off = *page_offset;
- u32 pg_no = *page_no;
-
- if (IS_ERR(frmr))
- return -ENOMEM;
-
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->frmr = frmr;
- nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len);
- read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length);
-
- frmr->direction = DMA_FROM_DEVICE;
- frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
- frmr->sg_nents = nents;
-
- for (pno = 0; pno < nents; pno++) {
- int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
-
- head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
- head->arg.page_len += len;
- head->arg.len += len;
- if (!pg_off)
- head->count++;
-
- sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no],
- len, pg_off);
-
- rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
- /* adjust offset and wrap to next page if needed */
- pg_off += len;
- if (pg_off == PAGE_SIZE) {
- pg_off = 0;
- pg_no++;
- }
- rs_length -= len;
- }
-
- if (last && rs_length == 0)
- set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- else
- clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-
- dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device,
- frmr->sg, frmr->sg_nents,
- frmr->direction);
- if (!dma_nents) {
- pr_err("svcrdma: failed to dma map sg %p\n",
- frmr->sg);
- return -ENOMEM;
- }
-
- n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
- if (unlikely(n != frmr->sg_nents)) {
- pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
- frmr->mr, n, frmr->sg_nents);
- return n < 0 ? n : -EINVAL;
- }
-
- /* Bump the key */
- key = (u8)(frmr->mr->lkey & 0x000000FF);
- ib_update_fast_reg_key(frmr->mr, ++key);
-
- ctxt->sge[0].addr = frmr->mr->iova;
- ctxt->sge[0].lkey = frmr->mr->lkey;
- ctxt->sge[0].length = frmr->mr->length;
- ctxt->count = 1;
- ctxt->read_hdr = head;
-
- /* Prepare REG WR */
- ctxt->reg_cqe.done = svc_rdma_wc_reg;
- reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
- reg_wr.wr.opcode = IB_WR_REG_MR;
- reg_wr.wr.send_flags = IB_SEND_SIGNALED;
- reg_wr.wr.num_sge = 0;
- reg_wr.mr = frmr->mr;
- reg_wr.key = frmr->mr->lkey;
- reg_wr.access = frmr->access_flags;
- reg_wr.wr.next = &read_wr.wr;
-
- /* Prepare RDMA_READ */
- memset(&read_wr, 0, sizeof(read_wr));
- ctxt->cqe.done = svc_rdma_wc_read;
- read_wr.wr.wr_cqe = &ctxt->cqe;
- read_wr.wr.send_flags = IB_SEND_SIGNALED;
- read_wr.rkey = rs_handle;
- read_wr.remote_addr = rs_offset;
- read_wr.wr.sg_list = ctxt->sge;
- read_wr.wr.num_sge = 1;
- if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
- read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
- } else {
- read_wr.wr.opcode = IB_WR_RDMA_READ;
- read_wr.wr.next = &inv_wr;
- /* Prepare invalidate */
- memset(&inv_wr, 0, sizeof(inv_wr));
- ctxt->inv_cqe.done = svc_rdma_wc_inv;
- inv_wr.wr_cqe = &ctxt->inv_cqe;
- inv_wr.opcode = IB_WR_LOCAL_INV;
- inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
- inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
- }
-
- /* Post the chain */
- ret = svc_rdma_send(xprt, &reg_wr.wr);
- if (ret) {
- pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- goto err;
- }
-
- /* return current location in page array */
- *page_no = pg_no;
- *page_offset = pg_off;
- ret = read;
- atomic_inc(&rdma_stat_read);
- return ret;
- err:
- svc_rdma_put_context(ctxt, 0);
- svc_rdma_put_frmr(xprt, frmr);
- return ret;
-}
-
-/* If there was additional inline content, append it to the end of arg.pages.
- * Tail copy has to be done after the reader function has determined how many
- * pages are needed for RDMA READ.
- */
-static int
-rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
- u32 position, u32 byte_count, u32 page_offset, int page_no)
-{
- char *srcp, *destp;
-
- srcp = head->arg.head[0].iov_base + position;
- byte_count = head->arg.head[0].iov_len - position;
- if (byte_count > PAGE_SIZE) {
- dprintk("svcrdma: large tail unsupported\n");
- return 0;
- }
-
- /* Fit as much of the tail on the current page as possible */
- if (page_offset != PAGE_SIZE) {
- destp = page_address(rqstp->rq_arg.pages[page_no]);
- destp += page_offset;
- while (byte_count--) {
- *destp++ = *srcp++;
- page_offset++;
- if (page_offset == PAGE_SIZE && byte_count)
- goto more;
- }
- goto done;
- }
-
-more:
- /* Fit the rest on the next page */
- page_no++;
- destp = page_address(rqstp->rq_arg.pages[page_no]);
- while (byte_count--)
- *destp++ = *srcp++;
-
- rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
-done:
- byte_count = head->arg.head[0].iov_len - position;
- head->arg.page_len += byte_count;
- head->arg.len += byte_count;
- head->arg.buflen += byte_count;
- return 1;
-}
-
-/* Returns the address of the first read chunk or <nul> if no read chunk
- * is present
- */
-static struct rpcrdma_read_chunk *
-svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
-{
- struct rpcrdma_read_chunk *ch =
- (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
-
- if (ch->rc_discrim == xdr_zero)
- return NULL;
- return ch;
-}
-
-static int rdma_read_chunks(struct svcxprt_rdma *xprt,
- struct rpcrdma_msg *rmsgp,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head)
-{
- int page_no, ret;
- struct rpcrdma_read_chunk *ch;
- u32 handle, page_offset, byte_count;
- u32 position;
- u64 rs_offset;
- bool last;
-
- /* If no read list is present, return 0 */
- ch = svc_rdma_get_read_chunk(rmsgp);
- if (!ch)
- return 0;
-
- /* The request is completed when the RDMA_READs complete. The
- * head context keeps all the pages that comprise the
- * request.
- */
- head->arg.head[0] = rqstp->rq_arg.head[0];
- head->arg.tail[0] = rqstp->rq_arg.tail[0];
- head->hdr_count = head->count;
- head->arg.page_base = 0;
- head->arg.page_len = 0;
- head->arg.len = rqstp->rq_arg.len;
- head->arg.buflen = rqstp->rq_arg.buflen;
-
- /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
- position = be32_to_cpu(ch->rc_position);
- if (position == 0) {
- head->arg.pages = &head->pages[0];
- page_offset = head->byte_len;
- } else {
- head->arg.pages = &head->pages[head->count];
- page_offset = 0;
- }
-
- ret = 0;
- page_no = 0;
- for (; ch->rc_discrim != xdr_zero; ch++) {
- if (be32_to_cpu(ch->rc_position) != position)
- goto err;
-
- handle = be32_to_cpu(ch->rc_target.rs_handle),
- byte_count = be32_to_cpu(ch->rc_target.rs_length);
- xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
- &rs_offset);
-
- while (byte_count > 0) {
- last = (ch + 1)->rc_discrim == xdr_zero;
- ret = xprt->sc_reader(xprt, rqstp, head,
- &page_no, &page_offset,
- handle, byte_count,
- rs_offset, last);
- if (ret < 0)
- goto err;
- byte_count -= ret;
- rs_offset += ret;
- head->arg.buflen += ret;
- }
- }
-
- /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
- if (page_offset & 3) {
- u32 pad = 4 - (page_offset & 3);
-
- head->arg.tail[0].iov_len += pad;
- head->arg.len += pad;
- head->arg.buflen += pad;
- page_offset += pad;
- }
-
- ret = 1;
- if (position && position < head->arg.head[0].iov_len)
- ret = rdma_copy_tail(rqstp, head, position,
- byte_count, page_offset, page_no);
- head->arg.head[0].iov_len = position;
- head->position = position;
-
- err:
- /* Detach arg pages. svc_recv will replenish them */
- for (page_no = 0;
- &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
- rqstp->rq_pages[page_no] = NULL;
-
- return ret;
-}
-
static void rdma_read_complete(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head)
{
@@ -720,24 +376,9 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_pages[page_no] = head->pages[page_no];
}

- /* Adjustments made for RDMA_NOMSG type requests */
- if (head->position == 0) {
- if (head->arg.len <= head->sge[0].length) {
- head->arg.head[0].iov_len = head->arg.len -
- head->byte_len;
- head->arg.page_len = 0;
- } else {
- head->arg.head[0].iov_len = head->sge[0].length -
- head->byte_len;
- head->arg.page_len = head->arg.len -
- head->sge[0].length;
- }
- }
-
/* Point rq_arg.pages past header */
rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
rqstp->rq_arg.page_len = head->arg.page_len;
- rqstp->rq_arg.page_base = head->arg.page_base;

/* rq_respages starts after the last arg page */
rqstp->rq_respages = &rqstp->rq_pages[page_no];
@@ -834,10 +475,35 @@ static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
return true;
}

-/*
- * Set up the rqstp thread context to point to the RQ buffer. If
- * necessary, pull additional data from the client with an RDMA_READ
- * request.
+/**
+ * svc_rdma_recvfrom - Receive an RPC call
+ * @rqstp: request structure into which to receive an RPC Call
+ *
+ * Returns:
+ * The positive number of bytes in the RPC Call message,
+ * %0 if there were no Calls ready to return,
+ * %-EINVAL if the Read chunk data is too large,
+ * %-ENOMEM if rdma_rw context pool was exhausted,
+ * %-ENOTCONN if posting failed (connection is lost),
+ * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
+ * when there are no remaining ctxt's to process.
+ *
+ * The next ctxt is removed from the "receive" lists.
+ *
+ * - If the ctxt completes a Read, then finish assembling the Call
+ * message and return the number of bytes in the message.
+ *
+ * - If the ctxt completes a Receive, then construct the Call
+ * message from the contents of the Receive buffer.
+ *
+ * - If there are no Read chunks in this message, then finish
+ * assembling the Call message and return the number of bytes
+ * in the message.
+ *
+ * - If there are Read chunks in this message, post Read WRs to
+ * pull that payload and return 0.
*/
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
@@ -845,11 +511,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svcxprt_rdma *rdma_xprt =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_op_ctxt *ctxt;
- struct rpcrdma_msg *rmsgp;
+ __be32 *p;
int ret;

- dprintk("svcrdma: rqstp=%p\n", rqstp);
-
spin_lock(&rdma_xprt->sc_rq_dto_lock);
if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q,
@@ -870,7 +534,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
}
spin_unlock(&rdma_xprt->sc_rq_dto_lock);

- dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
+ dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n",
ctxt, rdma_xprt, rqstp);
atomic_inc(&rdma_stat_recv);

@@ -878,7 +542,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

/* Decode the RDMA header. */
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
if (ret < 0)
goto out_err;
@@ -886,9 +550,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto out_drop;
rqstp->rq_xprt_hlen = ret;

- if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
- ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
- &rmsgp->rm_xid,
+ if (svc_rdma_is_backchannel_reply(xprt, p)) {
+ ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
&rqstp->rq_arg);
svc_rdma_put_context(ctxt, 0);
if (ret)
@@ -896,16 +559,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
return ret;
}

- /* Read read-list data. */
- ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
- if (ret > 0) {
- /* read-list posted, defer until data received from client. */
- goto defer;
- } else if (ret < 0) {
- /* Post of read-list failed, free context. */
- svc_rdma_put_context(ctxt, 1);
- return 0;
- }
+ p += rpcrdma_fixed_maxsz;
+ if (*p != xdr_zero)
+ goto out_readchunk;

complete:
ret = rqstp->rq_arg.head[0].iov_len
@@ -921,13 +577,22 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_xprt_copy_addrs(rqstp, xprt);
return ret;

+out_readchunk:
+ ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
+ if (ret < 0)
+ goto out_postfail;
+ return 0;
+
out_err:
- svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
+ svc_rdma_send_error(rdma_xprt, p, ret);
svc_rdma_put_context(ctxt, 0);
return 0;

-defer:
- return 0;
+out_postfail:
+ if (ret == -EINVAL)
+ svc_rdma_send_error(rdma_xprt, p, ret);
+ svc_rdma_put_context(ctxt, 1);
+ return ret;

out_drop:
svc_rdma_put_context(ctxt, 1);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a9d9cb1..72d2dcd 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -908,8 +908,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* capabilities of this particular device */
newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
(size_t)RPCSVC_MAXPAGES);
- newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
- RPCSVC_MAXPAGES);
newxprt->sc_max_req_size = svcrdma_max_req_size;
newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
svcrdma_max_requests);
@@ -998,12 +996,10 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* NB: iWARP requires remote write access for the data sink
* of an RDMA_READ. IB does not.
*/
- newxprt->sc_reader = rdma_read_chunk_lcl;
if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
newxprt->sc_frmr_pg_list_len =
dev->attrs.max_fast_reg_page_list_len;
newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
- newxprt->sc_reader = rdma_read_chunk_frmr;
} else
newxprt->sc_snd_w_inv = false;

@@ -1056,7 +1052,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
dprintk(" max_sge : %d\n", newxprt->sc_max_sge);
- dprintk(" max_sge_rd : %d\n", newxprt->sc_max_sge_rd);
dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
dprintk(" ord : %d\n", newxprt->sc_ord);
@@ -1117,12 +1112,6 @@ static void __svc_rdma_free(struct work_struct *work)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
kref_read(&xprt->xpt_ref));

- /*
- * Destroy queued, but not processed read completions. Note
- * that this cleanup has to be done before destroying the
- * cm_id because the device ptr is needed to unmap the dma in
- * svc_rdma_put_context.
- */
while (!list_empty(&rdma->sc_read_complete_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_read_complete_q,
@@ -1130,8 +1119,6 @@ static void __svc_rdma_free(struct work_struct *work)
list_del(&ctxt->list);
svc_rdma_put_context(ctxt, 1);
}
-
- /* Destroy queued, but not processed recv completions */
while (!list_empty(&rdma->sc_rq_dto_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_rq_dto_q,


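For readers skimming the archive, the receive flow that the new header
comment describes boils down to roughly the following. This is an
illustrative sketch only: dequeue() stands in for the
list_first_entry()/list_del() pattern under sc_rq_dto_lock, and header
decoding, the backchannel check, and error handling are all elided.

/* Sketch of the svc_rdma_recvfrom() flow after this patch.
 * dequeue() is a placeholder, not a real kernel helper.
 */
int svc_rdma_recvfrom_sketch(struct svc_rqst *rqstp,
			     struct svcxprt_rdma *rdma_xprt)
{
	struct svc_rdma_op_ctxt *ctxt;
	__be32 *p;

	/* A completed set of RDMA Reads: finish assembling the Call. */
	ctxt = dequeue(&rdma_xprt->sc_read_complete_q);
	if (ctxt) {
		rdma_read_complete(rqstp, ctxt);
		goto complete;
	}

	/* Otherwise take a completed Receive and build rq_arg. */
	ctxt = dequeue(&rdma_xprt->sc_rq_dto_q);
	if (!ctxt)
		return 0;	/* no Calls ready */
	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

	/* A Read list means the payload is still on the client:
	 * post Read WRs now and finish on a later call.
	 */
	p = (__be32 *)rqstp->rq_arg.head[0].iov_base + rpcrdma_fixed_maxsz;
	if (*p != xdr_zero) {
		svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
		return 0;	/* Call is not yet complete */
	}

complete:
	/* Length of the complete RPC Call message */
	return rqstp->rq_arg.head[0].iov_len +
	       rqstp->rq_arg.page_len +
	       rqstp->rq_arg.tail[0].iov_len;
}
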
2017-06-23 21:18:51

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 13/18] svcrdma: Remove unused Read completion handlers

Clean up:

The generic RDMA R/W API conversion of svc_rdma_recvfrom replaced
the Register, Read, and Invalidate completion handlers. Remove the
old ones, which are no longer used.

These handlers shared some helper code with svc_rdma_wc_send. Fold
the wc_common helper back into the one remaining completion handler.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 4 -
net/sunrpc/xprtrdma/svc_rdma_transport.c | 93 +++---------------------------
2 files changed, 10 insertions(+), 87 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index b1ba19b..06d58a3 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -77,17 +77,15 @@ enum {
*/
struct svc_rdma_op_ctxt {
struct list_head list;
- struct svc_rdma_op_ctxt *read_hdr;
struct svc_rdma_fastreg_mr *frmr;
- int hdr_count;
struct xdr_buf arg;
struct ib_cqe cqe;
u32 byte_len;
struct svcxprt_rdma *xprt;
- unsigned long flags;
enum dma_data_direction direction;
int count;
unsigned int mapped_sges;
+ int hdr_count;
struct ib_send_wr send_wr;
struct ib_sge sge[1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
struct page *pages[RPCSVC_MAXPAGES];
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 72d2dcd..c915cba 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -346,36 +346,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
svc_xprt_put(&xprt->sc_xprt);
}

-static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
- struct ib_wc *wc,
- const char *opname)
-{
- if (wc->status != IB_WC_SUCCESS)
- goto err;
-
-out:
- atomic_inc(&xprt->sc_sq_avail);
- wake_up(&xprt->sc_send_wait);
- return;
-
-err:
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: %s: %s (%u/0x%x)\n",
- opname, ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- goto out;
-}
-
-static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
- const char *opname)
-{
- struct svcxprt_rdma *xprt = cq->cq_context;
-
- svc_rdma_send_wc_common(xprt, wc, opname);
- svc_xprt_put(&xprt->sc_xprt);
-}
-
/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue
@@ -384,73 +354,28 @@ static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
*/
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_op_ctxt *ctxt;
-
- svc_rdma_send_wc_common_put(cq, wc, "send");
-
- ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
-}
-
-/**
- * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
-{
- svc_rdma_send_wc_common_put(cq, wc, "fastreg");
-}
-
-/**
- * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
-{
struct svcxprt_rdma *xprt = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_op_ctxt *ctxt;

- svc_rdma_send_wc_common(xprt, wc, "read");
+ atomic_inc(&xprt->sc_sq_avail);
+ wake_up(&xprt->sc_send_wait);

ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_frmr(xprt, ctxt->frmr);
-
- if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
- struct svc_rdma_op_ctxt *read_hdr;
-
- read_hdr = ctxt->read_hdr;
- spin_lock(&xprt->sc_rq_dto_lock);
- list_add_tail(&read_hdr->list,
- &xprt->sc_read_complete_q);
- spin_unlock(&xprt->sc_rq_dto_lock);
+ svc_rdma_put_context(ctxt, 1);

- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- svc_xprt_enqueue(&xprt->sc_xprt);
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}

- svc_rdma_put_context(ctxt, 0);
svc_xprt_put(&xprt->sc_xprt);
}

-/**
- * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
-{
- svc_rdma_send_wc_common_put(cq, wc, "localInv");
-}
-
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
int listener)
{


2017-06-23 21:18:59

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 14/18] svcrdma: Remove frmr cache

Clean up: Now that the svc_rdma_recvfrom path uses the rdma_rw API,
the details of Read sink buffer registration are dealt with by the
kernel's RDMA core. This cache is no longer used, and can be
removed.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 18 ------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 86 ------------------------------
2 files changed, 104 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 06d58a3..fd7775f 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -77,7 +77,6 @@ enum {
*/
struct svc_rdma_op_ctxt {
struct list_head list;
- struct svc_rdma_fastreg_mr *frmr;
struct xdr_buf arg;
struct ib_cqe cqe;
u32 byte_len;
@@ -91,17 +90,6 @@ struct svc_rdma_op_ctxt {
struct page *pages[RPCSVC_MAXPAGES];
};

-struct svc_rdma_fastreg_mr {
- struct ib_mr *mr;
- struct scatterlist *sg;
- int sg_nents;
- unsigned long access_flags;
- enum dma_data_direction direction;
- struct list_head frmr_list;
-};
-
-#define RDMACTXT_F_LAST_CTXT 2
-
#define SVCRDMA_DEVCAP_FAST_REG 1 /* fast mr registration */
#define SVCRDMA_DEVCAP_READ_W_INV 2 /* read w/ invalidate */

@@ -136,9 +124,6 @@ struct svcxprt_rdma {
struct ib_cq *sc_rq_cq;
struct ib_cq *sc_sq_cq;
u32 sc_dev_caps; /* distilled device caps */
- unsigned int sc_frmr_pg_list_len;
- struct list_head sc_frmr_q;
- spinlock_t sc_frmr_q_lock;

spinlock_t sc_lock; /* transport lock */

@@ -210,9 +195,6 @@ extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
-extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *);
-extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
- struct svc_rdma_fastreg_mr *);
extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *);
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index c915cba..8864105 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -202,7 +202,6 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
out:
ctxt->count = 0;
ctxt->mapped_sges = 0;
- ctxt->frmr = NULL;
return ctxt;

out_empty:
@@ -387,14 +386,12 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
- INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);

spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
- spin_lock_init(&cma_xprt->sc_frmr_q_lock);
spin_lock_init(&cma_xprt->sc_ctxt_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);

@@ -705,86 +702,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
return ERR_PTR(ret);
}

-static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
-{
- struct ib_mr *mr;
- struct scatterlist *sg;
- struct svc_rdma_fastreg_mr *frmr;
- u32 num_sg;
-
- frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
- if (!frmr)
- goto err;
-
- num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
- mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
- if (IS_ERR(mr))
- goto err_free_frmr;
-
- sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
- if (!sg)
- goto err_free_mr;
-
- sg_init_table(sg, RPCSVC_MAXPAGES);
-
- frmr->mr = mr;
- frmr->sg = sg;
- INIT_LIST_HEAD(&frmr->frmr_list);
- return frmr;
-
- err_free_mr:
- ib_dereg_mr(mr);
- err_free_frmr:
- kfree(frmr);
- err:
- return ERR_PTR(-ENOMEM);
-}
-
-static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
-{
- struct svc_rdma_fastreg_mr *frmr;
-
- while (!list_empty(&xprt->sc_frmr_q)) {
- frmr = list_entry(xprt->sc_frmr_q.next,
- struct svc_rdma_fastreg_mr, frmr_list);
- list_del_init(&frmr->frmr_list);
- kfree(frmr->sg);
- ib_dereg_mr(frmr->mr);
- kfree(frmr);
- }
-}
-
-struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
-{
- struct svc_rdma_fastreg_mr *frmr = NULL;
-
- spin_lock(&rdma->sc_frmr_q_lock);
- if (!list_empty(&rdma->sc_frmr_q)) {
- frmr = list_entry(rdma->sc_frmr_q.next,
- struct svc_rdma_fastreg_mr, frmr_list);
- list_del_init(&frmr->frmr_list);
- frmr->sg_nents = 0;
- }
- spin_unlock(&rdma->sc_frmr_q_lock);
- if (frmr)
- return frmr;
-
- return rdma_alloc_frmr(rdma);
-}
-
-void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
- struct svc_rdma_fastreg_mr *frmr)
-{
- if (frmr) {
- ib_dma_unmap_sg(rdma->sc_cm_id->device,
- frmr->sg, frmr->sg_nents, frmr->direction);
- spin_lock(&rdma->sc_frmr_q_lock);
- WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
- list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
- spin_unlock(&rdma->sc_frmr_q_lock);
- }
-}
-
/*
* This is the xpo_recvfrom function for listening endpoints. Its
* purpose is to accept incoming connections. The CMA callback handler
@@ -922,8 +839,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* of an RDMA_READ. IB does not.
*/
if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
- newxprt->sc_frmr_pg_list_len =
- dev->attrs.max_fast_reg_page_list_len;
newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
} else
newxprt->sc_snd_w_inv = false;
@@ -1063,7 +978,6 @@ static void __svc_rdma_free(struct work_struct *work)
xprt->xpt_bc_xprt = NULL;
}

- rdma_dealloc_frmr_q(rdma);
svc_rdma_destroy_rw_ctxts(rdma);
svc_rdma_destroy_ctxts(rdma);

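For background on this clean-up, the rdma_rw core API that replaces
the frmr cache follows an init/post/destroy pattern, choosing plain
lkey SGEs or FRWR internally based on the device. Below is a minimal
sketch of one RDMA Read under that API -- not code from this series --
assuming the caller already has a connected QP, a populated
scatterlist, and the chunk's rkey and offset parsed from the
transport header.

/* Sketch: drive one RDMA Read with the rdma_rw core API.
 * rdma_rw_ctx_init() returns the number of send queue entries
 * the operation will consume, or a negative errno.
 */
static int read_one_segment(struct rdma_rw_ctx *ctx, struct ib_qp *qp,
			    u8 port_num, struct scatterlist *sgl,
			    u32 sg_cnt, u64 remote_addr, u32 rkey,
			    struct ib_cqe *cqe)
{
	int ret;

	ret = rdma_rw_ctx_init(ctx, qp, port_num, sgl, sg_cnt, 0,
			       remote_addr, rkey, DMA_FROM_DEVICE);
	if (ret < 0)
		return ret;

	/* Posts the Read WRs (chained to a REG WR when the core
	 * decided registration was needed); the handler attached
	 * to @cqe runs when the last WR completes.
	 */
	ret = rdma_rw_ctx_post(ctx, qp, port_num, cqe, NULL);
	if (ret)
		rdma_rw_ctx_destroy(ctx, qp, port_num, sgl, sg_cnt,
				    DMA_FROM_DEVICE);
	return ret;
}

After the Reads complete, the caller releases the context with the
same rdma_rw_ctx_destroy() call that appears in svc_rdma_cc_release()
later in this series.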


2017-06-23 21:19:16

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 15/18] svcrdma: Clean-up svc_rdma_unmap_dma

There's no longer a need to compare each SGE's lkey with the PD's
local_dma_lkey. Now that FRWR is gone, all DMA mappings are for
pages that were registered with this key.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_transport.c | 19 +++++--------------
1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 8864105..75bd11f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -225,22 +225,13 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
struct svcxprt_rdma *xprt = ctxt->xprt;
struct ib_device *device = xprt->sc_cm_id->device;
- u32 lkey = xprt->sc_pd->local_dma_lkey;
unsigned int i;

- for (i = 0; i < ctxt->mapped_sges; i++) {
- /*
- * Unmap the DMA addr in the SGE if the lkey matches
- * the local_dma_lkey, otherwise, ignore it since it is
- * an FRMR lkey and will be unmapped later when the
- * last WR that uses it completes.
- */
- if (ctxt->sge[i].lkey == lkey)
- ib_dma_unmap_page(device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
- }
+ for (i = 0; i < ctxt->mapped_sges; i++)
+ ib_dma_unmap_page(device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
ctxt->mapped_sges = 0;
}



2017-06-23 21:19:15

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 16/18] svcrdma: Clean up after converting svc_rdma_recvfrom to rdma_rw API

Clean up: Registration mode details are now handled by the rdma_rw
API, and thus can be removed from svcrdma.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 4 ---
net/sunrpc/xprtrdma/svc_rdma_transport.c | 39 +++---------------------------
2 files changed, 4 insertions(+), 39 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index fd7775f..995c6fe 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -90,9 +90,6 @@ struct svc_rdma_op_ctxt {
struct page *pages[RPCSVC_MAXPAGES];
};

-#define SVCRDMA_DEVCAP_FAST_REG 1 /* fast mr registration */
-#define SVCRDMA_DEVCAP_READ_W_INV 2 /* read w/ invalidate */
-
struct svcxprt_rdma {
struct svc_xprt sc_xprt; /* SVC transport structure */
struct rdma_cm_id *sc_cm_id; /* RDMA connection id */
@@ -123,7 +120,6 @@ struct svcxprt_rdma {
struct ib_qp *sc_qp;
struct ib_cq *sc_rq_cq;
struct ib_cq *sc_sq_cq;
- u32 sc_dev_caps; /* distilled device caps */

spinlock_t sc_lock; /* transport lock */

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 75bd11f..e660d49 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -783,7 +783,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
memset(&qp_attr, 0, sizeof qp_attr);
qp_attr.event_handler = qp_event_handler;
qp_attr.qp_context = &newxprt->sc_xprt;
- qp_attr.port_num = newxprt->sc_cm_id->port_num;
+ qp_attr.port_num = newxprt->sc_port_num;
qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
@@ -807,43 +807,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
}
newxprt->sc_qp = newxprt->sc_cm_id->qp;

- /*
- * Use the most secure set of MR resources based on the
- * transport type and available memory management features in
- * the device. Here's the table implemented below:
- *
- * Fast Global DMA Remote WR
- * Reg LKEY MR Access
- * Sup'd Sup'd Needed Needed
- *
- * IWARP N N Y Y
- * N Y Y Y
- * Y N Y N
- * Y Y N -
- *
- * IB N N Y N
- * N Y N -
- * Y N Y N
- * Y Y N -
- *
- * NB: iWARP requires remote write access for the data sink
- * of an RDMA_READ. IB does not.
- */
- if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
- newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
- } else
+ if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
newxprt->sc_snd_w_inv = false;
-
- /*
- * Determine if a DMA MR is required and if so, what privs are required
- */
- if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
- !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
+ if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
+ !rdma_ib_or_roce(dev, newxprt->sc_port_num))
goto errout;

- if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
- newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
-
/* Post receive buffers */
for (i = 0; i < newxprt->sc_max_requests; i++) {
ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);


2017-06-23 21:19:28

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 17/18] svcrdma: use offset_in_page() macro

Clean up: Use offset_in_page() macro instead of open-coding.

Reported-by: Geliang Tang <[email protected]>
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_rw.c | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 9d7a151..9859736 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -372,8 +372,9 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
struct scatterlist *sg;
struct page **page;

- page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
- page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+ page_off = info->wi_next_off + xdr->page_base;
+ page_no = page_off >> PAGE_SHIFT;
+ page_off = offset_in_page(page_off);
page = xdr->pages + page_no;
info->wi_next_off += remaining;
sg = ctxt->rw_sg_table.sgl;

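For reference, offset_in_page() is defined in include/linux/mm.h as
exactly the masking the old code open-coded:

#define offset_in_page(p)	((unsigned long)(p) & ~PAGE_MASK)

The patch also computes the combined offset once and derives both
page_no and the in-page offset from it, instead of repeating the
addition in two expressions.
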

2017-06-23 21:19:31

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 18/18] svcrdma: Remove svc_rdma_chunk_ctxt::cc_dir field

Clean up: No need to save the I/O direction. The functions that
release svc_rdma_chunk_ctxt already know what direction to use.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_rw.c | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 9859736..933f79b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -116,22 +116,20 @@ struct svc_rdma_chunk_ctxt {
struct svcxprt_rdma *cc_rdma;
struct list_head cc_rwctxts;
int cc_sqecount;
- enum dma_data_direction cc_dir;
};

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
- struct svc_rdma_chunk_ctxt *cc,
- enum dma_data_direction dir)
+ struct svc_rdma_chunk_ctxt *cc)
{
cc->cc_rdma = rdma;
svc_xprt_get(&rdma->sc_xprt);

INIT_LIST_HEAD(&cc->cc_rwctxts);
cc->cc_sqecount = 0;
- cc->cc_dir = dir;
}

-static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
+ enum dma_data_direction dir)
{
struct svcxprt_rdma *rdma = cc->cc_rdma;
struct svc_rdma_rw_ctxt *ctxt;
@@ -141,7 +139,7 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)

rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
rdma->sc_port_num, ctxt->rw_sg_table.sgl,
- ctxt->rw_nents, cc->cc_dir);
+ ctxt->rw_nents, dir);
svc_rdma_put_rw_ctxt(rdma, ctxt);
}
svc_xprt_put(&rdma->sc_xprt);
@@ -179,14 +177,14 @@ struct svc_rdma_write_info {
info->wi_seg_no = 0;
info->wi_nsegs = be32_to_cpup(++chunk);
info->wi_segs = ++chunk;
- svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+ svc_rdma_cc_init(rdma, &info->wi_cc);
info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
- svc_rdma_cc_release(&info->wi_cc);
+ svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
kfree(info);
}

@@ -241,14 +239,14 @@ struct svc_rdma_read_info {
if (!info)
return info;

- svc_rdma_cc_init(rdma, &info->ri_cc, DMA_FROM_DEVICE);
+ svc_rdma_cc_init(rdma, &info->ri_cc);
info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
- svc_rdma_cc_release(&info->ri_cc);
+ svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
kfree(info);
}



2017-06-29 20:20:04

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH v3 09/18] sunrpc: Allocate one more page per svc_rqst

I'm confused by this one:

On Fri, Jun 23, 2017 at 05:18:16PM -0400, Chuck Lever wrote:
> svcrdma needs 259 pages for 1MB NFSv4.0 READ requests:
>
> - 1 page for the transport header and head iovec
> - 256 pages for the data payload
> - 1 page for the trailing GETATTR request (since NFSD XDR decoding
> does not look for a tail iovec, the GETATTR is stuck at the end
> of the rqstp->rq_arg.pages list)
> - 1 page for building the reply xdr_buf
>
> Note that RPCSVC_MAXPAGES is defined as:
>
> ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE + 2 + 1)
>
> I don't understand why the "+PAGE_SIZE-1" is in there, because
> division will always round the result down to
>
> (RPCSVC_MAXPAYLOAD / PAGE_SIZE + 2 + 1)

I think you're assuming that RPCSVC_MAXPAYLOAD is a multiple of
PAGE_SIZE. Maybe that's true in all cases (I haven't checked), but it
seems harmless to handle the case where it's not.

> Let's remove the "-1" to get 260 pages maximum.

Why? (I'm not necessarily opposed, I just missed the explanation.)

--b.

> Then svc_alloc_args() needs to ask for pages according to this
> same formula, otherwise it will never allocate enough.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc.h | 3 +--
> net/sunrpc/svc_xprt.c | 8 +++++---
> 2 files changed, 6 insertions(+), 5 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
> index 11cef5a..35c274f 100644
> --- a/include/linux/sunrpc/svc.h
> +++ b/include/linux/sunrpc/svc.h
> @@ -176,8 +176,7 @@ static inline void svc_get(struct svc_serv *serv)
> * We using ->sendfile to return read data, we might need one extra page
> * if the request is not page-aligned. So add another '1'.
> */
> -#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE \
> - + 2 + 1)
> +#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD + PAGE_SIZE) / PAGE_SIZE + 2 + 1)
>
> static inline u32 svc_getnl(struct kvec *iov)
> {
> diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
> index 7bfe1fb..9bd484d 100644
> --- a/net/sunrpc/svc_xprt.c
> +++ b/net/sunrpc/svc_xprt.c
> @@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
> int i;
>
> /* now allocate needed pages. If we get a failure, sleep briefly */
> - pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
> - WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES);
> - if (pages >= RPCSVC_MAXPAGES)
> + pages = (serv->sv_max_mesg + (2 * PAGE_SIZE)) >> PAGE_SHIFT;
> + if (pages >= RPCSVC_MAXPAGES) {
> + pr_warn_once("svc: warning: pages=%u >= RPCSVC_MAXPAGES=%lu\n",
> + pages, RPCSVC_MAXPAGES);
> /* use as many pages as possible */
> pages = RPCSVC_MAXPAGES - 1;
> + }
> for (i = 0; i < pages ; i++)
> while (rqstp->rq_pages[i] == NULL) {
> struct page *p = alloc_page(GFP_KERNEL);

2017-06-29 20:44:47

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 09/18] sunrpc: Allocate one more page per svc_rqst

Thanks for the review!

> On Jun 29, 2017, at 4:20 PM, J. Bruce Fields <[email protected]> wrote:
>
> I'm confused by this one:
>
> On Fri, Jun 23, 2017 at 05:18:16PM -0400, Chuck Lever wrote:
>> svcrdma needs 259 pages for 1MB NFSv4.0 READ requests:
>>
>> - 1 page for the transport header and head iovec
>> - 256 pages for the data payload
>> - 1 page for the trailing GETATTR request (since NFSD XDR decoding
>> does not look for a tail iovec, the GETATTR is stuck at the end
>> of the rqstp->rq_arg.pages list)
>> - 1 page for building the reply xdr_buf
>>
>> Note that RPCSVC_MAXPAGES is defined as:
>>
>> ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE + 2 + 1)
>>
>> I don't understand why the "+PAGE_SIZE-1" is in there, because
>> division will always round the result down to
>>
>> (RPCSVC_MAXPAYLOAD / PAGE_SIZE + 2 + 1)
>
> I think you're assuming that RPCSVC_MAXPAYLOAD is a multiple of
> PAGE_SIZE. Maybe that's true in all cases (I haven't checked), but it
> seems harmless to handle the case where it's not.

include/linux/sunrpc/svc.h says:

126 * Maximum payload size supported by a kernel RPC server.
127 * This is use to determine the max number of pages nfsd is
128 * willing to return in a single READ operation.
129 *
130 * These happen to all be powers of 2, which is not strictly
131 * necessary but helps enforce the real limitation, which is
132 * that they should be multiples of PAGE_SIZE.

Note that the same calculation in svc_alloc_arg() assumes
that serv->sv_max_mesg is a multiple of PAGE_SIZE:

pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;

It's confusing that the value of RPCSVC_MAXPAGES is not computed
the same way as "pages" is.


>> Let's remove the "-1" to get 260 pages maximum.
>
> Why? (I'm not necessarily opposed, I just missed the explanation.)

The top of the patch description explains why I need 259 pages.

RPCSVC_MAXPAGES has to be 260 in order for this check in svc_alloc_arg()

if (pages >= RPCSVC_MAXPAGES) {
/* use as many pages as possible */
pages = RPCSVC_MAXPAGES - 1;

to allow 260 elements of the array to be filled in.

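To make the arithmetic concrete, assume RPCSVC_MAXPAYLOAD is 1MB
(1048576) and PAGE_SIZE is 4KB (4096):

  old: (1048576 + 4096 - 1) / 4096 + 2 + 1 = 256 + 2 + 1 = 259
  new: (1048576 + 4096)     / 4096 + 2 + 1 = 257 + 2 + 1 = 260

Because the payload is already a multiple of PAGE_SIZE, the "-1"
forfeits the round-up, and dropping it buys exactly the one extra
page svcrdma needs.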

> --b.
>
>> Then svc_alloc_args() needs to ask for pages according to this
>> same formula, otherwise it will never allocate enough.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/svc.h | 3 +--
>> net/sunrpc/svc_xprt.c | 8 +++++---
>> 2 files changed, 6 insertions(+), 5 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
>> index 11cef5a..35c274f 100644
>> --- a/include/linux/sunrpc/svc.h
>> +++ b/include/linux/sunrpc/svc.h
>> @@ -176,8 +176,7 @@ static inline void svc_get(struct svc_serv *serv)
>> * We using ->sendfile to return read data, we might need one extra page
>> * if the request is not page-aligned. So add another '1'.
>> */
>> -#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE \
>> - + 2 + 1)
>> +#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD + PAGE_SIZE) / PAGE_SIZE + 2 + 1)
>>
>> static inline u32 svc_getnl(struct kvec *iov)
>> {
>> diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
>> index 7bfe1fb..9bd484d 100644
>> --- a/net/sunrpc/svc_xprt.c
>> +++ b/net/sunrpc/svc_xprt.c
>> @@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
>> int i;
>>
>> /* now allocate needed pages. If we get a failure, sleep briefly */
>> - pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
>> - WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES);
>> - if (pages >= RPCSVC_MAXPAGES)
>> + pages = (serv->sv_max_mesg + (2 * PAGE_SIZE)) >> PAGE_SHIFT;
>> + if (pages >= RPCSVC_MAXPAGES) {
>> + pr_warn_once("svc: warning: pages=%u >= RPCSVC_MAXPAGES=%lu\n",
>> + pages, RPCSVC_MAXPAGES);
>> /* use as many pages as possible */
>> pages = RPCSVC_MAXPAGES - 1;
>> + }
>> for (i = 0; i < pages ; i++)
>> while (rqstp->rq_pages[i] == NULL) {
>> struct page *p = alloc_page(GFP_KERNEL);

--
Chuck Lever




2017-06-30 00:23:11

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 09/18] sunrpc: Allocate one more page per svc_rqst


> On Jun 29, 2017, at 4:44 PM, Chuck Lever <[email protected]> wrote:
>
> Thanks for the review!
>
>> On Jun 29, 2017, at 4:20 PM, J. Bruce Fields <[email protected]> wrote:
>>
>> I'm confused by this one:
>>
>> On Fri, Jun 23, 2017 at 05:18:16PM -0400, Chuck Lever wrote:
>>> svcrdma needs 259 pages for 1MB NFSv4.0 READ requests:
>>>
>>> - 1 page for the transport header and head iovec
>>> - 256 pages for the data payload
>>> - 1 page for the trailing GETATTR request (since NFSD XDR decoding
>>> does not look for a tail iovec, the GETATTR is stuck at the end
>>> of the rqstp->rq_arg.pages list)
>>> - 1 page for building the reply xdr_buf
>>>
>>> Note that RPCSVC_MAXPAGES is defined as:
>>>
>>> ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE + 2 + 1)
>>>
>>> I don't understand why the "+PAGE_SIZE-1" is in there, because
>>> division will always round the result down to
>>>
>>> (RPCSVC_MAXPAYLOAD / PAGE_SIZE + 2 + 1)
>>
>> I think you're assuming that RPCSVC_MAXPAYLOAD is a multiple of
>> PAGE_SIZE. Maybe that's true in all cases (I haven't checked), but it
>> seems harmless to handle the case where it's not.
>
> include/linux/sunrpc/svc.h says:
>
> 126 * Maximum payload size supported by a kernel RPC server.
> 127 * This is use to determine the max number of pages nfsd is
> 128 * willing to return in a single READ operation.
> 129 *
> 130 * These happen to all be powers of 2, which is not strictly
> 131 * necessary but helps enforce the real limitation, which is
> 132 * that they should be multiples of PAGE_SIZE.
>
> Note that the same calculation in svc_alloc_args() assumes
> that serv->sv_max_mesg is a multiple of PAGE_SIZE:
>
> pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
>
> It's confusing that the value of RPCSVC_MAXPAGES is not computed
> the same way as "pages" is.
>
>
>>> Let's remove the "-1" to get 260 pages maximum.
>>
>> Why? (I'm not necessarily opposed, I just missed the explanation.)
>
> The top of the patch description explains why I need 259 pages.
>
> RPCSVC_MAXPAGES has to be 260 in order for this check in svc_alloc_arg()
>
> if (pages >= RPCSVC_MAXPAGES) {
> /* use as many pages as possible */
> pages = RPCSVC_MAXPAGES - 1;
>
> to allow 260 elements of the array to be filled in.

OK, I see why I was confused about this. I expected a variable
called "pages" to be the count of pages. But it actually contains
the maximum index of the rq_pages array, which is one less than
the number of array elements. However...

It looks like the rq_pages array already has 259 elements, but
the last element always contains NULL.

682 rqstp->rq_page_end = &rqstp->rq_pages[i];
683 rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */

So we actually have just 258 pages in this 259-entry array. That's
one short of the 259 pages needed by svcrdma, and one less than
the value of RPCSVC_MAXPAGES.

(Note that "i++" isn't necessary, as "i" is not accessed again in
this function.)

I'm sure there's a better way to fix this. For instance, since
the last element of rq_pages is always NULL, perhaps the structure
definition could be

struct page *rq_pages[RPCSVC_MAXPAGES + 1];

And the logic in svc_alloc_arg() could be adjusted accordingly
to fill in at most RPCSVC_MAXPAGES pages.

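For illustration only, the adjustment might look something like the
following untested sketch (not a posted patch):

/* include/linux/sunrpc/svc.h: reserve a dedicated slot for the
 * NULL terminator instead of taking it out of the data pages.
 */
struct page *rq_pages[RPCSVC_MAXPAGES + 1];

/* net/sunrpc/svc_xprt.c, svc_alloc_arg(): fill in at most
 * RPCSVC_MAXPAGES real pages; the terminator lands in the
 * extra slot.
 */
pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT;
if (pages > RPCSVC_MAXPAGES)
	pages = RPCSVC_MAXPAGES;
/* ... allocate rq_pages[0 .. pages-1] as before ... */
rqstp->rq_page_end = &rqstp->rq_pages[pages];
rqstp->rq_pages[pages] = NULL;	/* might be seen in nfs_read_actor */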

>> --b.
>>
>>> Then svc_alloc_args() needs to ask for pages according to this
>>> same formula, otherwise it will never allocate enough.
>>>
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> include/linux/sunrpc/svc.h | 3 +--
>>> net/sunrpc/svc_xprt.c | 8 +++++---
>>> 2 files changed, 6 insertions(+), 5 deletions(-)
>>>
>>> diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
>>> index 11cef5a..35c274f 100644
>>> --- a/include/linux/sunrpc/svc.h
>>> +++ b/include/linux/sunrpc/svc.h
>>> @@ -176,8 +176,7 @@ static inline void svc_get(struct svc_serv *serv)
>>> * We using ->sendfile to return read data, we might need one extra page
>>> * if the request is not page-aligned. So add another '1'.
>>> */
>>> -#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD+PAGE_SIZE-1)/PAGE_SIZE \
>>> - + 2 + 1)
>>> +#define RPCSVC_MAXPAGES ((RPCSVC_MAXPAYLOAD + PAGE_SIZE) / PAGE_SIZE + 2 + 1)
>>>
>>> static inline u32 svc_getnl(struct kvec *iov)
>>> {
>>> diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
>>> index 7bfe1fb..9bd484d 100644
>>> --- a/net/sunrpc/svc_xprt.c
>>> +++ b/net/sunrpc/svc_xprt.c
>>> @@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
>>> int i;
>>>
>>> /* now allocate needed pages. If we get a failure, sleep briefly */
>>> - pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
>>> - WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES);
>>> - if (pages >= RPCSVC_MAXPAGES)
>>> + pages = (serv->sv_max_mesg + (2 * PAGE_SIZE)) >> PAGE_SHIFT;
>>> + if (pages >= RPCSVC_MAXPAGES) {
>>> + pr_warn_once("svc: warning: pages=%u >= RPCSVC_MAXPAGES=%lu\n",
>>> + pages, RPCSVC_MAXPAGES);
>>> /* use as many pages as possible */
>>> pages = RPCSVC_MAXPAGES - 1;
>>> + }
>>> for (i = 0; i < pages ; i++)
>>> while (rqstp->rq_pages[i] == NULL) {
>>> struct page *p = alloc_page(GFP_KERNEL);
>
> --
> Chuck Lever
>
>
>

--
Chuck Lever