2016-02-18 17:43:47

by Chuck Lever III

Subject: [PATCH v3 00/10] NFS/RDMA server patches for v4.6

Two major changes: Finish server-side support for RDMA_ERROR type
messages, and convert the server's completion handlers to the new
core CQ API.

Also available in the "nfsd-rdma-for-4.6" topic branch of this git
repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git

Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-for-4.6


Changes since v2:
- Rebased on v4.5-rc4
- checkpatch.pl passes
- Fix iWARP RDMA Read path regression
- Reduce NFSv4.1 CB message size maximum in CREATE_SESSION
- Another pass at fixing Write chunk XDR padding
- Various clean ups


Changes since v1:
- Rebased on v4.5-rc3
- Patch 02/10 (pre-allocate frmrs) dropped
- Patch 04/10 replaced with more surgical patch
- Refined reporting of flushed completions
- Function names of completion methods changed to match client's
new completion function naming convention

---

Chuck Lever (10):
svcrdma: Do not write xdr_buf::tail in a Write chunk
nfsd: Lower NFSv4.1 callback message size limit
svcrdma: svc_rdma_post_recv() should close connection on error
rpcrdma: Add RPCRDMA_HDRLEN_ERR
svcrdma: Make RDMA_ERROR messages work
svcrdma: Use correct XID in error replies
svcrdma: Hook up the logic to return ERR_CHUNK
svcrdma: Remove close_out exit path
svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
svcrdma: Use new CQ API for RPC-over-RDMA server send CQs


fs/nfsd/nfs4state.c | 24 +-
include/linux/sunrpc/auth.h | 7
include/linux/sunrpc/rpc_rdma.h | 1
include/linux/sunrpc/svc_rdma.h | 18 +
net/sunrpc/auth_null.c | 4
net/sunrpc/auth_unix.c | 6
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 15 -
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 64 +++-
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 60 ++--
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 88 +++++-
net/sunrpc/xprtrdma/svc_rdma_transport.c | 445 +++++++++-------------------
11 files changed, 333 insertions(+), 399 deletions(-)

--
Chuck Lever


2016-02-18 17:43:55

by Chuck Lever III

Subject: [PATCH v3 01/10] svcrdma: Do not write xdr_buf::tail in a Write chunk

When the Linux NFS server writes an odd-length data item into a
Write chunk, it finishes with XDR pad bytes. If the data item is
smaller than the Write chunk, the pad bytes are written at the end
of the data item, but still inside the chunk (i.e., in the
application's buffer). Since this is direct data placement, that
exposes the pad bytes.

XDR pad bytes are inserted in order to preserve the XDR alignment
of the next XDR data item in an XDR stream. But Write chunks do not
appear in the payload XDR stream, and only one data item is allowed
in each chunk. So XDR padding is not needed in a Write chunk.

Thus the server should not write XDR pad bytes in Write chunks.
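
For illustration, here is how the pad size falls out of XDR's
four-byte alignment rule. This is a standalone sketch; the same
helper appears in the patch below:

    static u32 xdr_padsize(u32 len)
    {
            /* XDR rounds each data item up to a 4-byte boundary */
            return (len & 3) ? (4 - (len & 3)) : 0;
    }

    /* xdr_padsize(13) == 3, xdr_padsize(14) == 2, xdr_padsize(16) == 0 */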

With NFSv4, the Linux NFS server places the results of any
operations that follow an NFSv4 READ or READLINK in the xdr_buf's
tail. Those results also should never be sent as a part of a Write
chunk. The current logic in send_write_chunks() appears to assume
that the xdr_buf's tail contains only pad bytes (i.e., NFSv3).

I believe this is not currently an operational problem, however.

Short NFS READs that are returned in a Write chunk could be affected
by the pad exposure issue. But such READs happen only when the read
request goes past the EOF. Those are zero bytes anyway, and there's
no file data in the client's buffer past EOF.

Current NFSv4 clients don't append extra operations to NFSv4 READ
COMPOUNDs. This issue would be exposed if they were to start doing
so.

The fix is to send only the contents of the xdr_buf's page list in
a Write chunk.

When there is more than an XDR pad in the tail buffer, the server
still places the page list's XDR pad in subsequent inline content,
which is incorrect behavior. This patch does not address that issue.
But as far as I can tell, current clients do not place operations in
NFSv4 COMPOUNDs subsequent to READ or READLINK. I will continue to
pursue a fix for that issue.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index df57f3c..832eb54 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -50,6 +50,11 @@

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

+static u32 xdr_padsize(u32 len)
+{
+ return (len & 3) ? (4 - (len & 3)) : 0;
+}
+
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
@@ -308,9 +313,8 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
{
- u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ u32 xfer_len, xdr_off;
int write_len;
- u32 xdr_off;
int chunk_off;
int chunk_no;
int nchunks;
@@ -325,6 +329,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
&rdma_resp->rm_body.rm_chunks[1];

/* Write chunks start at the pagelist */
+ xfer_len = rqstp->rq_res.page_len;
nchunks = be32_to_cpu(arg_ary->wc_nchunks);
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
xfer_len && chunk_no < nchunks;
@@ -364,7 +369,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
/* Update the req with the number of chunks actually used */
svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

- return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ return rqstp->rq_res.page_len + xdr_padsize(rqstp->rq_res.page_len);
}

static int send_reply_chunks(struct svcxprt_rdma *xprt,


2016-02-18 17:44:03

by Chuck Lever III

Subject: [PATCH v3 02/10] nfsd: Lower NFSv4.1 callback message size limit

The maximum size of a backchannel message on RPC-over-RDMA depends
on the connection's inline threshold. Today that threshold is
typically 1024 bytes, making the maximum message size 996 bytes (the
threshold minus the 28-byte RPC-over-RDMA transport header).

The Linux server's CREATE_SESSION operation requires that the client
accept callback Calls as large as 1044 bytes, to accommodate
RPCSEC_GSS. Thus CREATE_SESSION fails if a client advertises the
true message size maximum of 996 bytes.

But the server's backchannel currently does not support RPCSEC_GSS.
The actual maximum size it needs is much smaller. It is safe to
reduce the limit to enable NFSv4.1 on RDMA backchannel operation.
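
For reference, the new limits work out as follows, assuming
RPC_CALLHDRSIZE is 6 and RPC_REPHDRSIZE is 4 XDR words (their values
in linux/sunrpc/msg_prot.h) and __NEW_UTS_LEN is 64:

    UNX_CALLSLACK                   = 21 + XDR_QUADLEN(64) = 37 words
    RPC_MAX_HEADER_WITH_AUTH_SYS    = 6 + 2 * (2 + 37) = 84 words (336 bytes)
    RPC_MAX_REPHEADER_WITH_AUTH_SYS = 4 + (2 + 2) = 8 words (32 bytes)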

Signed-off-by: Chuck Lever <[email protected]>
---
fs/nfsd/nfs4state.c | 24 ++++++++++++++++--------
include/linux/sunrpc/auth.h | 7 +++++++
net/sunrpc/auth_null.c | 4 ++--
net/sunrpc/auth_unix.c | 6 ++----
4 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index c484a2b..0f56e57 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -2586,21 +2586,29 @@ static __be32 check_forechannel_attrs(struct nfsd4_channel_attrs *ca, struct nfs
return nfs_ok;
}

+/*
+ * Server's NFSv4.1 backchannel support is AUTH_SYS-only for now.
+ * These are based on similar macros in linux/sunrpc/msg_prot.h .
+ */
+#define RPC_MAX_HEADER_WITH_AUTH_SYS \
+ (RPC_CALLHDRSIZE + 2 * (2 + UNX_CALLSLACK))
+
+#define RPC_MAX_REPHEADER_WITH_AUTH_SYS \
+ (RPC_REPHDRSIZE + (2 + NUL_REPLYSLACK))
+
#define NFSD_CB_MAX_REQ_SZ ((NFS4_enc_cb_recall_sz + \
- RPC_MAX_HEADER_WITH_AUTH) * sizeof(__be32))
+ RPC_MAX_HEADER_WITH_AUTH_SYS) * sizeof(__be32))
#define NFSD_CB_MAX_RESP_SZ ((NFS4_dec_cb_recall_sz + \
- RPC_MAX_REPHEADER_WITH_AUTH) * sizeof(__be32))
+ RPC_MAX_REPHEADER_WITH_AUTH_SYS) * \
+ sizeof(__be32))

static __be32 check_backchannel_attrs(struct nfsd4_channel_attrs *ca)
{
ca->headerpadsz = 0;

- /*
- * These RPC_MAX_HEADER macros are overkill, especially since we
- * don't even do gss on the backchannel yet. But this is still
- * less than 1k. Tighten up this estimate in the unlikely event
- * it turns out to be a problem for some client:
- */
+ pr_err("%s: NFSD_CB_MAX_REQ_SZ=%lu, NFSD_CB_MAX_RESP_SZ=%lu\n",
+ __func__, NFSD_CB_MAX_REQ_SZ, NFSD_CB_MAX_RESP_SZ);
+
if (ca->maxreq_sz < NFSD_CB_MAX_REQ_SZ)
return nfserr_toosmall;
if (ca->maxresp_sz < NFSD_CB_MAX_RESP_SZ)
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 1ecf13e..6a241a2 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -21,10 +21,17 @@
#include <linux/utsname.h>

/*
+ * Maximum size of AUTH_NONE authentication information, in XDR words.
+ */
+#define NUL_CALLSLACK (4)
+#define NUL_REPLYSLACK (2)
+
+/*
* Size of the nodename buffer. RFC1831 specifies a hard limit of 255 bytes,
* but Linux hostnames are actually limited to __NEW_UTS_LEN bytes.
*/
#define UNX_MAXNODENAME __NEW_UTS_LEN
+#define UNX_CALLSLACK (21 + XDR_QUADLEN(UNX_MAXNODENAME))

struct rpcsec_gss_info;

diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index c2a2b58..8d9eb4d 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {

static
struct rpc_auth null_auth = {
- .au_cslack = 4,
- .au_rslack = 2,
+ .au_cslack = NUL_CALLSLACK,
+ .au_rslack = NUL_REPLYSLACK,
.au_ops = &authnull_ops,
.au_flavor = RPC_AUTH_NULL,
.au_count = ATOMIC_INIT(0),
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 548240d..0d3dd36 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -23,8 +23,6 @@ struct unx_cred {
};
#define uc_uid uc_base.cr_uid

-#define UNX_WRITESLACK (21 + XDR_QUADLEN(UNX_MAXNODENAME))
-
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_AUTH
#endif
@@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {

static
struct rpc_auth unix_auth = {
- .au_cslack = UNX_WRITESLACK,
- .au_rslack = 2, /* assume AUTH_NULL verf */
+ .au_cslack = UNX_CALLSLACK,
+ .au_rslack = NUL_REPLYSLACK,
.au_ops = &authunix_ops,
.au_flavor = RPC_AUTH_UNIX,
.au_count = ATOMIC_INIT(0),


2016-02-18 17:44:11

by Chuck Lever III

Subject: [PATCH v3 03/10] svcrdma: svc_rdma_post_recv() should close connection on error

Clean up: Most svc_rdma_post_recv() call sites close the transport
connection when a receive cannot be posted. Wrap that in a common
helper.

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
Tested-by: Devesh Sharma <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 1 +
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 ++---------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 10 +---------
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 15 +++++++++++++++
5 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 5322fea..d8fc58d 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -234,6 +234,7 @@ extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
enum rpcrdma_errcode);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
+extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 65a7c23..4498eaf 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -111,16 +111,9 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
if (ret)
goto out_err;

- /* Post a recv buffer to handle the reply for this request. */
- ret = svc_rdma_post_recv(rdma, GFP_NOIO);
- if (ret) {
- pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
+ ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+ if (ret)
goto out_err;
- }

ctxt = svc_rdma_get_context(rdma);
ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c8b8a8b..acf15b8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -711,13 +711,5 @@ defer:
return 0;

repost:
- ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
- if (ret) {
- pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
- set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
- }
- return ret;
+ return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 832eb54..39b3002 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -469,13 +469,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
int pages;
int ret;

- /* Post a recv buffer to handle another request. */
- ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+ ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
if (ret) {
- printk(KERN_INFO
- "svcrdma: could not post a receive buffer, err=%d."
- "Closing transport %p.\n", ret, rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 0);
return -ENOTCONN;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5763825..03fdfce 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -722,6 +722,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
return -ENOMEM;
}

+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+ int ret = 0;
+
+ ret = svc_rdma_post_recv(xprt, flags);
+ if (ret) {
+ pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+ ret);
+ pr_err("svcrdma: closing transport %p.\n", xprt);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ }
+ return ret;
+}
+
/*
* This function handles the CONNECT_REQUEST event on a listening
* endpoint. It is passed the cma_id for the _new_ connection. The context in


2016-02-18 17:44:19

by Chuck Lever III

Subject: [PATCH v3 04/10] rpcrdma: Add RPCRDMA_HDRLEN_ERR

Error headers are shorter than the headers of either RDMA_MSG or
RDMA_NOMSG.

Since HDRLEN_MIN is already used in several other places that would
be annoying to change, add RPCRDMA_HDRLEN_ERR for the one or two
spots where the shorter length is needed.
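
For reference, here is a sketch of the five XDR words that
RPCRDMA_HDRLEN_ERR counts, based on struct rpcrdma_msg; an ERR_VERS
body carries the supported version range in two further words:

    rm_xid    : 1 word
    rm_vers   : 1 word
    rm_credit : 1 word
    rm_type   : 1 word (RDMA_ERROR)
    rm_error  : 1 word (the error code, ERR_VERS or ERR_CHUNK)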

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 1 +
1 file changed, 1 insertion(+)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4..8c6d23c 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -102,6 +102,7 @@ struct rpcrdma_msg {
* Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
*/
#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)

enum rpcrdma_errcode {
ERR_VERS = 1,


2016-02-18 17:44:28

by Chuck Lever III

Subject: [PATCH v3 05/10] svcrdma: Make RDMA_ERROR messages work

Fix several issues with svc_rdma_send_error():

- Post a receive buffer to replace the one that was consumed by
the incoming request
- Posting a send should use DMA_TO_DEVICE, not DMA_FROM_DEVICE
- No need to put_page _and_ free pages in svc_rdma_put_context
- Make sure the sge is set up completely in case the error
path goes through svc_rdma_unmap_dma()
- Replace the use of ENOSYS, which has a reserved meaning

Related fixes in svc_rdma_recvfrom():

- Don't leak the ctxt associated with the incoming request
- Don't close the connection after sending an error reply
- Let svc_rdma_send_error() figure out the right header error code

As a last clean up, move svc_rdma_send_error() to svc_rdma_sendto.c
with other similar functions. There is some common logic in these
functions that could someday be combined to reduce code duplication.

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
Tested-by: Devesh Sharma <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 4 +-
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 2 -
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 19 ++++-----
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 62 ++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 54 --------------------------
5 files changed, 74 insertions(+), 67 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index d8fc58d..0589918 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -228,11 +228,11 @@ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
extern int svc_rdma_sendto(struct svc_rqst *);
extern struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+ int);

/* svc_rdma_transport.c */
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
- enum rpcrdma_errcode);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index e2fca76..f74fc52 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -162,7 +162,7 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
}

if (rmsgp->rm_vers != rpcrdma_version)
- return -ENOSYS;
+ return -EPROTONOSUPPORT;

/* Pull in the extra for the padded case and bump our pointer */
if (rmsgp->rm_type == rdma_msgp) {
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index acf15b8..0f09052 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -612,7 +612,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt *ctxt = NULL;
struct rpcrdma_msg *rmsgp;
int ret = 0;
- int len;

dprintk("svcrdma: rqstp=%p\n", rqstp);

@@ -654,15 +653,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

/* Decode the RDMA header. */
- len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
- rqstp->rq_xprt_hlen = len;
-
- /* If the request is invalid, reply with an error */
- if (len < 0) {
- if (len == -ENOSYS)
- svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
- goto close_out;
- }
+ ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ if (ret < 0)
+ goto out_err;
+ rqstp->rq_xprt_hlen = ret;

if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -698,6 +692,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_xprt_copy_addrs(rqstp, xprt);
return ret;

+out_err:
+ svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+ svc_rdma_put_context(ctxt, 0);
+ return 0;
+
close_out:
if (ctxt)
svc_rdma_put_context(ctxt, 1);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 39b3002..1917753 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -644,3 +644,65 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_context(ctxt, 0);
return ret;
}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ int status)
+{
+ struct ib_send_wr err_wr;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ enum rpcrdma_errcode err;
+ __be32 *va;
+ int length;
+ int ret;
+
+ ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+ if (ret)
+ return;
+
+ p = alloc_page(GFP_KERNEL);
+ if (!p)
+ return;
+ va = page_address(p);
+
+ /* XDR encode an error reply */
+ err = ERR_CHUNK;
+ if (status == -EPROTONOSUPPORT)
+ err = ERR_VERS;
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SGE for local address */
+ ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+ ctxt->sge[0].length = length;
+ ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, length, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+ dprintk("svcrdma: Error mapping buffer for protocol error\n");
+ svc_rdma_put_context(ctxt, 1);
+ return;
+ }
+ atomic_inc(&xprt->sc_dma_used);
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof(err_wr));
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = ctxt->sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ }
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 03fdfce..15c8fa3e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1433,57 +1433,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
}
return ret;
}
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err)
-{
- struct ib_send_wr err_wr;
- struct page *p;
- struct svc_rdma_op_ctxt *ctxt;
- __be32 *va;
- int length;
- int ret;
-
- p = alloc_page(GFP_KERNEL);
- if (!p)
- return;
- va = page_address(p);
-
- /* XDR encode error */
- length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
- ctxt = svc_rdma_get_context(xprt);
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->count = 1;
- ctxt->pages[0] = p;
-
- /* Prepare SGE for local address */
- ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
- p, 0, length, DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
- put_page(p);
- svc_rdma_put_context(ctxt, 1);
- return;
- }
- atomic_inc(&xprt->sc_dma_used);
- ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
- ctxt->sge[0].length = length;
-
- /* Prepare SEND WR */
- memset(&err_wr, 0, sizeof err_wr);
- ctxt->wr_op = IB_WR_SEND;
- err_wr.wr_id = (unsigned long)ctxt;
- err_wr.sg_list = ctxt->sge;
- err_wr.num_sge = 1;
- err_wr.opcode = IB_WR_SEND;
- err_wr.send_flags = IB_SEND_SIGNALED;
-
- /* Post It */
- ret = svc_rdma_send(xprt, &err_wr);
- if (ret) {
- dprintk("svcrdma: Error %d posting send for protocol error\n",
- ret);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
- }
-}


2016-02-18 17:44:36

by Chuck Lever III

Subject: [PATCH v3 06/10] svcrdma: Use correct XID in error replies

When constructing an error reply, svc_rdma_xdr_encode_error()
needs to view the client's request message so it can get the
failing request's XID.

svc_rdma_xdr_decode_req() is supposed to return a pointer to the
client's request header. But if it fails to decode the client's
message (and thus an error reply is needed) it does not return the
pointer. The server then sends a bogus XID in the error reply.

Instead, unconditionally generate the pointer to the client's header
in svc_rdma_recvfrom(), and pass that pointer to both functions.

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
Tested-by: Devesh Sharma <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 +-
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 7 +------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 3 ++-
3 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 0589918..4ce7b74 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
struct xdr_buf *rcvbuf);

/* svc_rdma_marshal.c */
-extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *);
extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
struct rpcrdma_msg *,
enum rpcrdma_errcode, __be32 *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index f74fc52..b9ce01f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -145,15 +145,11 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
return (__be32 *)&ary->wc_array[nchunks];
}

-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
- struct svc_rqst *rqstp)
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
{
- struct rpcrdma_msg *rmsgp = NULL;
__be32 *va, *vaend;
u32 hdr_len;

- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
/* Verify that there's enough bytes for header + something */
if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
dprintk("svcrdma: header too short = %d\n",
@@ -201,7 +197,6 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
hdr_len = (unsigned long)va - (unsigned long)rmsgp;
rqstp->rq_arg.head[0].iov_len -= hdr_len;

- *rdma_req = rmsgp;
return hdr_len;
}

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 0f09052..8f68cb6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -653,7 +653,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

/* Decode the RDMA header. */
- ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
if (ret < 0)
goto out_err;
rqstp->rq_xprt_hlen = ret;


2016-02-18 17:44:44

by Chuck Lever III

Subject: [PATCH v3 07/10] svcrdma: Hook up the logic to return ERR_CHUNK

RFC 5666 Section 4.2 states:

> When the peer detects an RPC-over-RDMA header version that it does
> not support (currently this document defines only version 1), it
> replies with an error code of ERR_VERS, and provides the low and
> high inclusive version numbers it does, in fact, support.

And:

> When other decoding errors are detected in the header or chunks,
> either an RPC decode error MAY be returned or the RPC/RDMA error
> code ERR_CHUNK MUST be returned.

The Linux NFS server does throw ERR_VERS when a client sends it
a request whose rdma_version is not "one." But it does not return
ERR_CHUNK when a header decoding error occurs. It just drops the
request.

To improve protocol extensibility, it should reject invalid values
in the rdma_proc field instead of treating them all like RDMA_MSG.
Otherwise clients can't detect when the server doesn't support
new rdma_proc values.
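
For reference, a version-mismatch reply built by this logic carries
these fields on the wire (a sketch based on RFC 5666, not a capture):

    rm_xid    : XID copied from the failing request
    rm_vers   : 1
    rm_credit : credit value
    rm_type   : RDMA_ERROR
    rm_error  : ERR_VERS, with rm_vers_low = 1 and rm_vers_high = 1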

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
Tested-by: Devesh Sharma <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 55 ++++++++++++++++++++++++-------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 4 ++
2 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index b9ce01f..765bca4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -148,22 +148,41 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
{
__be32 *va, *vaend;
+ unsigned int len;
u32 hdr_len;

/* Verify that there's enough bytes for header + something */
- if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
dprintk("svcrdma: header too short = %d\n",
rqstp->rq_arg.len);
return -EINVAL;
}

- if (rmsgp->rm_vers != rpcrdma_version)
+ if (rmsgp->rm_vers != rpcrdma_version) {
+ dprintk("%s: bad version %u\n", __func__,
+ be32_to_cpu(rmsgp->rm_vers));
return -EPROTONOSUPPORT;
+ }

- /* Pull in the extra for the padded case and bump our pointer */
- if (rmsgp->rm_type == rdma_msgp) {
- int hdrlen;
-
+ switch (be32_to_cpu(rmsgp->rm_type)) {
+ case RDMA_MSG:
+ case RDMA_NOMSG:
+ break;
+
+ case RDMA_DONE:
+ /* Just drop it */
+ dprintk("svcrdma: dropping RDMA_DONE message\n");
+ return 0;
+
+ case RDMA_ERROR:
+ /* Possible if this is a backchannel reply.
+ * XXX: We should cancel this XID, though.
+ */
+ dprintk("svcrdma: dropping RDMA_ERROR message\n");
+ return 0;
+
+ case RDMA_MSGP:
+ /* Pull in the extra for the padded case, bump our pointer */
rmsgp->rm_body.rm_padded.rm_align =
be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
rmsgp->rm_body.rm_padded.rm_thresh =
@@ -171,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)

va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
rqstp->rq_arg.head[0].iov_base = va;
- hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
- rqstp->rq_arg.head[0].iov_len -= hdrlen;
- if (hdrlen > rqstp->rq_arg.len)
+ len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= len;
+ if (len > rqstp->rq_arg.len)
return -EINVAL;
- return hdrlen;
+ return len;
+ default:
+ dprintk("svcrdma: bad rdma procedure (%u)\n",
+ be32_to_cpu(rmsgp->rm_type));
+ return -EINVAL;
}

/* The chunk list may contain either a read chunk list or a write
@@ -184,14 +207,20 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
va = &rmsgp->rm_body.rm_chunks[0];
vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
va = decode_read_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode read list\n");
return -EINVAL;
+ }
va = decode_write_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode write list\n");
return -EINVAL;
+ }
va = decode_reply_array(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode reply chunk\n");
return -EINVAL;
+ }

rqstp->rq_arg.head[0].iov_base = va;
hdr_len = (unsigned long)va - (unsigned long)rmsgp;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 8f68cb6..f8b840b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -657,6 +657,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
if (ret < 0)
goto out_err;
+ if (ret == 0)
+ goto out_drop;
rqstp->rq_xprt_hlen = ret;

if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
@@ -710,6 +712,8 @@ out_err:
defer:
return 0;

+out_drop:
+ svc_rdma_put_context(ctxt, 1);
repost:
return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}


2016-02-18 17:44:52

by Chuck Lever III

Subject: [PATCH v3 08/10] svcrdma: Remove close_out exit path

Clean up: close_out is reached only when ctxt == NULL and XPT_CLOSE
is already set.

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Devesh Sharma <[email protected]>
Tested-by: Devesh Sharma <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 12 +-----------
1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f8b840b..d3718e9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -641,8 +641,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
* transport list
*/
if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
- goto close_out;
-
+ goto defer;
goto out;
}
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -700,15 +699,6 @@ out_err:
svc_rdma_put_context(ctxt, 0);
return 0;

- close_out:
- if (ctxt)
- svc_rdma_put_context(ctxt, 1);
- dprintk("svcrdma: transport %p is closing\n", xprt);
- /*
- * Set the close bit and enqueue it. svc_recv will see the
- * close bit and call svc_xprt_delete
- */
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
return 0;



2016-02-18 17:45:00

by Chuck Lever III

Subject: [PATCH v3 09/10] svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post operations on a consumer's QP, and handle the
completions itself.
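
The pattern, reduced to its essentials (a sketch using names from
this series; error handling omitted):

    /* Posting: each WR points at an ib_cqe instead of carrying a wr_id */
    ctxt->cqe.done = svc_rdma_wc_receive;
    recv_wr.wr_cqe = &ctxt->cqe;

    /* Completion: the handler recovers its context from the cqe */
    static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
    {
            struct svc_rdma_op_ctxt *ctxt =
                    container_of(wc->wr_cqe, struct svc_rdma_op_ctxt, cqe);

            /* On flush, only wc->wr_cqe and wc->status are reliable */
    }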

svcrdma receive completions no longer use the dto_tasklet. Each
polled Receive WC is now handled individually in soft IRQ context.

The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod
metrics are no longer updated.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2
net/sunrpc/xprtrdma/svc_rdma_transport.c | 129 +++++++++---------------------
2 files changed, 40 insertions(+), 91 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 4ce7b74..7de9cbb 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
struct svc_rdma_fastreg_mr *frmr;
int hdr_count;
struct xdr_buf arg;
+ struct ib_cqe cqe;
struct list_head dto_q;
enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status;
@@ -174,7 +175,6 @@ struct svcxprt_rdma {
struct work_struct sc_work;
};
/* sc_flags */
-#define RDMAXPRT_RQ_PENDING 1
#define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 15c8fa3e..5dfa1b6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
list_del_init(&xprt->sc_dto_q);
spin_unlock_irqrestore(&dto_lock, flags);

- rq_cq_reap(xprt);
sq_cq_reap(xprt);

svc_xprt_put(&xprt->sc_xprt);
@@ -422,93 +420,48 @@ static void dto_tasklet_func(unsigned long data)
spin_unlock_irqrestore(&dto_lock, flags);
}

-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
-{
- struct svcxprt_rdma *xprt = cq_context;
- unsigned long flags;
-
- /* Guard against unconditional flush call for destroyed QP */
- if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
- return;
-
- /*
- * Set the bit regardless of whether or not it's on the list
- * because it may be on the list already due to an SQ
- * completion.
- */
- set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
-
- /*
- * If this transport is not already on the DTO transport queue,
- * add it
- */
- spin_lock_irqsave(&dto_lock, flags);
- if (list_empty(&xprt->sc_dto_q)) {
- svc_xprt_get(&xprt->sc_xprt);
- list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-
- /* Tasklet does all the work to avoid irqsave locks. */
- tasklet_schedule(&dto_tasklet);
-}
-
-/*
- * rq_cq_reap - Process the RQ CQ.
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
- *
- * Note that caller must hold a transport reference.
*/
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
- int ret;
- struct ib_wc wc;
- struct svc_rdma_op_ctxt *ctxt = NULL;
-
- if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
- return;
+ struct svcxprt_rdma *xprt = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_rq_poll);
+ /* WARNING: Only wc->wr_cqe and wc->status are reliable */
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ ctxt->wc_status = wc->status;
+ svc_rdma_unmap_dma(ctxt);

- while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
- ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
- ctxt->wc_status = wc.status;
- ctxt->byte_len = wc.byte_len;
- svc_rdma_unmap_dma(ctxt);
- if (wc.status != IB_WC_SUCCESS) {
- /* Close the transport */
- dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_rdma_put_context(ctxt, 1);
- svc_xprt_put(&xprt->sc_xprt);
- continue;
- }
- spin_lock_bh(&xprt->sc_rq_dto_lock);
- list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_put(&xprt->sc_xprt);
- }
+ if (wc->status != IB_WC_SUCCESS)
+ goto flushed;

- if (ctxt)
- atomic_inc(&rdma_stat_rq_prod);
+ /* All wc fields are now known to be valid */
+ ctxt->byte_len = wc->byte_len;
+ spin_lock(&xprt->sc_rq_dto_lock);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock(&xprt->sc_rq_dto_lock);

set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- /*
- * If data arrived before established event,
- * don't enqueue. This defers RPC I/O until the
- * RDMA connection is complete.
- */
- if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
- svc_xprt_enqueue(&xprt->sc_xprt);
+ if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ goto out;
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ goto out;
+
+flushed:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+
+out:
+ svc_xprt_put(&xprt->sc_xprt);
}

/*
@@ -681,6 +634,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
ctxt = svc_rdma_get_context(xprt);
buflen = 0;
ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->cqe.done = svc_rdma_wc_receive;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
if (sge_no >= xprt->sc_max_sge) {
pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +659,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
recv_wr.next = NULL;
recv_wr.sg_list = &ctxt->sge[0];
recv_wr.num_sge = ctxt->count;
- recv_wr.wr_id = (u64)(unsigned long)ctxt;
+ recv_wr.wr_cqe = &ctxt->cqe;

svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -1094,12 +1048,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_rq_depth;
- newxprt->sc_rq_cq = ib_create_cq(dev,
- rq_comp_handler,
- cq_event_handler,
- newxprt,
- &cq_attr);
+ newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -1193,7 +1143,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* miss the first message
*/
ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);

/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
@@ -1337,7 +1286,7 @@ static void __svc_rdma_free(struct work_struct *work)
ib_destroy_cq(rdma->sc_sq_cq);

if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
- ib_destroy_cq(rdma->sc_rq_cq);
+ ib_free_cq(rdma->sc_rq_cq);

if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd);


2016-02-18 17:45:09

by Chuck Lever III

Subject: [PATCH v3 10/10] svcrdma: Use new CQ API for RPC-over-RDMA server send CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

This new API also aims each completion at a function that is
specific to the WR's opcode. Thus the ctxt->wr_op field and the
switch in process_context() are replaced by a set of methods that
handle each completion type.
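
The resulting opcode-to-handler mapping, taken from the diff below:

    IB_WR_SEND                                 -> svc_rdma_wc_send
    IB_WR_RDMA_WRITE                           -> svc_rdma_wc_write
    IB_WR_REG_MR                               -> svc_rdma_wc_reg
    IB_WR_RDMA_READ / IB_WR_RDMA_READ_WITH_INV -> svc_rdma_wc_read
    IB_WR_LOCAL_INV                            -> svc_rdma_wc_inv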

Because each ib_cqe carries a pointer to a completion method, the
core can now post operations on a consumer's QP, and handle the
completions itself.

The server's rdma_stat_sq_poll and rdma_stat_sq_prod metrics are no
longer updated.

As a clean up, the cq_event_handler, the dto_tasklet, and all
associated locking is removed, as they are no longer referenced or
used.

Signed-off-by: Chuck Lever <[email protected]>
Tested-by: Steve Wise <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 9 +
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 14 +-
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 12 +
net/sunrpc/xprtrdma/svc_rdma_transport.c | 259 +++++++++++-----------------
5 files changed, 121 insertions(+), 177 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 7de9cbb..5275969 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt {
int hdr_count;
struct xdr_buf arg;
struct ib_cqe cqe;
+ struct ib_cqe reg_cqe;
+ struct ib_cqe inv_cqe;
struct list_head dto_q;
- enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status;
u32 byte_len;
u32 position;
@@ -175,7 +176,6 @@ struct svcxprt_rdma {
struct work_struct sc_work;
};
/* sc_flags */
-#define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3

#define RPCRDMA_LISTEN_BACKLOG 10
@@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
int);

/* svc_rdma_transport.c */
+extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 4498eaf..64282ae 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
ctxt->count = 1;

- ctxt->wr_op = IB_WR_SEND;
ctxt->direction = DMA_TO_DEVICE;
ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
ctxt->sge[0].length = sndbuf->len;
@@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
atomic_inc(&rdma->sc_dma_used);

memset(&send_wr, 0, sizeof(send_wr));
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_send;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = 1;
send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index d3718e9..3b24a64 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

memset(&read_wr, 0, sizeof(read_wr));
- read_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_read;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.opcode = IB_WR_RDMA_READ;
- ctxt->wr_op = read_wr.wr.opcode;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
ctxt->read_hdr = head;

/* Prepare REG WR */
+ ctxt->reg_cqe.done = svc_rdma_wc_reg;
+ reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
reg_wr.wr.opcode = IB_WR_REG_MR;
- reg_wr.wr.wr_id = 0;
reg_wr.wr.send_flags = IB_SEND_SIGNALED;
reg_wr.wr.num_sge = 0;
reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,

/* Prepare RDMA_READ */
memset(&read_wr, 0, sizeof(read_wr));
+ ctxt->cqe.done = svc_rdma_wc_read;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
read_wr.wr.num_sge = 1;
if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr.wr_id = (unsigned long)ctxt;
read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
} else {
read_wr.wr.opcode = IB_WR_RDMA_READ;
read_wr.wr.next = &inv_wr;
/* Prepare invalidate */
memset(&inv_wr, 0, sizeof(inv_wr));
- inv_wr.wr_id = (unsigned long)ctxt;
+ ctxt->inv_cqe.done = svc_rdma_wc_inv;
+ inv_wr.wr_cqe = &ctxt->inv_cqe;
inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
}
- ctxt->wr_op = read_wr.wr.opcode;

/* Post the chain */
ret = svc_rdma_send(xprt, &reg_wr.wr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1917753..ec2e365 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -286,8 +286,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,

/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
- ctxt->wr_op = IB_WR_RDMA_WRITE;
- write_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_write;
+ write_wr.wr.wr_cqe = &ctxt->cqe;
write_wr.wr.sg_list = &sge[0];
write_wr.wr.num_sge = sge_no;
write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -543,8 +543,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
goto err;
}
memset(&send_wr, 0, sizeof send_wr);
- ctxt->wr_op = IB_WR_SEND;
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_send;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
@@ -690,8 +690,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,

/* Prepare SEND WR */
memset(&err_wr, 0, sizeof(err_wr));
- ctxt->wr_op = IB_WR_SEND;
- err_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_wc_send;
+ err_wr.wr_cqe = &ctxt->cqe;
err_wr.sg_list = ctxt->sge;
err_wr.num_sge = 1;
err_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5dfa1b6..9066896 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,16 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
.xpo_create = svc_rdma_create,
@@ -351,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
}
}

-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
- struct svc_xprt *xprt = context;
- dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
- ib_event_msg(event->event), event->event, context);
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
@@ -391,35 +376,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
}
}

-/*
- * Data Transfer Operation Tasklet
- *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
- */
-static void dto_tasklet_func(unsigned long data)
-{
- struct svcxprt_rdma *xprt;
- unsigned long flags;
-
- spin_lock_irqsave(&dto_lock, flags);
- while (!list_empty(&dto_xprt_q)) {
- xprt = list_entry(dto_xprt_q.next,
- struct svcxprt_rdma, sc_dto_q);
- list_del_init(&xprt->sc_dto_q);
- spin_unlock_irqrestore(&dto_lock, flags);
-
- sq_cq_reap(xprt);
-
- svc_xprt_put(&xprt->sc_xprt);
- spin_lock_irqsave(&dto_lock, flags);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-}
-
/**
* svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue
@@ -464,132 +420,127 @@ out:
svc_xprt_put(&xprt->sc_xprt);
}

-/*
- * Process a completion context
- */
-static void process_context(struct svcxprt_rdma *xprt,
- struct svc_rdma_op_ctxt *ctxt)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+ struct ib_wc *wc,
+ const char *opname)
{
- struct svc_rdma_op_ctxt *read_hdr;
- int free_pages = 0;
-
- svc_rdma_unmap_dma(ctxt);
-
- switch (ctxt->wr_op) {
- case IB_WR_SEND:
- free_pages = 1;
- break;
+ if (wc->status != IB_WC_SUCCESS)
+ goto err;

- case IB_WR_RDMA_WRITE:
- break;
+out:
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+ return;

- case IB_WR_RDMA_READ:
- case IB_WR_RDMA_READ_WITH_INV:
- svc_rdma_put_frmr(xprt, ctxt->frmr);
+err:
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+ opname, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ goto out;
+}

- if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
- break;
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+ const char *opname)
+{
+ struct svcxprt_rdma *xprt = cq->cq_context;

- read_hdr = ctxt->read_hdr;
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_send_wc_common(xprt, wc, opname);
+ svc_xprt_put(&xprt->sc_xprt);
+}

- spin_lock_bh(&xprt->sc_rq_dto_lock);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- list_add_tail(&read_hdr->dto_q,
- &xprt->sc_read_complete_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_enqueue(&xprt->sc_xprt);
- return;
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- default:
- dprintk("svcrdma: unexpected completion opcode=%d\n",
- ctxt->wr_op);
- break;
- }
+ svc_rdma_send_wc_common_put(cq, wc, "send");

- svc_rdma_put_context(ctxt, free_pages);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
}

-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Note that caller must hold a transport reference.
*/
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svc_rdma_op_ctxt *ctxt = NULL;
- struct ib_wc wc_a[6];
- struct ib_wc *wc;
- struct ib_cq *cq = xprt->sc_sq_cq;
- int ret;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- memset(wc_a, 0, sizeof(wc_a));
+ svc_rdma_send_wc_common_put(cq, wc, "write");

- if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
- return;
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+}

- ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_sq_poll);
- while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
- int i;
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
+{
+ svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}

-		for (i = 0; i < ret; i++) {
-			wc = &wc_a[i];
-			if (wc->status != IB_WC_SUCCESS) {
-				dprintk("svcrdma: sq wc err status %s (%d)\n",
-					ib_wc_status_msg(wc->status),
-					wc->status);
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-				/* Close the transport */
-				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			}
+	svc_rdma_send_wc_common(xprt, wc, "read");
 
-			/* Decrement used SQ WR count */
-			atomic_dec(&xprt->sc_sq_count);
-			wake_up(&xprt->sc_send_wait);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_frmr(xprt, ctxt->frmr);
 
-			ctxt = (struct svc_rdma_op_ctxt *)
-				(unsigned long)wc->wr_id;
-			if (ctxt)
-				process_context(xprt, ctxt);
+	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+		struct svc_rdma_op_ctxt *read_hdr;
 
-			svc_xprt_put(&xprt->sc_xprt);
-		}
+		read_hdr = ctxt->read_hdr;
+		spin_lock(&xprt->sc_rq_dto_lock);
+		list_add_tail(&read_hdr->dto_q,
+			      &xprt->sc_read_complete_q);
+		spin_unlock(&xprt->sc_rq_dto_lock);
+
+		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&xprt->sc_xprt);
 	}
 
-	if (ctxt)
-		atomic_inc(&rdma_stat_sq_prod);
+	svc_rdma_put_context(ctxt, 0);
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an RQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common_put(cq, wc, "localInv");
 }
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -980,7 +931,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct svcxprt_rdma *listen_rdma;
 	struct svcxprt_rdma *newxprt = NULL;
 	struct rdma_conn_param conn_param;
-	struct ib_cq_init_attr cq_attr = {};
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	unsigned int i;
@@ -1038,12 +988,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating PD for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_sq_depth;
-	newxprt->sc_sq_cq = ib_create_cq(dev,
-					 sq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_sq_cq)) {
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
@@ -1138,12 +1084,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
-	/*
-	 * Arm the CQs for the SQ and RQ before accepting so we can't
-	 * miss the first message
-	 */
-	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
@@ -1283,7 +1223,7 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_qp(rdma->sc_qp);
 
 	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
-		ib_destroy_cq(rdma->sc_sq_cq);
+		ib_free_cq(rdma->sc_sq_cq);
 
 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
 		ib_free_cq(rdma->sc_rq_cq);
@@ -1347,9 +1287,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
 
-			/* See if we can opportunistically reap SQ WR to make room */
-			sq_cq_reap(xprt);
-
 			/* Wait until SQ WR available if SQ still full */
 			wait_event(xprt->sc_send_wait,
 				   atomic_read(&xprt->sc_sq_count) <

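For reviewers less familiar with the new core CQ API, the conversion
above follows this general pattern (a minimal sketch with made-up
names; example_ctxt and example_done are not from the patch):

	/* Each context embeds an ib_cqe. The core's CQ poller uses
	 * wc->wr_cqe to locate the context and invokes its ->done
	 * method, so no driver-private polling loop, tasklet, or
	 * re-arm logic is needed.
	 */
	struct example_ctxt {
		struct ib_cqe cqe;
		/* ... per-WR state ... */
	};

	static void example_done(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct example_ctxt *ctxt =
			container_of(wc->wr_cqe, struct example_ctxt, cqe);

		if (wc->status != IB_WC_SUCCESS)
			pr_warn("example: WC error: %s\n",
				ib_wc_status_msg(wc->status));
		/* release ctxt resources here */
	}

	/* At setup time: the core allocates, arms, and polls the CQ */
	cq = ib_alloc_cq(dev, xprt, depth, 0, IB_POLL_SOFTIRQ);

	/* At post time: point the WR at its completion method */
	ctxt->cqe.done = example_done;
	send_wr.wr_cqe = &ctxt->cqe;
	ret = ib_post_send(qp, &send_wr, &bad_wr);
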

2016-02-18 17:53:52

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 02/10] nfsd: Lower NFSv4.1 callback message size limit


> On Feb 18, 2016, at 12:44 PM, Chuck Lever <[email protected]> wrote:
>
> The maximum size of a backchannel message on RPC-over-RDMA depends
> on the connection's inline threshold. Today that threshold is
> typically 1024 bytes, making the maximum message size 996 bytes.
>
> The Linux server's CREATE_SESSION operation requires the client's
> advertised maximum callback Call size to be at least 1044 bytes,
> to accommodate RPCSEC_GSS. Thus CREATE_SESSION fails if a client
> advertises the true message size maximum of 996 bytes.
>
> But the server's backchannel currently does not support RPCSEC_GSS.
> The actual maximum size it needs is much smaller. It is safe to
> reduce the limit to enable NFSv4.1 on RDMA backchannel operation.
>
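
For reference, the 996-byte figure is just the inline threshold minus
the fixed RPC-over-RDMA transport header, which rpc_rdma.h defines as
RPCRDMA_HDRLEN_MIN (7 XDR words):

	1024 - (7 * sizeof(__be32)) = 1024 - 28 = 996 bytes
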
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> fs/nfsd/nfs4state.c | 24 ++++++++++++++++--------
> include/linux/sunrpc/auth.h | 7 +++++++
> net/sunrpc/auth_null.c | 4 ++--
> net/sunrpc/auth_unix.c | 6 ++----
> 4 files changed, 27 insertions(+), 14 deletions(-)
>
> diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
> index c484a2b..0f56e57 100644
> --- a/fs/nfsd/nfs4state.c
> +++ b/fs/nfsd/nfs4state.c
> @@ -2586,21 +2586,29 @@ static __be32 check_forechannel_attrs(struct nfsd4_channel_attrs *ca, struct nfs
>  	return nfs_ok;
>  }
> 
> +/*
> + * Server's NFSv4.1 backchannel support is AUTH_SYS-only for now.
> + * These are based on similar macros in linux/sunrpc/msg_prot.h .
> + */
> +#define RPC_MAX_HEADER_WITH_AUTH_SYS \
> +	(RPC_CALLHDRSIZE + 2 * (2 + UNX_CALLSLACK))
> +
> +#define RPC_MAX_REPHEADER_WITH_AUTH_SYS \
> +	(RPC_REPHDRSIZE + (2 + NUL_REPLYSLACK))
> +
>  #define NFSD_CB_MAX_REQ_SZ	((NFS4_enc_cb_recall_sz + \
> -				 RPC_MAX_HEADER_WITH_AUTH) * sizeof(__be32))
> +				 RPC_MAX_HEADER_WITH_AUTH_SYS) * sizeof(__be32))
>  #define NFSD_CB_MAX_RESP_SZ	((NFS4_dec_cb_recall_sz + \
> -				 RPC_MAX_REPHEADER_WITH_AUTH) * sizeof(__be32))
> +				 RPC_MAX_REPHEADER_WITH_AUTH_SYS) * \
> +				 sizeof(__be32))
> 
>  static __be32 check_backchannel_attrs(struct nfsd4_channel_attrs *ca)
>  {
>  	ca->headerpadsz = 0;
> 
> -	/*
> -	 * These RPC_MAX_HEADER macros are overkill, especially since we
> -	 * don't even do gss on the backchannel yet. But this is still
> -	 * less than 1k. Tighten up this estimate in the unlikely event
> -	 * it turns out to be a problem for some client:
> -	 */
> +	pr_err("%s: NFSD_CB_MAX_REQ_SZ=%lu, NFSD_CB_MAX_RESP_SZ=%lu\n",
> +		__func__, NFSD_CB_MAX_REQ_SZ, NFSD_CB_MAX_RESP_SZ);
> +

Oops, I forgot to remove this pr_err. It will be gone in the
next version of this patch.
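
For the record, the new AUTH_SYS estimates work out as follows
(assuming __NEW_UTS_LEN is 64, so XDR_QUADLEN(UNX_MAXNODENAME) is 16,
and using RPC_CALLHDRSIZE = 6 and RPC_REPHDRSIZE = 4 from msg_prot.h):

	RPC_MAX_HEADER_WITH_AUTH_SYS    = 6 + 2 * (2 + (21 + 16))
	                                = 84 XDR words = 336 bytes
	RPC_MAX_REPHEADER_WITH_AUTH_SYS = 4 + (2 + 2)
	                                = 8 XDR words  = 32 bytes

Both are comfortably below the old RPCSEC_GSS-sized estimates.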


>  	if (ca->maxreq_sz < NFSD_CB_MAX_REQ_SZ)
>  		return nfserr_toosmall;
>  	if (ca->maxresp_sz < NFSD_CB_MAX_RESP_SZ)
> diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
> index 1ecf13e..6a241a2 100644
> --- a/include/linux/sunrpc/auth.h
> +++ b/include/linux/sunrpc/auth.h
> @@ -21,10 +21,17 @@
>  #include <linux/utsname.h>
> 
>  /*
> + * Maximum size of AUTH_NONE authentication information, in XDR words.
> + */
> +#define NUL_CALLSLACK	(4)
> +#define NUL_REPLYSLACK	(2)
> +
> +/*
>   * Size of the nodename buffer. RFC1831 specifies a hard limit of 255 bytes,
>   * but Linux hostnames are actually limited to __NEW_UTS_LEN bytes.
>   */
>  #define UNX_MAXNODENAME	__NEW_UTS_LEN
> +#define UNX_CALLSLACK	(21 + XDR_QUADLEN(UNX_MAXNODENAME))
> 
>  struct rpcsec_gss_info;
> 
> diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
> index c2a2b58..8d9eb4d 100644
> --- a/net/sunrpc/auth_null.c
> +++ b/net/sunrpc/auth_null.c
> @@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {
> 
>  static
>  struct rpc_auth null_auth = {
> -	.au_cslack	= 4,
> -	.au_rslack	= 2,
> +	.au_cslack	= NUL_CALLSLACK,
> +	.au_rslack	= NUL_REPLYSLACK,
>  	.au_ops		= &authnull_ops,
>  	.au_flavor	= RPC_AUTH_NULL,
>  	.au_count	= ATOMIC_INIT(0),
> diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
> index 548240d..0d3dd36 100644
> --- a/net/sunrpc/auth_unix.c
> +++ b/net/sunrpc/auth_unix.c
> @@ -23,8 +23,6 @@ struct unx_cred {
>  };
>  #define uc_uid uc_base.cr_uid
> 
> -#define UNX_WRITESLACK	(21 + XDR_QUADLEN(UNX_MAXNODENAME))
> -
>  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
>  # define RPCDBG_FACILITY	RPCDBG_AUTH
>  #endif
> @@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {
> 
>  static
>  struct rpc_auth unix_auth = {
> -	.au_cslack	= UNX_WRITESLACK,
> -	.au_rslack	= 2,	/* assume AUTH_NULL verf */
> +	.au_cslack	= UNX_CALLSLACK,
> +	.au_rslack	= NUL_REPLYSLACK,
>  	.au_ops		= &authunix_ops,
>  	.au_flavor	= RPC_AUTH_UNIX,
>  	.au_count	= ATOMIC_INIT(0),
>

--
Chuck Lever


2016-02-22 18:32:12

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH v3 01/10] svcrdma: Do not write xdr_buf::tail in a Write chunk

On Thu, Feb 18, 2016 at 12:43:53PM -0500, Chuck Lever wrote:
> The fix is to send only the contents of the xdr_buf's page list in
> a Write chunk.

This isn't quite right either. A large-enough non-readlike reply will
use the page list if that's the only way to fit the reply.

You might be able to hit such cases, for example, using nfs4_getfacl on
a file with a sufficiently large ACL.

--b.
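
For context, here is the xdr_buf layout under discussion, abridged
from include/linux/sunrpc/xdr.h (the comments are paraphrased):

	struct xdr_buf {
		struct kvec	head[1],	/* RPC and NFS headers */
				tail[1];	/* pad, trailing results */
		struct page   **pages;		/* bulk payload pages */
		unsigned int	page_base,	/* offset into first page */
				page_len;	/* length of page data */
		unsigned int	buflen,		/* total buffer size */
				len;		/* encoded message length */
	};

A READ or READLINK payload normally lands in the page list, which is
why the server maps the page list onto the client's Write chunk. The
point above is that a sufficiently large non-READ reply can end up in
the page list as well.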

2016-02-22 18:35:11

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 01/10] svcrdma: Do not write xdr_buf::tail in a Write chunk


> On Feb 22, 2016, at 1:32 PM, [email protected] wrote:
>
> On Thu, Feb 18, 2016 at 12:43:53PM -0500, Chuck Lever wrote:
>> The fix is to send only the contents of the xdr_buf's page list in
>> a Write chunk.
>
> This isn't quite right either. A large-enough non-readlike reply will
> use the page list if that's the only way to fit the reply.
>
> You might be able to hit such cases, for example, using nfs4_getfacl on
> a file with a sufficiently large ACL.

Clients provide a Write chunk only for READ and READLINK.
There is no way to hit this with GETFACL unless the client is
doing something wrong.

Anyway, I've got another revision of this work coming, this
time it should be a complete fix.

Thanks as always for your careful review!

--
Chuck Lever


2016-02-22 18:39:31

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 01/10] svcrdma: Do not write xdr_buf::tail in a Write chunk


> On Feb 22, 2016, at 1:35 PM, Chuck Lever <[email protected]> wrote:
>
>
>> On Feb 22, 2016, at 1:32 PM, [email protected] wrote:
>>
>> On Thu, Feb 18, 2016 at 12:43:53PM -0500, Chuck Lever wrote:
>>> The fix is to send only the contents of the xdr_buf's page list in
>>> a Write chunk.
>>
>> This isn't quite right either. A large-enough non-readlike reply will
>> use the page list if that's the only way to fit the reply.
>>
>> You might be able to hit such cases, for example, using nfs4_getfacl on
>> a file with a sufficiently large ACL.
>
> Clients provide a write chunk only for READ and READLINK.
> No way to hit this with GETFACL unless the client is doing
> something wrong.

OK, I see your point, and yes, I agree. The next revision
of the patch trims the page-list pad _only_ in the Write chunk
case, not in the inline reply case, which is the one you would
hit with GETFACL.
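
Conceptually the distinction looks like this (a hypothetical sketch,
not the code in the next revision; have_write_chunk is a made-up
variable):

	if (have_write_chunk)
		/* Direct data placement: one data item per chunk,
		 * so no XDR pad follows an odd-length item.
		 */
		len = xdr->page_len;
	else
		/* Normal XDR stream: pad odd-length data out to
		 * the next XDR (4-byte) boundary.
		 */
		len = XDR_QUADLEN(xdr->page_len) << 2;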


> Anyway, I've got another revision of this work coming, this
> time it should be a complete fix.
>
> Thanks as always for your careful review!
>
> --
> Chuck Lever

--
Chuck Lever