2016-02-03 15:51:23

by Chuck Lever

Subject: [PATCH v1 00/10] NFS/RDMA server patches for v4.6

For review.

- Fix XDR round-up in write chunks (take 3)
- Pre-allocate FRWRs when accepting a connection
- Add proper handling of bad header XDR
- Convert the server to use the new core CQ API added in v4.5.

Also available in the "nfsd-rdma-for-4.6" topic branch of this git
repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git

Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-for-4.6

---

Chuck Lever (10):
svcrdma: Do not send XDR roundup bytes for a write chunk
svcrdma: Make svc_rdma_get_frmr() not sleep
svcrdma: svc_rdma_post_recv() should close connection on error
rpcrdma: Add missing XDR union fields for RDMA errors
svcrdma: Make RDMA_ERROR messages work
svcrdma: Use correct XID in error replies
svcrdma: Hook up the logic to return ERR_CHUNK
svcrdma: Remove close_out exit path
svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
svcrdma: Use new CQ API for RPC-over-RDMA server send CQs


include/linux/sunrpc/rpc_rdma.h | 13 +
include/linux/sunrpc/svc_rdma.h | 18 +
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 15 -
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 62 ++--
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 61 +---
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 79 ++++-
net/sunrpc/xprtrdma/svc_rdma_transport.c | 488 +++++++++++-----------------
7 files changed, 346 insertions(+), 390 deletions(-)

--
Chuck Lever


2016-02-03 15:51:32

by Chuck Lever

Subject: [PATCH v1 01/10] svcrdma: Do not send XDR roundup bytes for a write chunk

When the Linux server writes an odd-length data item into a Write
chunk, it finishes with XDR pad bytes. If the data item is smaller
than the Write chunk, the pad bytes are written at the end of the
data item, but still inside the chunk. That can expose these zero
bytes to the RPC consumer on the client.

XDR pad bytes are inserted in order to preserve the XDR alignment
of the next XDR data item in an XDR stream. But Write chunks do not
appear in the payload XDR stream, and only one data item is allowed
in each chunk. So XDR padding is unneeded.

Thus the server should not write XDR pad bytes in Write chunks.

I believe this is not an operational problem. Short NFS READs that
are returned in a Write chunk would be affected by this issue. But
they happen only when the read request goes past the EOF. Those are
zero bytes anyway, and there's no file data in the client's buffer
past EOF.

Otherwise, current NFS clients provide a separate extra segment for
catching XDR padding. If an odd-size data item fills the chunk,
then the XDR pad will be written to the extra segment.
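
For reference, this is the quad-alignment rule whose pad bytes the
patch suppresses for Write chunks. It is an illustrative sketch of the
usual round-up arithmetic, not code from the patch:

  /* A 3-byte data item normally needs 1 pad byte so the next item in
   * an XDR stream stays 4-byte aligned. A Write chunk carries exactly
   * one data item outside the XDR stream, so the pad can be omitted.
   */
  static inline unsigned int xdr_pad_bytes(unsigned int len)
  {
          return (4 - (len & 3)) & 3;
  }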

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index df57f3c..8591314 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -308,7 +308,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
{
- u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ u32 xfer_len = rqstp->rq_res.page_len;
int write_len;
u32 xdr_off;
int chunk_off;


2016-02-03 15:51:40

by Chuck Lever

Subject: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep

svc_rdma_get_frmr() will be called in a context that cannot sleep, so
pre-allocate the memory and the MRs when a connection is accepted.
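
A worked sizing example (the values below are assumptions chosen for
illustration, not taken from the patch):

  /* Hypothetical HCA and transport values:
   *   dev->attrs.max_fast_reg_page_list_len = 256
   *   RPCSVC_MAXPAGES                        = 258 (roughly, 1MB payload)
   *   xprt->sc_max_requests                  = 32
   *
   *   i = 1 + 258 / 256;   ->  2
   *   i *= 32;             -> 64 FRMRs pre-allocated per connection
   */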

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5763825..02eee12 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
return ERR_PTR(-ENOMEM);
}

-static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
+static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
{
struct svc_rdma_fastreg_mr *frmr;

@@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
}
}

+static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
+{
+ struct ib_device *dev = xprt->sc_cm_id->device;
+ unsigned int i;
+
+ /* Pre-allocate based on the maximum amount of payload
+ * the server's HCA can handle per RDMA Read, to keep
+ * the number of MRs per connection in check.
+ *
+ * If a client sends small Read chunks (e.g. it may be
+ * using physical registration), more RDMA Reads per
+ * NFS WRITE will be needed. svc_rdma_get_frmr() dips
+ * into its reserve in that case. Better would be for
+ * the server to reduce the connection's credit limit.
+ */
+ i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
+ i *= xprt->sc_max_requests;
+
+ while (i--) {
+ struct svc_rdma_fastreg_mr *frmr;
+
+ frmr = rdma_alloc_frmr(xprt);
+ if (!frmr) {
+ dprintk("svcrdma: No memory for FRMRs\n");
+ return false;
+ }
+ list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
+ }
+ return true;
+}
+
struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
struct svc_rdma_fastreg_mr *frmr = NULL;
@@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
frmr->sg_nents = 0;
}
spin_unlock_bh(&rdma->sc_frmr_q_lock);
- if (frmr)
- return frmr;
-
- return rdma_alloc_frmr(rdma);
+ if (!frmr)
+ return ERR_PTR(-ENOMEM);
+ return frmr;
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
@@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dev->attrs.max_fast_reg_page_list_len;
newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
newxprt->sc_reader = rdma_read_chunk_frmr;
+ if (!svc_rdma_prealloc_frmrs(newxprt))
+ goto errout;
}

/*
@@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
xprt->xpt_bc_xprt = NULL;
}

- rdma_dealloc_frmr_q(rdma);
+ svc_rdma_destroy_frmrs(rdma);
svc_rdma_destroy_ctxts(rdma);
svc_rdma_destroy_maps(rdma);



2016-02-03 15:51:56

by Chuck Lever

Subject: [PATCH v1 04/10] rpcrdma: Add missing XDR union fields for RDMA errors

Add some infrastructure (shared between xprtrdma and svcrdma) for
constructing and parsing RPC-over-RDMA error status information.
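
As a sketch of how a sender might fill the new fields for an ERR_VERS
reply (an illustration, not code from the patch; "rmsgp" here is a
hypothetical pointer to the reply header being built):

  rmsgp->rm_vers = rpcrdma_version;
  rmsgp->rm_type = cpu_to_be32(RDMA_ERROR);
  rmsgp->rm_body.rm_error.rm_err = err_vers;
  rmsgp->rm_body.rm_error.rm_vers_low = rpcrdma_version;
  rmsgp->rm_body.rm_error.rm_vers_hi = rpcrdma_version;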

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4..a5a36c0 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -93,6 +93,12 @@ struct rpcrdma_msg {
__be32 rm_pempty[3]; /* 3 empty chunk lists */
} rm_padded;

+ struct {
+ __be32 rm_err;
+ __be32 rm_vers_low;
+ __be32 rm_vers_hi;
+ } rm_error;
+
__be32 rm_chunks[0]; /* read, write and reply chunks */

} rm_body;
@@ -102,12 +108,16 @@ struct rpcrdma_msg {
* Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
*/
#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)

enum rpcrdma_errcode {
ERR_VERS = 1,
ERR_CHUNK = 2
};

+#define err_vers cpu_to_be32(ERR_VERS)
+#define err_chunk cpu_to_be32(ERR_CHUNK)
+
struct rpcrdma_err_vers {
uint32_t rdma_vers_low; /* Version range supported by peer */
uint32_t rdma_vers_high;
@@ -118,7 +128,8 @@ enum rpcrdma_proc {
RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
RDMA_MSGP = 2, /* An RPC call or reply msg with padding */
RDMA_DONE = 3, /* Client signals reply completion */
- RDMA_ERROR = 4 /* An RPC RDMA encoding error */
+ RDMA_ERROR = 4, /* An RPC RDMA encoding error */
+ RDMA_LAST
};

#define rdma_msg cpu_to_be32(RDMA_MSG)


2016-02-03 15:52:04

by Chuck Lever

Subject: [PATCH v1 05/10] svcrdma: Make RDMA_ERROR messages work

Fix several issues with svc_rdma_send_error():

- Post a receive buffer to replace the one that was consumed by
the incoming request
- Posting a send should use DMA_TO_DEVICE, not DMA_FROM_DEVICE
- No need to put_page _and_ free pages in svc_rdma_put_context
- Make sure the sge is set up completely in case the error
path goes through svc_rdma_unmap_dma()

Related fixes in svc_rdma_recvfrom():

- Don't leak the ctxt associated with the incoming request
- Don't close the connection after sending an error reply
- Let svc_rdma_send_error() figure out the right header error code

As a last clean up, move svc_rdma_send_error() to svc_rdma_sendto.c
with other similar functions. There is some common logic in these
functions that could someday be combined to reduce code duplication.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 4 +-
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 19 ++++-----
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 62 ++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 54 --------------------------
4 files changed, 73 insertions(+), 66 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index d8fc58d..0589918 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -228,11 +228,11 @@ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
extern int svc_rdma_sendto(struct svc_rqst *);
extern struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+ int);

/* svc_rdma_transport.c */
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
- enum rpcrdma_errcode);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index acf15b8..0f09052 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -612,7 +612,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt *ctxt = NULL;
struct rpcrdma_msg *rmsgp;
int ret = 0;
- int len;

dprintk("svcrdma: rqstp=%p\n", rqstp);

@@ -654,15 +653,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

/* Decode the RDMA header. */
- len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
- rqstp->rq_xprt_hlen = len;
-
- /* If the request is invalid, reply with an error */
- if (len < 0) {
- if (len == -ENOSYS)
- svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
- goto close_out;
- }
+ ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ if (ret < 0)
+ goto out_err;
+ rqstp->rq_xprt_hlen = ret;

if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -698,6 +692,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
svc_xprt_copy_addrs(rqstp, xprt);
return ret;

+out_err:
+ svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+ svc_rdma_put_context(ctxt, 0);
+ return 0;
+
close_out:
if (ctxt)
svc_rdma_put_context(ctxt, 1);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 687a91afe..73793dd 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -639,3 +639,65 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_context(ctxt, 0);
return ret;
}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ int status)
+{
+ struct ib_send_wr err_wr;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ enum rpcrdma_errcode err;
+ __be32 *va;
+ int length;
+ int ret;
+
+ ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+ if (ret)
+ return;
+
+ p = alloc_page(GFP_KERNEL);
+ if (!p)
+ return;
+ va = page_address(p);
+
+ /* XDR encode an error reply */
+ err = ERR_CHUNK;
+ if (status == -ENOSYS)
+ err = ERR_VERS;
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SGE for local address */
+ ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+ ctxt->sge[0].length = length;
+ ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, length, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+ dprintk("svcrdma: Error mapping buffer for protocol error\n");
+ svc_rdma_put_context(ctxt, 1);
+ return;
+ }
+ atomic_inc(&xprt->sc_dma_used);
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = ctxt->sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ }
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index da82e36..e7bda1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1465,57 +1465,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
}
return ret;
}
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err)
-{
- struct ib_send_wr err_wr;
- struct page *p;
- struct svc_rdma_op_ctxt *ctxt;
- __be32 *va;
- int length;
- int ret;
-
- p = alloc_page(GFP_KERNEL);
- if (!p)
- return;
- va = page_address(p);
-
- /* XDR encode error */
- length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
- ctxt = svc_rdma_get_context(xprt);
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->count = 1;
- ctxt->pages[0] = p;
-
- /* Prepare SGE for local address */
- ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
- p, 0, length, DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
- put_page(p);
- svc_rdma_put_context(ctxt, 1);
- return;
- }
- atomic_inc(&xprt->sc_dma_used);
- ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
- ctxt->sge[0].length = length;
-
- /* Prepare SEND WR */
- memset(&err_wr, 0, sizeof err_wr);
- ctxt->wr_op = IB_WR_SEND;
- err_wr.wr_id = (unsigned long)ctxt;
- err_wr.sg_list = ctxt->sge;
- err_wr.num_sge = 1;
- err_wr.opcode = IB_WR_SEND;
- err_wr.send_flags = IB_SEND_SIGNALED;
-
- /* Post It */
- ret = svc_rdma_send(xprt, &err_wr);
- if (ret) {
- dprintk("svcrdma: Error %d posting send for protocol error\n",
- ret);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
- }
-}


2016-02-03 15:52:13

by Chuck Lever

Subject: [PATCH v1 06/10] svcrdma: Use correct XID in error replies

When constructing an error reply, svc_rdma_xdr_encode_error()
needs to view the client's request message so it can get the
failing request's XID.

svc_rdma_xdr_decode_req() is supposed to return a pointer to the
client's request header. But if it fails to decode the client's
message (and thus an error reply is needed) it does not return the
pointer. The server then sends a bogus XID in the error reply.

Instead, unconditionally generate the pointer to the client's header
in svc_rdma_recvfrom(), and pass that pointer to both functions.
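
In other words, the encoder can simply echo the XID from the header it
is handed, along the lines of this sketch:

  /* First quad of the error reply repeats the failing request's XID */
  *va++ = rmsgp->rm_xid;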

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 +-
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 7 +------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 3 ++-
3 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 0589918..4ce7b74 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
struct xdr_buf *rcvbuf);

/* svc_rdma_marshal.c */
-extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *);
extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
struct rpcrdma_msg *,
enum rpcrdma_errcode, __be32 *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index e2fca76..c011b12 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -145,15 +145,11 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
return (__be32 *)&ary->wc_array[nchunks];
}

-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
- struct svc_rqst *rqstp)
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
{
- struct rpcrdma_msg *rmsgp = NULL;
__be32 *va, *vaend;
u32 hdr_len;

- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
/* Verify that there's enough bytes for header + something */
if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
dprintk("svcrdma: header too short = %d\n",
@@ -201,7 +197,6 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
hdr_len = (unsigned long)va - (unsigned long)rmsgp;
rqstp->rq_arg.head[0].iov_len -= hdr_len;

- *rdma_req = rmsgp;
return hdr_len;
}

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 0f09052..8f68cb6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -653,7 +653,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

/* Decode the RDMA header. */
- ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
if (ret < 0)
goto out_err;
rqstp->rq_xprt_hlen = ret;


2016-02-03 15:52:29

by Chuck Lever

Subject: [PATCH v1 08/10] svcrdma: Remove close_out exit path

Clean up: close_out is reached only when ctxt == NULL and XPT_CLOSE
is already set.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 12 +-----------
1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f8b840b..d3718e9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -641,8 +641,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
* transport list
*/
if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
- goto close_out;
-
+ goto defer;
goto out;
}
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -700,15 +699,6 @@ out_err:
svc_rdma_put_context(ctxt, 0);
return 0;

- close_out:
- if (ctxt)
- svc_rdma_put_context(ctxt, 1);
- dprintk("svcrdma: transport %p is closing\n", xprt);
- /*
- * Set the close bit and enqueue it. svc_recv will see the
- * close bit and call svc_xprt_delete
- */
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
return 0;



2016-02-03 15:52:37

by Chuck Lever

Subject: [PATCH v1 09/10] svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Receive completions no longer use the dto_tasklet. Each polled
Receive WC is now handled individually in soft IRQ context.

The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod
metrics are no longer updated.

As far as I can tell, ib_alloc_cq() does not enable a consumer to
specify a callback for handling async CQ events. This may cause
a problem with handling spurious connection loss, though I think
Receive completion flushing should be enough to force the server
to clean up properly anyway.
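
For reference, this is the ib_cqe pattern the conversion adopts,
sketched with hypothetical names rather than the svcrdma types:

  struct my_ctxt {
          struct ib_cqe cqe;
          /* ... per-WR state ... */
  };

  static void my_recv_done(struct ib_cq *cq, struct ib_wc *wc)
  {
          struct my_ctxt *ctxt =
                  container_of(wc->wr_cqe, struct my_ctxt, cqe);

          /* Only wc->wr_cqe and wc->status are reliable when flushed */
          if (wc->status != IB_WC_SUCCESS) {
                  my_ctxt_free(ctxt);             /* hypothetical cleanup */
                  return;
          }
          my_ctxt_handle_receive(ctxt);           /* hypothetical handler */
  }

  /* Posting side:  ctxt->cqe.done = my_recv_done;
   *                recv_wr.wr_cqe = &ctxt->cqe;
   * The CQ itself comes from ib_alloc_cq(dev, priv, depth, 0,
   * IB_POLL_SOFTIRQ) and is released with ib_free_cq().
   */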

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2
net/sunrpc/xprtrdma/svc_rdma_transport.c | 130 +++++++++---------------------
2 files changed, 41 insertions(+), 91 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 4ce7b74..7de9cbb 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
struct svc_rdma_fastreg_mr *frmr;
int hdr_count;
struct xdr_buf arg;
+ struct ib_cqe cqe;
struct list_head dto_q;
enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status;
@@ -174,7 +175,6 @@ struct svcxprt_rdma {
struct work_struct sc_work;
};
/* sc_flags */
-#define RDMAXPRT_RQ_PENDING 1
#define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index e7bda1e..316df77 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
list_del_init(&xprt->sc_dto_q);
spin_unlock_irqrestore(&dto_lock, flags);

- rq_cq_reap(xprt);
sq_cq_reap(xprt);

svc_xprt_put(&xprt->sc_xprt);
@@ -422,93 +420,49 @@ static void dto_tasklet_func(unsigned long data)
spin_unlock_irqrestore(&dto_lock, flags);
}

-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
-{
- struct svcxprt_rdma *xprt = cq_context;
- unsigned long flags;
-
- /* Guard against unconditional flush call for destroyed QP */
- if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
- return;
-
- /*
- * Set the bit regardless of whether or not it's on the list
- * because it may be on the list already due to an SQ
- * completion.
- */
- set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
-
- /*
- * If this transport is not already on the DTO transport queue,
- * add it
- */
- spin_lock_irqsave(&dto_lock, flags);
- if (list_empty(&xprt->sc_dto_q)) {
- svc_xprt_get(&xprt->sc_xprt);
- list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-
- /* Tasklet does all the work to avoid irqsave locks. */
- tasklet_schedule(&dto_tasklet);
-}
-
-/*
- * rq_cq_reap - Process the RQ CQ.
+/**
+ * svc_rdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
- *
- * Note that caller must hold a transport reference.
*/
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+static void svc_rdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- int ret;
- struct ib_wc wc;
- struct svc_rdma_op_ctxt *ctxt = NULL;
-
- if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
- return;
+ struct svcxprt_rdma *xprt = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_rq_poll);
+ /* WARNING: Only wc->wr_cqe and wc->status *
+ * are reliable at this point. */
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ ctxt->wc_status = wc->status;
+ svc_rdma_unmap_dma(ctxt);

- while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
- ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
- ctxt->wc_status = wc.status;
- ctxt->byte_len = wc.byte_len;
- svc_rdma_unmap_dma(ctxt);
- if (wc.status != IB_WC_SUCCESS) {
- /* Close the transport */
- dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_rdma_put_context(ctxt, 1);
- svc_xprt_put(&xprt->sc_xprt);
- continue;
- }
- spin_lock_bh(&xprt->sc_rq_dto_lock);
- list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_put(&xprt->sc_xprt);
- }
+ if (wc->status != IB_WC_SUCCESS)
+ goto flushed;

- if (ctxt)
- atomic_inc(&rdma_stat_rq_prod);
+ /* All wc fields are now known to be valid */
+ ctxt->byte_len = wc->byte_len;
+ spin_lock(&xprt->sc_rq_dto_lock);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock(&xprt->sc_rq_dto_lock);

set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- /*
- * If data arrived before established event,
- * don't enqueue. This defers RPC I/O until the
- * RDMA connection is complete.
- */
- if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
- svc_xprt_enqueue(&xprt->sc_xprt);
+ if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ goto out;
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ goto out;
+
+flushed:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_warn("svcrdma: Recv completion: %s (%u, vendor %u)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+
+out:
+ svc_xprt_put(&xprt->sc_xprt);
}

/*
@@ -681,6 +635,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
ctxt = svc_rdma_get_context(xprt);
buflen = 0;
ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->cqe.done = svc_rdma_receive_wc;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
if (sge_no >= xprt->sc_max_sge) {
pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +660,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
recv_wr.next = NULL;
recv_wr.sg_list = &ctxt->sge[0];
recv_wr.num_sge = ctxt->count;
- recv_wr.wr_id = (u64)(unsigned long)ctxt;
+ recv_wr.wr_cqe = &ctxt->cqe;

svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -1124,12 +1079,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_rq_depth;
- newxprt->sc_rq_cq = ib_create_cq(dev,
- rq_comp_handler,
- cq_event_handler,
- newxprt,
- &cq_attr);
+ newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -1225,7 +1176,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* miss the first message
*/
ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);

/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
@@ -1369,7 +1319,7 @@ static void __svc_rdma_free(struct work_struct *work)
ib_destroy_cq(rdma->sc_sq_cq);

if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
- ib_destroy_cq(rdma->sc_rq_cq);
+ ib_free_cq(rdma->sc_rq_cq);

if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd);


2016-02-03 15:52:45

by Chuck Lever

Subject: [PATCH v1 10/10] svcrdma: Use new CQ API for RPC-over-RDMA server send CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

This new API also aims each completion at a function that is
specific to the WR's opcode. Thus the ctxt->wr_op field and the
switch in process_context are replaced by a set of methods that
handle each completion type.

The server's rdma_stat_sq_poll and rdma_stat_sq_prod metrics are no
longer updated.

As a clean-up, the cq_event_handler, the dto_tasklet, and all
associated locking are removed, as they are no longer referenced or
used.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 9 +
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 15 +-
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 12 +
net/sunrpc/xprtrdma/svc_rdma_transport.c | 257 ++++++++++------------------
5 files changed, 119 insertions(+), 178 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 7de9cbb..8af9e3b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt {
int hdr_count;
struct xdr_buf arg;
struct ib_cqe cqe;
+ struct ib_cqe reg_cqe;
+ struct ib_cqe inv_cqe;
struct list_head dto_q;
- enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status;
u32 byte_len;
u32 position;
@@ -175,7 +176,6 @@ struct svcxprt_rdma {
struct work_struct sc_work;
};
/* sc_flags */
-#define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3

#define RPCRDMA_LISTEN_BACKLOG 10
@@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
int);

/* svc_rdma_transport.c */
+extern void svc_rdma_send_wc(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_write_wc(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_reg_wc(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_read_wc(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_inv_wc(struct ib_cq *, struct ib_wc *);
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 4498eaf..512590a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
ctxt->count = 1;

- ctxt->wr_op = IB_WR_SEND;
ctxt->direction = DMA_TO_DEVICE;
ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
ctxt->sge[0].length = sndbuf->len;
@@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
atomic_inc(&rdma->sc_dma_used);

memset(&send_wr, 0, sizeof(send_wr));
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_send_wc;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = 1;
send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index d3718e9..1a318fc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

memset(&read_wr, 0, sizeof(read_wr));
- read_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_read_wc;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.opcode = IB_WR_RDMA_READ;
- ctxt->wr_op = read_wr.wr.opcode;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
ctxt->read_hdr = head;

/* Prepare REG WR */
+ ctxt->reg_cqe.done = svc_rdma_reg_wc;
+ reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
reg_wr.wr.opcode = IB_WR_REG_MR;
- reg_wr.wr.wr_id = 0;
reg_wr.wr.send_flags = IB_SEND_SIGNALED;
reg_wr.wr.num_sge = 0;
reg_wr.mr = frmr->mr;
@@ -310,26 +311,26 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,

/* Prepare RDMA_READ */
memset(&read_wr, 0, sizeof(read_wr));
+ ctxt->cqe.done = svc_rdma_read_wc;
+ read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset;
read_wr.wr.sg_list = ctxt->sge;
read_wr.wr.num_sge = 1;
if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
- read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr.wr_id = (unsigned long)ctxt;
read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
} else {
read_wr.wr.opcode = IB_WR_RDMA_READ;
read_wr.wr.next = &inv_wr;
/* Prepare invalidate */
memset(&inv_wr, 0, sizeof(inv_wr));
- inv_wr.wr_id = (unsigned long)ctxt;
+ ctxt->inv_cqe.done = svc_rdma_inv_wc;
+ inv_wr.wr_cqe = &ctxt->inv_cqe;
inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
}
- ctxt->wr_op = read_wr.wr.opcode;

/* Post the chain */
ret = svc_rdma_send(xprt, &reg_wr.wr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 73793dd..9d21205 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -281,8 +281,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,

/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
- ctxt->wr_op = IB_WR_RDMA_WRITE;
- write_wr.wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_write_wc;
+ write_wr.wr.wr_cqe = &ctxt->cqe;
write_wr.wr.sg_list = &sge[0];
write_wr.wr.num_sge = sge_no;
write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -538,8 +538,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
goto err;
}
memset(&send_wr, 0, sizeof send_wr);
- ctxt->wr_op = IB_WR_SEND;
- send_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_send_wc;
+ send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
@@ -685,8 +685,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,

/* Prepare SEND WR */
memset(&err_wr, 0, sizeof err_wr);
- ctxt->wr_op = IB_WR_SEND;
- err_wr.wr_id = (unsigned long)ctxt;
+ ctxt->cqe.done = svc_rdma_send_wc;
+ err_wr.wr_cqe = &ctxt->cqe;
err_wr.sg_list = ctxt->sge;
err_wr.num_sge = 1;
err_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 316df77..bab80f8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,16 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
.xpo_create = svc_rdma_create,
@@ -351,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
}
}

-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
- struct svc_xprt *xprt = context;
- dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
- ib_event_msg(event->event), event->event, context);
- set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
@@ -391,35 +376,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
}
}

-/*
- * Data Transfer Operation Tasklet
- *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
- */
-static void dto_tasklet_func(unsigned long data)
-{
- struct svcxprt_rdma *xprt;
- unsigned long flags;
-
- spin_lock_irqsave(&dto_lock, flags);
- while (!list_empty(&dto_xprt_q)) {
- xprt = list_entry(dto_xprt_q.next,
- struct svcxprt_rdma, sc_dto_q);
- list_del_init(&xprt->sc_dto_q);
- spin_unlock_irqrestore(&dto_lock, flags);
-
- sq_cq_reap(xprt);
-
- svc_xprt_put(&xprt->sc_xprt);
- spin_lock_irqsave(&dto_lock, flags);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-}
-
/**
* svc_rdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue
@@ -465,132 +421,125 @@ out:
svc_xprt_put(&xprt->sc_xprt);
}

-/*
- * Process a completion context
- */
-static void process_context(struct svcxprt_rdma *xprt,
- struct svc_rdma_op_ctxt *ctxt)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+ struct ib_wc *wc)
{
- struct svc_rdma_op_ctxt *read_hdr;
- int free_pages = 0;
-
- svc_rdma_unmap_dma(ctxt);
-
- switch (ctxt->wr_op) {
- case IB_WR_SEND:
- free_pages = 1;
- break;
+ if (wc->status != IB_WC_SUCCESS)
+ goto err;

- case IB_WR_RDMA_WRITE:
- break;
+out:
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+ return;

- case IB_WR_RDMA_READ:
- case IB_WR_RDMA_READ_WITH_INV:
- svc_rdma_put_frmr(xprt, ctxt->frmr);
+err:
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_warn("svcrdma: Send completion: %s (%u, vendor %u)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ goto out;
+}

- if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
- break;
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *xprt = cq->cq_context;

- read_hdr = ctxt->read_hdr;
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_send_wc_common(xprt, wc);
+ svc_xprt_put(&xprt->sc_xprt);
+}

- spin_lock_bh(&xprt->sc_rq_dto_lock);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- list_add_tail(&read_hdr->dto_q,
- &xprt->sc_read_complete_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
- svc_xprt_enqueue(&xprt->sc_xprt);
- return;
+/**
+ * svc_rdma_send_wc - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_send_wc(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- default:
- dprintk("svcrdma: unexpected completion opcode=%d\n",
- ctxt->wr_op);
- break;
- }
+ svc_rdma_send_wc_common_put(cq, wc);

- svc_rdma_put_context(ctxt, free_pages);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
}

-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_write_wc - Invoked by RDMA provider for each polled Write WC
+ * @cq: completion queue
+ * @wc: completed WR
*
- * Note that caller must hold a transport reference.
*/
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_write_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svc_rdma_op_ctxt *ctxt = NULL;
- struct ib_wc wc_a[6];
- struct ib_wc *wc;
- struct ib_cq *cq = xprt->sc_sq_cq;
- int ret;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- memset(wc_a, 0, sizeof(wc_a));
+ svc_rdma_send_wc_common_put(cq, wc);

- if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
- return;
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+}

- ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
- atomic_inc(&rdma_stat_sq_poll);
- while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
- int i;
+/**
+ * svc_rdma_reg_wc - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_reg_wc(struct ib_cq *cq, struct ib_wc *wc)
+{
+ svc_rdma_send_wc_common_put(cq, wc);
+}

- for (i = 0; i < ret; i++) {
- wc = &wc_a[i];
- if (wc->status != IB_WC_SUCCESS) {
- dprintk("svcrdma: sq wc err status %s (%d)\n",
- ib_wc_status_msg(wc->status),
- wc->status);
+/**
+ * svc_rdma_read_wc - Invoked by RDMA provider for each polled Read WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_read_wc(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *xprt = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_op_ctxt *ctxt;

- /* Close the transport */
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- }
+ svc_rdma_send_wc_common(xprt, wc);

- /* Decrement used SQ WR count */
- atomic_dec(&xprt->sc_sq_count);
- wake_up(&xprt->sc_send_wait);
+ ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_frmr(xprt, ctxt->frmr);

- ctxt = (struct svc_rdma_op_ctxt *)
- (unsigned long)wc->wr_id;
- if (ctxt)
- process_context(xprt, ctxt);
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ struct svc_rdma_op_ctxt *read_hdr;

- svc_xprt_put(&xprt->sc_xprt);
- }
+ read_hdr = ctxt->read_hdr;
+ spin_lock(&xprt->sc_rq_dto_lock);
+ list_add_tail(&read_hdr->dto_q,
+ &xprt->sc_read_complete_q);
+ spin_unlock(&xprt->sc_rq_dto_lock);
+
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&xprt->sc_xprt);
}

- if (ctxt)
- atomic_inc(&rdma_stat_sq_prod);
+ svc_rdma_put_context(ctxt, 0);
+ svc_xprt_put(&xprt->sc_xprt);
}

-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+/**
+ * svc_rdma_inv_wc - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_inv_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- struct svcxprt_rdma *xprt = cq_context;
- unsigned long flags;
-
- /* Guard against unconditional flush call for destroyed QP */
- if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
- return;
-
- /*
- * Set the bit regardless of whether or not it's on the list
- * because it may be on the list already due to an RQ
- * completion.
- */
- set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
- /*
- * If this transport is not already on the DTO transport queue,
- * add it
- */
- spin_lock_irqsave(&dto_lock, flags);
- if (list_empty(&xprt->sc_dto_q)) {
- svc_xprt_get(&xprt->sc_xprt);
- list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
- }
- spin_unlock_irqrestore(&dto_lock, flags);
-
- /* Tasklet does all the work to avoid irqsave locks. */
- tasklet_schedule(&dto_tasklet);
+ svc_rdma_send_wc_common_put(cq, wc);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -1011,7 +960,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct svcxprt_rdma *listen_rdma;
struct svcxprt_rdma *newxprt = NULL;
struct rdma_conn_param conn_param;
- struct ib_cq_init_attr cq_attr = {};
struct ib_qp_init_attr qp_attr;
struct ib_device *dev;
unsigned int i;
@@ -1069,12 +1017,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating PD for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_sq_depth;
- newxprt->sc_sq_cq = ib_create_cq(dev,
- sq_comp_handler,
- cq_event_handler,
- newxprt,
- &cq_attr);
+ newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(newxprt->sc_sq_cq)) {
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
@@ -1171,12 +1115,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;

- /*
- * Arm the CQs for the SQ and RQ before accepting so we can't
- * miss the first message
- */
- ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-
/* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
memset(&conn_param, 0, sizeof conn_param);
@@ -1316,7 +1254,7 @@ static void __svc_rdma_free(struct work_struct *work)
ib_destroy_qp(rdma->sc_qp);

if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
- ib_destroy_cq(rdma->sc_sq_cq);
+ ib_free_cq(rdma->sc_sq_cq);

if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
ib_free_cq(rdma->sc_rq_cq);
@@ -1380,9 +1318,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
spin_unlock_bh(&xprt->sc_lock);
atomic_inc(&rdma_stat_sq_starve);

- /* See if we can opportunistically reap SQ WR to make room */
- sq_cq_reap(xprt);
-
/* Wait until SQ WR available if SQ still full */
wait_event(xprt->sc_send_wait,
atomic_read(&xprt->sc_sq_count) <


2016-02-03 15:52:21

by Chuck Lever

Subject: [PATCH v1 07/10] svcrdma: Hook up the logic to return ERR_CHUNK

RFC 5666 Section 4.2 states:

> When the peer detects an RPC-over-RDMA header version that it does
> not support (currently this document defines only version 1), it
> replies with an error code of ERR_VERS, and provides the low and
> high inclusive version numbers it does, in fact, support.

And:

> When other decoding errors are detected in the header or chunks,
> either an RPC decode error MAY be returned or the RPC/RDMA error
> code ERR_CHUNK MUST be returned.

The Linux NFS server does throw ERR_VERS when a client sends it
a request whose rdma_version is not "one." But it does not return
ERR_CHUNK when a header decoding error occurs. It just drops the
request.

To improve protocol extensibility, it should reject invalid values
in the rdma_proc field instead of treating them all like RDMA_MSG.
Otherwise clients can't detect when the server doesn't support
new rdma_proc values.
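
For reference, the minimal error reply this enables is the five-quad
header that RPCRDMA_HDRLEN_ERR describes. A sketch of the wire layout
(field names follow the rm_* naming in rpc_rdma.h), not code from the
patch:

  /* RDMA_ERROR reply, 5 XDR quads = RPCRDMA_HDRLEN_ERR:
   *
   *   rm_xid     - XID copied from the failing request
   *   rm_vers    - 1
   *   rm_credit  - credit value
   *   rm_type    - RDMA_ERROR
   *   rm_err     - ERR_CHUNK  (ERR_VERS adds rm_vers_low and rm_vers_hi)
   */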

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 55 ++++++++++++++++++++++++-------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 4 ++
2 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index c011b12..7df4f07 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -148,22 +148,41 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
{
__be32 *va, *vaend;
+ unsigned int len;
u32 hdr_len;

/* Verify that there's enough bytes for header + something */
- if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
dprintk("svcrdma: header too short = %d\n",
rqstp->rq_arg.len);
return -EINVAL;
}

- if (rmsgp->rm_vers != rpcrdma_version)
+ if (rmsgp->rm_vers != rpcrdma_version) {
+ dprintk("%s: bad version %u\n", __func__,
+ be32_to_cpu(rmsgp->rm_vers));
return -ENOSYS;
+ }

- /* Pull in the extra for the padded case and bump our pointer */
- if (rmsgp->rm_type == rdma_msgp) {
- int hdrlen;
-
+ switch (be32_to_cpu(rmsgp->rm_type)) {
+ case RDMA_MSG:
+ case RDMA_NOMSG:
+ break;
+
+ case RDMA_DONE:
+ /* Just drop it */
+ dprintk("svcrdma: dropping RDMA_DONE message\n");
+ return 0;
+
+ case RDMA_ERROR:
+ /* Possible if this is a backchannel reply.
+ * XXX: We should cancel this XID, though.
+ */
+ dprintk("svcrdma: dropping RDMA_ERROR message\n");
+ return 0;
+
+ case RDMA_MSGP:
+ /* Pull in the extra for the padded case, bump our pointer */
rmsgp->rm_body.rm_padded.rm_align =
be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
rmsgp->rm_body.rm_padded.rm_thresh =
@@ -171,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)

va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
rqstp->rq_arg.head[0].iov_base = va;
- hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
- rqstp->rq_arg.head[0].iov_len -= hdrlen;
- if (hdrlen > rqstp->rq_arg.len)
+ len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= len;
+ if (len > rqstp->rq_arg.len)
return -EINVAL;
- return hdrlen;
+ return len;
+ default:
+ dprintk("svcrdma: bad rdma procedure (%u)\n",
+ be32_to_cpu(rmsgp->rm_type));
+ return -EINVAL;
}

/* The chunk list may contain either a read chunk list or a write
@@ -184,14 +207,20 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
va = &rmsgp->rm_body.rm_chunks[0];
vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
va = decode_read_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode read list\n");
return -EINVAL;
+ }
va = decode_write_list(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode write list\n");
return -EINVAL;
+ }
va = decode_reply_array(va, vaend);
- if (!va)
+ if (!va) {
+ dprintk("svcrdma: failed to decode reply chunk\n");
return -EINVAL;
+ }

rqstp->rq_arg.head[0].iov_base = va;
hdr_len = (unsigned long)va - (unsigned long)rmsgp;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 8f68cb6..f8b840b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -657,6 +657,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
if (ret < 0)
goto out_err;
+ if (ret == 0)
+ goto out_drop;
rqstp->rq_xprt_hlen = ret;

if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
@@ -710,6 +712,8 @@ out_err:
defer:
return 0;

+out_drop:
+ svc_rdma_put_context(ctxt, 1);
repost:
return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}


2016-02-03 15:51:48

by Chuck Lever

Subject: [PATCH v1 03/10] svcrdma: svc_rdma_post_recv() should close connection on error

Clean up: Most svc_rdma_post_recv() call sites close the transport
connection when a receive cannot be posted. Wrap that in a common
helper.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 1 +
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 ++---------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 10 +---------
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 15 +++++++++++++++
5 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 5322fea..d8fc58d 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -234,6 +234,7 @@ extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
enum rpcrdma_errcode);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
+extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 65a7c23..4498eaf 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -111,16 +111,9 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
if (ret)
goto out_err;

- /* Post a recv buffer to handle the reply for this request. */
- ret = svc_rdma_post_recv(rdma, GFP_NOIO);
- if (ret) {
- pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
+ ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+ if (ret)
goto out_err;
- }

ctxt = svc_rdma_get_context(rdma);
ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c8b8a8b..acf15b8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -711,13 +711,5 @@ defer:
return 0;

repost:
- ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
- if (ret) {
- pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
- ret);
- pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
- set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
- ret = -ENOTCONN;
- }
- return ret;
+ return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 8591314..687a91afe 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -464,13 +464,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
int pages;
int ret;

- /* Post a recv buffer to handle another request. */
- ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+ ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
if (ret) {
- printk(KERN_INFO
- "svcrdma: could not post a receive buffer, err=%d."
- "Closing transport %p.\n", ret, rdma);
- set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 0);
return -ENOTCONN;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 02eee12..da82e36 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -722,6 +722,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
return -ENOMEM;
}

+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+ int ret = 0;
+
+ ret = svc_rdma_post_recv(xprt, flags);
+ if (ret) {
+ pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+ ret);
+ pr_err("svcrdma: closing transport %p.\n", xprt);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ }
+ return ret;
+}
+
/*
* This function handles the CONNECT_REQUEST event on a listening
* endpoint. It is passed the cma_id for the _new_ connection. The context in


2016-02-05 10:17:55

by Devesh Sharma

Subject: Re: [PATCH v1 03/10] svcrdma: svc_rdma_post_recv() should close connection on error

Looks good.

On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
> Clean up: Most svc_rdma_post_recv() call sites close the transport
> connection when a receive cannot be posted. Wrap that in a common
> helper.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 1 +
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 ++---------
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 10 +---------
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +------
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 15 +++++++++++++++
> 5 files changed, 20 insertions(+), 24 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 5322fea..d8fc58d 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -234,6 +234,7 @@ extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
> extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
> enum rpcrdma_errcode);
> extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
> +extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
> extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
> extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
> extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index 65a7c23..4498eaf 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -111,16 +111,9 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
> if (ret)
> goto out_err;
>
> - /* Post a recv buffer to handle the reply for this request. */
> - ret = svc_rdma_post_recv(rdma, GFP_NOIO);
> - if (ret) {
> - pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
> - ret);
> - pr_err("svcrdma: closing transport %p.\n", rdma);
> - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> - ret = -ENOTCONN;
> + ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
> + if (ret)
> goto out_err;
> - }
>
> ctxt = svc_rdma_get_context(rdma);
> ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index c8b8a8b..acf15b8 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -711,13 +711,5 @@ defer:
> return 0;
>
> repost:
> - ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
> - if (ret) {
> - pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
> - ret);
> - pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
> - set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
> - ret = -ENOTCONN;
> - }
> - return ret;
> + return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
> }
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index 8591314..687a91afe 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -464,13 +464,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
> int pages;
> int ret;
>
> - /* Post a recv buffer to handle another request. */
> - ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
> + ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
> if (ret) {
> - printk(KERN_INFO
> - "svcrdma: could not post a receive buffer, err=%d."
> - "Closing transport %p.\n", ret, rdma);
> - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> svc_rdma_put_context(ctxt, 0);
> return -ENOTCONN;
> }
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index 02eee12..da82e36 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -722,6 +722,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
> return -ENOMEM;
> }
>
> +int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
> +{
> + int ret = 0;
> +
> + ret = svc_rdma_post_recv(xprt, flags);
> + if (ret) {
> + pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
> + ret);
> + pr_err("svcrdma: closing transport %p.\n", xprt);
> + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> + ret = -ENOTCONN;
> + }
> + return ret;
> +}
> +
> /*
> * This function handles the CONNECT_REQUEST event on a listening
> * endpoint. It is passed the cma_id for the _new_ connection. The context in
>

2016-02-05 10:19:24

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 01/10] svcrdma: Do not send XDR roundup bytes for a write chunk

Looks good to me.

On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
> When the Linux server writes an odd-length data item into a Write
> chunk, it finishes with XDR pad bytes. If the data item is smaller
> than the Write chunk, the pad bytes are written at the end of the
> data item, but still inside the chunk. That can expose these zero
> bytes to the RPC consumer on the client.
>
> XDR pad bytes are inserted in order to preserve the XDR alignment
> of the next XDR data item in an XDR stream. But Write chunks do not
> appear in the payload XDR stream, and only one data item is allowed
> in each chunk. So XDR padding is unneeded.
>
> Thus the server should not write XDR pad bytes in Write chunks.
>
> I believe this is not an operational problem. Short NFS READs that
> are returned in a Write chunk would be affected by this issue. But
> they happen only when the read request goes past the EOF. Those are
> zero bytes anyway, and there's no file data in the client's buffer
> past EOF.
>
> Otherwise, current NFS clients provide a separate extra segment for
> catching XDR padding. If an odd-size data item fills the chunk,
> then the XDR pad will be written to the extra segment.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 2 +-
> 1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index df57f3c..8591314 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -308,7 +308,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> struct svc_rqst *rqstp,
> struct svc_rdma_req_map *vec)
> {
> - u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
> + u32 xfer_len = rqstp->rq_res.page_len;
> int write_len;
> u32 xdr_off;
> int chunk_off;
>
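
To make the round-up rule concrete, here is a small stand-alone C sketch
(illustrative only; it is not code from the patch, and the payload size is
made up) showing how the XDR pad is computed and why dropping the tail iovec
from the transfer length keeps pad bytes out of the Write chunk:

    #include <stdio.h>

    /* XDR data items are padded to a multiple of 4 octets (RFC 4506). */
    static unsigned int xdr_pad_size(unsigned int len)
    {
            return (4 - (len & 3)) & 3;
    }

    int main(void)
    {
            unsigned int read_payload = 4097;  /* odd-length NFS READ result */
            unsigned int pad = xdr_pad_size(read_payload);

            printf("pad bytes:          %u\n", pad);                /* 3 */
            /* Old behavior: page_len plus the tail (the pad) was written. */
            printf("old chunk transfer: %u\n", read_payload + pad); /* 4100 */
            /* Patched behavior: only the data item itself is RDMA Written. */
            printf("new chunk transfer: %u\n", read_payload);       /* 4097 */
            return 0;
    }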

2016-02-05 10:22:26

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep

As I understand it, you are pre-allocating because alloc_mr() can sleep,
so you separate it out, while the map-frmr-wqe path is a non-blocking
context by definition. Are we on the same page?


On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
> Want to call it in a context that cannot sleep. So pre-alloc
> the memory and the MRs.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
> 1 file changed, 38 insertions(+), 6 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index 5763825..02eee12 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
> return ERR_PTR(-ENOMEM);
> }
>
> -static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
> +static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
> {
> struct svc_rdma_fastreg_mr *frmr;
>
> @@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
> }
> }
>
> +static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
> +{
> + struct ib_device *dev = xprt->sc_cm_id->device;
> + unsigned int i;
> +
> + /* Pre-allocate based on the maximum amount of payload
> + * the server's HCA can handle per RDMA Read, to keep
> + * the number of MRs per connection in check.
> + *
> + * If a client sends small Read chunks (eg. it may be
> + * using physical registration), more RDMA Reads per
> + * NFS WRITE will be needed. svc_rdma_get_frmr() dips
> + * into its reserve in that case. Better would be for
> + * the server to reduce the connection's credit limit.
> + */
> + i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
> + i *= xprt->sc_max_requests;
> +
> + while (i--) {
> + struct svc_rdma_fastreg_mr *frmr;
> +
> + frmr = rdma_alloc_frmr(xprt);
> + if (!frmr) {
> + dprintk("svcrdma: No memory for request map\n");
> + return false;
> + }
> + list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
> + }
> + return true;
> +}
> +
> struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
> {
> struct svc_rdma_fastreg_mr *frmr = NULL;
> @@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
> frmr->sg_nents = 0;
> }
> spin_unlock_bh(&rdma->sc_frmr_q_lock);
> - if (frmr)
> - return frmr;
> -
> - return rdma_alloc_frmr(rdma);
> + if (!frmr)
> + return ERR_PTR(-ENOMEM);
> + return frmr;
> }
>
> void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
> @@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> dev->attrs.max_fast_reg_page_list_len;
> newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
> newxprt->sc_reader = rdma_read_chunk_frmr;
> + if (!svc_rdma_prealloc_frmrs(newxprt))
> + goto errout;
> }
>
> /*
> @@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
> xprt->xpt_bc_xprt = NULL;
> }
>
> - rdma_dealloc_frmr_q(rdma);
> + svc_rdma_destroy_frmrs(rdma);
> svc_rdma_destroy_ctxts(rdma);
> svc_rdma_destroy_maps(rdma);
>
>
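
Outside the kernel, the shape of this change can be sketched as a simple
pre-filled free list: the allocation happens once, in a context that may
sleep, and the get path becomes a pure list operation that never allocates.
This is only an illustration (the names, types, and pthread locking are made
up, not the svcrdma code):

    #include <errno.h>
    #include <pthread.h>
    #include <stdlib.h>

    struct frmr {
            struct frmr *next;
            /* registration state (MR handle, sg list, ...) would live here */
    };

    struct frmr_pool {
            pthread_spinlock_t lock;
            struct frmr *free_list;
    };

    /* May sleep/allocate: run once while accepting the connection. */
    static int pool_prealloc(struct frmr_pool *p, unsigned int count)
    {
            pthread_spin_init(&p->lock, PTHREAD_PROCESS_PRIVATE);
            p->free_list = NULL;
            while (count--) {
                    struct frmr *f = calloc(1, sizeof(*f));

                    if (!f)
                            return -ENOMEM;
                    f->next = p->free_list;
                    p->free_list = f;
            }
            return 0;
    }

    /* Never sleeps: safe to call where blocking is not allowed. */
    static struct frmr *pool_get(struct frmr_pool *p)
    {
            struct frmr *f;

            pthread_spin_lock(&p->lock);
            f = p->free_list;
            if (f)
                    p->free_list = f->next;
            pthread_spin_unlock(&p->lock);
            return f;       /* NULL means the reserve is exhausted */
    }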

2016-02-05 10:24:12

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 04/10] rpcrdma: Add missing XDR union fields for RDMA errors

On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
> Add some infrastructure (shared between xprtrdma and svcrdma) for
> constructing and parsing RPC-over-RDMA error status information.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/rpc_rdma.h | 13 ++++++++++++-
> 1 file changed, 12 insertions(+), 1 deletion(-)
>
> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
> index f33c5a4..a5a36c0 100644
> --- a/include/linux/sunrpc/rpc_rdma.h
> +++ b/include/linux/sunrpc/rpc_rdma.h
> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
> __be32 rm_pempty[3]; /* 3 empty chunk lists */
> } rm_padded;
>
> + struct {
> + __be32 rm_err;
> + __be32 rm_vers_low;
> + __be32 rm_vers_hi;
> + } rm_error;
> +

I don't see the above members being used or filled with a value; am I
missing the link?

> __be32 rm_chunks[0]; /* read, write and reply chunks */
>
> } rm_body;
> @@ -102,12 +108,16 @@ struct rpcrdma_msg {
> * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
> */
> #define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
> +#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)
>
> enum rpcrdma_errcode {
> ERR_VERS = 1,
> ERR_CHUNK = 2
> };
>
> +#define err_vers cpu_to_be32(ERR_VERS)
> +#define err_chunk cpu_to_be32(ERR_CHUNK)
> +
> struct rpcrdma_err_vers {
> uint32_t rdma_vers_low; /* Version range supported by peer */
> uint32_t rdma_vers_high;
> @@ -118,7 +128,8 @@ enum rpcrdma_proc {
> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
> RDMA_MSGP = 2, /* An RPC call or reply msg with padding */
> RDMA_DONE = 3, /* Client signals reply completion */
> - RDMA_ERROR = 4 /* An RPC RDMA encoding error */
> + RDMA_ERROR = 4, /* An RPC RDMA encoding error */
> + RDMA_LAST
> };
>
> #define rdma_msg cpu_to_be32(RDMA_MSG)
>
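
For readers less familiar with the wire format, here is a hedged user-space
sketch of the layout these new fields describe: an RDMA_ERROR reply carries
the four-word header (xid, version, credits, proc), then the error code, and
for ERR_VERS the supported version range. The encoder below is purely
illustrative, not the kernel implementation:

    #include <stdint.h>
    #include <arpa/inet.h>

    enum { RDMA_ERROR = 4, ERR_VERS = 1 };

    /* Returns the number of 32-bit XDR words written at va. */
    static unsigned int encode_err_vers(uint32_t *va, uint32_t xid)
    {
            uint32_t *p = va;

            *p++ = xid;                /* rm_xid, copied from the failing request */
            *p++ = htonl(1);           /* rm_vers: RPC-over-RDMA version one */
            *p++ = htonl(0);           /* rm_credit (value illustrative) */
            *p++ = htonl(RDMA_ERROR);  /* rm_type */
            *p++ = htonl(ERR_VERS);    /* rm_err */
            *p++ = htonl(1);           /* rm_vers_low */
            *p++ = htonl(1);           /* rm_vers_hi */
            return (unsigned int)(p - va);  /* 7 words; HDRLEN_ERR covers the first 5 */
    }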

2016-02-05 10:35:56

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 05/10] svcrdma: Make RDMA_ERROR messages work

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> Fix several issues with svc_rdma_send_error():
>
> - Post a receive buffer to replace the one that was consumed by
> the incoming request
> - Posting a send should use DMA_TO_DEVICE, not DMA_FROM_DEVICE
> - No need to put_page _and_ free pages in svc_rdma_put_context
> - Make sure the sge is set up completely in case the error
> path goes through svc_rdma_unmap_dma()
>
> Related fixes in svc_rdma_recvfrom():
>
> - Don't leak the ctxt associated with the incoming request
> - Don't close the connection after sending an error reply
> - Let svc_rdma_send_error() figure out the right header error code
>
> As a last clean up, move svc_rdma_send_error() to svc_rdma_sendto.c
> with other similar functions. There is some common logic in these
> functions that could someday be combined to reduce code duplication.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 4 +-
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 19 ++++-----
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 62 ++++++++++++++++++++++++++++++
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 54 --------------------------
> 4 files changed, 73 insertions(+), 66 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index d8fc58d..0589918 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -228,11 +228,11 @@ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
> extern int svc_rdma_sendto(struct svc_rqst *);
> extern struct rpcrdma_read_chunk *
> svc_rdma_get_read_chunk(struct rpcrdma_msg *);
> +extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
> + int);
>
> /* svc_rdma_transport.c */
> extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
> -extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
> - enum rpcrdma_errcode);
> extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
> extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
> extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index acf15b8..0f09052 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -612,7 +612,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> struct svc_rdma_op_ctxt *ctxt = NULL;
> struct rpcrdma_msg *rmsgp;
> int ret = 0;
> - int len;
>
> dprintk("svcrdma: rqstp=%p\n", rqstp);
>
> @@ -654,15 +653,10 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
>
> /* Decode the RDMA header. */
> - len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
> - rqstp->rq_xprt_hlen = len;
> -
> - /* If the request is invalid, reply with an error */
> - if (len < 0) {
> - if (len == -ENOSYS)
> - svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
> - goto close_out;
> - }
> + ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
> + if (ret < 0)
> + goto out_err;
> + rqstp->rq_xprt_hlen = ret;
>
> if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
> ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
> @@ -698,6 +692,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> svc_xprt_copy_addrs(rqstp, xprt);
> return ret;
>
> +out_err:
> + svc_rdma_send_error(rdma_xprt, rmsgp, ret);
> + svc_rdma_put_context(ctxt, 0);
> + return 0;
> +
> close_out:
> if (ctxt)
> svc_rdma_put_context(ctxt, 1);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index 687a91afe..73793dd 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -639,3 +639,65 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
> svc_rdma_put_context(ctxt, 0);
> return ret;
> }
> +
> +void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
> + int status)
> +{
> + struct ib_send_wr err_wr;
> + struct page *p;
> + struct svc_rdma_op_ctxt *ctxt;
> + enum rpcrdma_errcode err;
> + __be32 *va;
> + int length;
> + int ret;
> +
> + ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
> + if (ret)
> + return;
> +
> + p = alloc_page(GFP_KERNEL);
> + if (!p)
> + return;
> + va = page_address(p);
> +
> + /* XDR encode an error reply */
> + err = ERR_CHUNK;
> + if (status == -ENOSYS)
> + err = ERR_VERS;
> + length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
> +
> + ctxt = svc_rdma_get_context(xprt);
> + ctxt->direction = DMA_TO_DEVICE;
> + ctxt->count = 1;
> + ctxt->pages[0] = p;
> +
> + /* Prepare SGE for local address */
> + ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
> + ctxt->sge[0].length = length;
> + ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
> + p, 0, length, DMA_TO_DEVICE);
> + if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
> + dprintk("svcrdma: Error mapping buffer for protocol error\n");
> + svc_rdma_put_context(ctxt, 1);
> + return;
> + }
> + atomic_inc(&xprt->sc_dma_used);
> +
> + /* Prepare SEND WR */
> + memset(&err_wr, 0, sizeof err_wr);
> + ctxt->wr_op = IB_WR_SEND;
> + err_wr.wr_id = (unsigned long)ctxt;
> + err_wr.sg_list = ctxt->sge;
> + err_wr.num_sge = 1;
> + err_wr.opcode = IB_WR_SEND;
> + err_wr.send_flags = IB_SEND_SIGNALED;
> +
> + /* Post It */
> + ret = svc_rdma_send(xprt, &err_wr);
> + if (ret) {
> + dprintk("svcrdma: Error %d posting send for protocol error\n",
> + ret);
> + svc_rdma_unmap_dma(ctxt);
> + svc_rdma_put_context(ctxt, 1);
> + }
> +}
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index da82e36..e7bda1e 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -1465,57 +1465,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
> }
> return ret;
> }
> -
> -void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
> - enum rpcrdma_errcode err)
> -{
> - struct ib_send_wr err_wr;
> - struct page *p;
> - struct svc_rdma_op_ctxt *ctxt;
> - __be32 *va;
> - int length;
> - int ret;
> -
> - p = alloc_page(GFP_KERNEL);
> - if (!p)
> - return;
> - va = page_address(p);
> -
> - /* XDR encode error */
> - length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
> -
> - ctxt = svc_rdma_get_context(xprt);
> - ctxt->direction = DMA_FROM_DEVICE;
> - ctxt->count = 1;
> - ctxt->pages[0] = p;
> -
> - /* Prepare SGE for local address */
> - ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
> - p, 0, length, DMA_FROM_DEVICE);
> - if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
> - put_page(p);
> - svc_rdma_put_context(ctxt, 1);
> - return;
> - }
> - atomic_inc(&xprt->sc_dma_used);
> - ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
> - ctxt->sge[0].length = length;
> -
> - /* Prepare SEND WR */
> - memset(&err_wr, 0, sizeof err_wr);
> - ctxt->wr_op = IB_WR_SEND;
> - err_wr.wr_id = (unsigned long)ctxt;
> - err_wr.sg_list = ctxt->sge;
> - err_wr.num_sge = 1;
> - err_wr.opcode = IB_WR_SEND;
> - err_wr.send_flags = IB_SEND_SIGNALED;
> -
> - /* Post It */
> - ret = svc_rdma_send(xprt, &err_wr);
> - if (ret) {
> - dprintk("svcrdma: Error %d posting send for protocol error\n",
> - ret);
> - svc_rdma_unmap_dma(ctxt);
> - svc_rdma_put_context(ctxt, 1);
> - }
> -}
>
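
One of the fixes above (DMA_TO_DEVICE rather than DMA_FROM_DEVICE) boils down
to a simple symmetry rule: memory the HCA will read and transmit is mapped
DMA_TO_DEVICE, and the unmap must use the same direction. A minimal
kernel-style sketch of that convention (not code from the patch, and not
meant to build stand-alone):

    #include <rdma/ib_verbs.h>

    static int map_for_send(struct ib_device *dev, struct page *page,
                            unsigned int len, struct ib_sge *sge, u32 lkey)
    {
            sge->lkey   = lkey;
            sge->length = len;
            sge->addr   = ib_dma_map_page(dev, page, 0, len, DMA_TO_DEVICE);
            if (ib_dma_mapping_error(dev, sge->addr))
                    return -EIO;
            return 0;
    }

    static void unmap_after_send(struct ib_device *dev, struct ib_sge *sge)
    {
            /* Must match the mapping direction exactly. */
            ib_dma_unmap_page(dev, sge->addr, sge->length, DMA_TO_DEVICE);
    }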

2016-02-05 10:41:46

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 06/10] svcrdma: Use correct XID in error replies

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> When constructing an error reply, svc_rdma_xdr_encode_error()
> needs to view the client's request message so it can get the
> failing request's XID.
>
> svc_rdma_xdr_decode_req() is supposed to return a pointer to the
> client's request header. But if it fails to decode the client's
> message (and thus an error reply is needed) it does not return the
> pointer. The server then sends a bogus XID in the error reply.
>
> Instead, unconditionally generate the pointer to the client's header
> in svc_rdma_recvfrom(), and pass that pointer to both functions.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 2 +-
> net/sunrpc/xprtrdma/svc_rdma_marshal.c | 7 +------
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 3 ++-
> 3 files changed, 4 insertions(+), 8 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 0589918..4ce7b74 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
> struct xdr_buf *rcvbuf);
>
> /* svc_rdma_marshal.c */
> -extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
> +extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *);
> extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
> struct rpcrdma_msg *,
> enum rpcrdma_errcode, __be32 *);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> index e2fca76..c011b12 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> @@ -145,15 +145,11 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
> return (__be32 *)&ary->wc_array[nchunks];
> }
>
> -int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
> - struct svc_rqst *rqstp)
> +int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
> {
> - struct rpcrdma_msg *rmsgp = NULL;
> __be32 *va, *vaend;
> u32 hdr_len;
>
> - rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
> -
> /* Verify that there's enough bytes for header + something */
> if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
> dprintk("svcrdma: header too short = %d\n",
> @@ -201,7 +197,6 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
> hdr_len = (unsigned long)va - (unsigned long)rmsgp;
> rqstp->rq_arg.head[0].iov_len -= hdr_len;
>
> - *rdma_req = rmsgp;
> return hdr_len;
> }
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index 0f09052..8f68cb6 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -653,7 +653,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
>
> /* Decode the RDMA header. */
> - ret = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
> + rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
> + ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
> if (ret < 0)
> goto out_err;
> rqstp->rq_xprt_hlen = ret;
>

2016-02-05 10:42:24

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 08/10] svcrdma: Remove close_out exit path

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> Clean up: close_out is reached only when ctxt == NULL and XPT_CLOSE
> is already set.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 12 +-----------
> 1 file changed, 1 insertion(+), 11 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index f8b840b..d3718e9 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -641,8 +641,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> * transport list
> */
> if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
> - goto close_out;
> -
> + goto defer;
> goto out;
> }
> dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
> @@ -700,15 +699,6 @@ out_err:
> svc_rdma_put_context(ctxt, 0);
> return 0;
>
> - close_out:
> - if (ctxt)
> - svc_rdma_put_context(ctxt, 1);
> - dprintk("svcrdma: transport %p is closing\n", xprt);
> - /*
> - * Set the close bit and enqueue it. svc_recv will see the
> - * close bit and call svc_xprt_delete
> - */
> - set_bit(XPT_CLOSE, &xprt->xpt_flags);
> defer:
> return 0;
>
>

2016-02-05 10:46:05

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 07/10] svcrdma: Hook up the logic to return ERR_CHUNK

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> RFC 5666 Section 4.2 states:
>
>> When the peer detects an RPC-over-RDMA header version that it does
>> not support (currently this document defines only version 1), it
>> replies with an error code of ERR_VERS, and provides the low and
>> high inclusive version numbers it does, in fact, support.
>
> And:
>
>> When other decoding errors are detected in the header or chunks,
>> either an RPC decode error MAY be returned or the RPC/RDMA error
>> code ERR_CHUNK MUST be returned.
>
> The Linux NFS server does throw ERR_VERS when a client sends it
> a request whose rdma_version is not "one." But it does not return
> ERR_CHUNK when a header decoding error occurs. It just drops the
> request.
>
> To improve protocol extensibility, it should reject invalid values
> in the rdma_proc field instead of treating them all like RDMA_MSG.
> Otherwise clients can't detect when the server doesn't support
> new rdma_proc values.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/svc_rdma_marshal.c | 55 ++++++++++++++++++++++++-------
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 4 ++
> 2 files changed, 46 insertions(+), 13 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> index c011b12..7df4f07 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
> @@ -148,22 +148,41 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
> int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
> {
> __be32 *va, *vaend;
> + unsigned int len;
> u32 hdr_len;
>
> /* Verify that there's enough bytes for header + something */
> - if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
> + if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
> dprintk("svcrdma: header too short = %d\n",
> rqstp->rq_arg.len);
> return -EINVAL;
> }
>
> - if (rmsgp->rm_vers != rpcrdma_version)
> + if (rmsgp->rm_vers != rpcrdma_version) {
> + dprintk("%s: bad version %u\n", __func__,
> + be32_to_cpu(rmsgp->rm_vers));
> return -ENOSYS;
> + }
>
> - /* Pull in the extra for the padded case and bump our pointer */
> - if (rmsgp->rm_type == rdma_msgp) {
> - int hdrlen;
> -
> + switch (be32_to_cpu(rmsgp->rm_type)) {
> + case RDMA_MSG:
> + case RDMA_NOMSG:
> + break;
> +
> + case RDMA_DONE:
> + /* Just drop it */
> + dprintk("svcrdma: dropping RDMA_DONE message\n");
> + return 0;
> +
> + case RDMA_ERROR:
> + /* Possible if this is a backchannel reply.
> + * XXX: We should cancel this XID, though.
> + */
> + dprintk("svcrdma: dropping RDMA_ERROR message\n");
> + return 0;
> +
> + case RDMA_MSGP:
> + /* Pull in the extra for the padded case, bump our pointer */
> rmsgp->rm_body.rm_padded.rm_align =
> be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
> rmsgp->rm_body.rm_padded.rm_thresh =
> @@ -171,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
>
> va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
> rqstp->rq_arg.head[0].iov_base = va;
> - hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
> - rqstp->rq_arg.head[0].iov_len -= hdrlen;
> - if (hdrlen > rqstp->rq_arg.len)
> + len = (u32)((unsigned long)va - (unsigned long)rmsgp);
> + rqstp->rq_arg.head[0].iov_len -= len;
> + if (len > rqstp->rq_arg.len)
> return -EINVAL;
> - return hdrlen;
> + return len;
> + default:
> + dprintk("svcrdma: bad rdma procedure (%u)\n",
> + be32_to_cpu(rmsgp->rm_type));
> + return -EINVAL;
> }
>
> /* The chunk list may contain either a read chunk list or a write
> @@ -184,14 +207,20 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
> va = &rmsgp->rm_body.rm_chunks[0];
> vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
> va = decode_read_list(va, vaend);
> - if (!va)
> + if (!va) {
> + dprintk("svcrdma: failed to decode read list\n");
> return -EINVAL;
> + }
> va = decode_write_list(va, vaend);
> - if (!va)
> + if (!va) {
> + dprintk("svcrdma: failed to decode write list\n");
> return -EINVAL;
> + }
> va = decode_reply_array(va, vaend);
> - if (!va)
> + if (!va) {
> + dprintk("svcrdma: failed to decode reply chunk\n");
> return -EINVAL;
> + }
>
> rqstp->rq_arg.head[0].iov_base = va;
> hdr_len = (unsigned long)va - (unsigned long)rmsgp;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index 8f68cb6..f8b840b 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -657,6 +657,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
> if (ret < 0)
> goto out_err;
> + if (ret == 0)
> + goto out_drop;
> rqstp->rq_xprt_hlen = ret;
>
> if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
> @@ -710,6 +712,8 @@ out_err:
> defer:
> return 0;
>
> +out_drop:
> + svc_rdma_put_context(ctxt, 1);
> repost:
> return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
> }
>

2016-02-05 10:51:49

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 09/10] svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> Calling ib_poll_cq() to sort through WCs during a completion is a
> common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
> ("IB: add a proper completion queue abstraction"), WC sorting can
> be handled by the IB core.
>
> By converting to this new API, svcrdma is made a better neighbor to
> other RDMA consumers, as it allows the core to schedule the delivery
> of completions more fairly amongst all active consumers.
>
> Receive completions no longer use the dto_tasklet. Each polled
> Receive WC is now handled individually in soft IRQ context.
>
> The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod
> metrics are no longer updated.
>
> As far as I can tell, ib_alloc_cq() does not enable a consumer to
> specify a callback for handling async CQ events. This may cause
> a problem with handling spurious connection loss, though I think
> Receive completion flushing should be enough to force the server
> to clean up properly anyway.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 2
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 130 +++++++++---------------------
> 2 files changed, 41 insertions(+), 91 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 4ce7b74..7de9cbb 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
> struct svc_rdma_fastreg_mr *frmr;
> int hdr_count;
> struct xdr_buf arg;
> + struct ib_cqe cqe;
> struct list_head dto_q;
> enum ib_wr_opcode wr_op;
> enum ib_wc_status wc_status;
> @@ -174,7 +175,6 @@ struct svcxprt_rdma {
> struct work_struct sc_work;
> };
> /* sc_flags */
> -#define RDMAXPRT_RQ_PENDING 1
> #define RDMAXPRT_SQ_PENDING 2
> #define RDMAXPRT_CONN_PENDING 3
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index e7bda1e..316df77 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
> static void svc_rdma_free(struct svc_xprt *xprt);
> static int svc_rdma_has_wspace(struct svc_xprt *xprt);
> static int svc_rdma_secure_port(struct svc_rqst *);
> -static void rq_cq_reap(struct svcxprt_rdma *xprt);
> static void sq_cq_reap(struct svcxprt_rdma *xprt);
>
> static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
> @@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
> list_del_init(&xprt->sc_dto_q);
> spin_unlock_irqrestore(&dto_lock, flags);
>
> - rq_cq_reap(xprt);
> sq_cq_reap(xprt);
>
> svc_xprt_put(&xprt->sc_xprt);
> @@ -422,93 +420,49 @@ static void dto_tasklet_func(unsigned long data)
> spin_unlock_irqrestore(&dto_lock, flags);
> }
>
> -/*
> - * Receive Queue Completion Handler
> - *
> - * Since an RQ completion handler is called on interrupt context, we
> - * need to defer the handling of the I/O to a tasklet
> - */
> -static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
> -{
> - struct svcxprt_rdma *xprt = cq_context;
> - unsigned long flags;
> -
> - /* Guard against unconditional flush call for destroyed QP */
> - if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
> - return;
> -
> - /*
> - * Set the bit regardless of whether or not it's on the list
> - * because it may be on the list already due to an SQ
> - * completion.
> - */
> - set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
> -
> - /*
> - * If this transport is not already on the DTO transport queue,
> - * add it
> - */
> - spin_lock_irqsave(&dto_lock, flags);
> - if (list_empty(&xprt->sc_dto_q)) {
> - svc_xprt_get(&xprt->sc_xprt);
> - list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
> - }
> - spin_unlock_irqrestore(&dto_lock, flags);
> -
> - /* Tasklet does all the work to avoid irqsave locks. */
> - tasklet_schedule(&dto_tasklet);
> -}
> -
> -/*
> - * rq_cq_reap - Process the RQ CQ.
> +/**
> + * svc_rdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
> + * @cq: completion queue
> + * @wc: completed WR
> *
> - * Take all completing WC off the CQE and enqueue the associated DTO
> - * context on the dto_q for the transport.
> - *
> - * Note that caller must hold a transport reference.
> */
> -static void rq_cq_reap(struct svcxprt_rdma *xprt)
> +static void svc_rdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
> {
> - int ret;
> - struct ib_wc wc;
> - struct svc_rdma_op_ctxt *ctxt = NULL;
> -
> - if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
> - return;
> + struct svcxprt_rdma *xprt = cq->cq_context;
> + struct ib_cqe *cqe = wc->wr_cqe;
> + struct svc_rdma_op_ctxt *ctxt;
>
> - ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
> - atomic_inc(&rdma_stat_rq_poll);
> + /* WARNING: Only wc->wr_cqe and wc->status *
> + * are reliable at this point. */
> + ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
> + ctxt->wc_status = wc->status;
> + svc_rdma_unmap_dma(ctxt);
>
> - while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
> - ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
> - ctxt->wc_status = wc.status;
> - ctxt->byte_len = wc.byte_len;
> - svc_rdma_unmap_dma(ctxt);
> - if (wc.status != IB_WC_SUCCESS) {
> - /* Close the transport */
> - dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
> - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> - svc_rdma_put_context(ctxt, 1);
> - svc_xprt_put(&xprt->sc_xprt);
> - continue;
> - }
> - spin_lock_bh(&xprt->sc_rq_dto_lock);
> - list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
> - spin_unlock_bh(&xprt->sc_rq_dto_lock);
> - svc_xprt_put(&xprt->sc_xprt);
> - }
> + if (wc->status != IB_WC_SUCCESS)
> + goto flushed;
>
> - if (ctxt)
> - atomic_inc(&rdma_stat_rq_prod);
> + /* All wc fields are now known to be valid */
> + ctxt->byte_len = wc->byte_len;
> + spin_lock(&xprt->sc_rq_dto_lock);
> + list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
> + spin_unlock(&xprt->sc_rq_dto_lock);
>
> set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> - /*
> - * If data arrived before established event,
> - * don't enqueue. This defers RPC I/O until the
> - * RDMA connection is complete.
> - */
> - if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
> - svc_xprt_enqueue(&xprt->sc_xprt);
> + if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
> + goto out;
> + svc_xprt_enqueue(&xprt->sc_xprt);
> + goto out;
> +
> +flushed:
> + if (wc->status != IB_WC_WR_FLUSH_ERR)
> + pr_warn("svcrdma: Recv completion: %s (%u, vendor %u)\n",
> + ib_wc_status_msg(wc->status),
> + wc->status, wc->vendor_err);
> + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> + svc_rdma_put_context(ctxt, 1);
> +
> +out:
> + svc_xprt_put(&xprt->sc_xprt);
> }
>
> /*
> @@ -681,6 +635,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
> ctxt = svc_rdma_get_context(xprt);
> buflen = 0;
> ctxt->direction = DMA_FROM_DEVICE;
> + ctxt->cqe.done = svc_rdma_receive_wc;
> for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
> if (sge_no >= xprt->sc_max_sge) {
> pr_err("svcrdma: Too many sges (%d)\n", sge_no);
> @@ -705,7 +660,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
> recv_wr.next = NULL;
> recv_wr.sg_list = &ctxt->sge[0];
> recv_wr.num_sge = ctxt->count;
> - recv_wr.wr_id = (u64)(unsigned long)ctxt;
> + recv_wr.wr_cqe = &ctxt->cqe;
>
> svc_xprt_get(&xprt->sc_xprt);
> ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
> @@ -1124,12 +1079,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> dprintk("svcrdma: error creating SQ CQ for connect request\n");
> goto errout;
> }
> - cq_attr.cqe = newxprt->sc_rq_depth;
> - newxprt->sc_rq_cq = ib_create_cq(dev,
> - rq_comp_handler,
> - cq_event_handler,
> - newxprt,
> - &cq_attr);
> + newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
> + 0, IB_POLL_SOFTIRQ);
> if (IS_ERR(newxprt->sc_rq_cq)) {
> dprintk("svcrdma: error creating RQ CQ for connect request\n");
> goto errout;
> @@ -1225,7 +1176,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> * miss the first message
> */
> ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> - ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
>
> /* Accept Connection */
> set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
> @@ -1369,7 +1319,7 @@ static void __svc_rdma_free(struct work_struct *work)
> ib_destroy_cq(rdma->sc_sq_cq);
>
> if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
> - ib_destroy_cq(rdma->sc_rq_cq);
> + ib_free_cq(rdma->sc_rq_cq);
>
> if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
> ib_dealloc_pd(rdma->sc_pd);
>
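
The conversion pattern itself is compact: embed a struct ib_cqe in the per-WR
context, point wr_cqe at it when posting, and let the IB core invoke the
.done method for each polled completion. A minimal kernel-style sketch
follows (names other than the ib_* API are invented for illustration, and it
is not meant to build stand-alone):

    #include <rdma/ib_verbs.h>

    struct my_rx_ctxt {
            struct ib_cqe cqe;          /* must outlive the posted WR */
            /* receive buffers, sge array, and so on */
    };

    static void my_recv_done(struct ib_cq *cq, struct ib_wc *wc)
    {
            struct my_rx_ctxt *ctxt =
                    container_of(wc->wr_cqe, struct my_rx_ctxt, cqe);

            if (wc->status != IB_WC_SUCCESS) {
                    /* Only wr_cqe and status are valid on flush/error. */
                    return;
            }
            /* wc->byte_len and friends are valid here;
             * hand ctxt to the upper layer (not shown). */
    }

    static int my_post_recv(struct ib_qp *qp, struct my_rx_ctxt *ctxt,
                            struct ib_sge *sge, int num_sge)
    {
            struct ib_recv_wr *bad_wr;
            struct ib_recv_wr wr = {
                    .wr_cqe  = &ctxt->cqe,
                    .sg_list = sge,
                    .num_sge = num_sge,
            };

            ctxt->cqe.done = my_recv_done;
            return ib_post_recv(qp, &wr, &bad_wr);
    }

The CQ behind this is created with ib_alloc_cq(dev, priv, depth, 0,
IB_POLL_SOFTIRQ) and released with ib_free_cq(), exactly as the hunks above
show.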

2016-02-05 10:56:50

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 10/10] svcrdma: Use new CQ API for RPC-over-RDMA server send CQs

Looks good.

On Wed, Feb 3, 2016 at 9:22 PM, Chuck Lever <[email protected]> wrote:
> Calling ib_poll_cq() to sort through WCs during a completion is a
> common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
> ("IB: add a proper completion queue abstraction"), WC sorting can
> be handled by the IB core.
>
> By converting to this new API, svcrdma is made a better neighbor to
> other RDMA consumers, as it allows the core to schedule the delivery
> of completions more fairly amongst all active consumers.
>
> This new API also aims each completion at a function that is
> specific to the WR's opcode. Thus the ctxt->wr_op field and the
> switch in process_context is replaced by a set of methods that
> handle each completion type.
>
> The server's rdma_stat_sq_poll and rdma_stat_sq_prod metrics are no
> longer updated.
>
> As a clean up, the cq_event_handler, the dto_tasklet, and all
> associated locking is removed, as they are no longer referenced or
> used.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/svc_rdma.h | 9 +
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 15 +-
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 12 +
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 257 ++++++++++------------------
> 5 files changed, 119 insertions(+), 178 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index 7de9cbb..8af9e3b 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt {
> int hdr_count;
> struct xdr_buf arg;
> struct ib_cqe cqe;
> + struct ib_cqe reg_cqe;
> + struct ib_cqe inv_cqe;
> struct list_head dto_q;
> - enum ib_wr_opcode wr_op;
> enum ib_wc_status wc_status;
> u32 byte_len;
> u32 position;
> @@ -175,7 +176,6 @@ struct svcxprt_rdma {
> struct work_struct sc_work;
> };
> /* sc_flags */
> -#define RDMAXPRT_SQ_PENDING 2
> #define RDMAXPRT_CONN_PENDING 3
>
> #define RPCRDMA_LISTEN_BACKLOG 10
> @@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
> int);
>
> /* svc_rdma_transport.c */
> +extern void svc_rdma_send_wc(struct ib_cq *, struct ib_wc *);
> +extern void svc_rdma_write_wc(struct ib_cq *, struct ib_wc *);
> +extern void svc_rdma_reg_wc(struct ib_cq *, struct ib_wc *);
> +extern void svc_rdma_read_wc(struct ib_cq *, struct ib_wc *);
> +extern void svc_rdma_inv_wc(struct ib_cq *, struct ib_wc *);
> extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
> extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
> extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index 4498eaf..512590a 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
> ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
> ctxt->count = 1;
>
> - ctxt->wr_op = IB_WR_SEND;
> ctxt->direction = DMA_TO_DEVICE;
> ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
> ctxt->sge[0].length = sndbuf->len;
> @@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
> atomic_inc(&rdma->sc_dma_used);
>
> memset(&send_wr, 0, sizeof(send_wr));
> - send_wr.wr_id = (unsigned long)ctxt;
> + ctxt->cqe.done = svc_rdma_send_wc;
> + send_wr.wr_cqe = &ctxt->cqe;
> send_wr.sg_list = ctxt->sge;
> send_wr.num_sge = 1;
> send_wr.opcode = IB_WR_SEND;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index d3718e9..1a318fc 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
> clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
>
> memset(&read_wr, 0, sizeof(read_wr));
> - read_wr.wr.wr_id = (unsigned long)ctxt;
> + ctxt->cqe.done = svc_rdma_read_wc;
> + read_wr.wr.wr_cqe = &ctxt->cqe;
> read_wr.wr.opcode = IB_WR_RDMA_READ;
> - ctxt->wr_op = read_wr.wr.opcode;
> read_wr.wr.send_flags = IB_SEND_SIGNALED;
> read_wr.rkey = rs_handle;
> read_wr.remote_addr = rs_offset;
> @@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
> ctxt->read_hdr = head;
>
> /* Prepare REG WR */
> + ctxt->reg_cqe.done = svc_rdma_reg_wc;
> + reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
> reg_wr.wr.opcode = IB_WR_REG_MR;
> - reg_wr.wr.wr_id = 0;
> reg_wr.wr.send_flags = IB_SEND_SIGNALED;
> reg_wr.wr.num_sge = 0;
> reg_wr.mr = frmr->mr;
> @@ -310,26 +311,26 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
>
> /* Prepare RDMA_READ */
> memset(&read_wr, 0, sizeof(read_wr));
> + ctxt->cqe.done = svc_rdma_read_wc;
> + read_wr.wr.wr_cqe = &ctxt->cqe;
> read_wr.wr.send_flags = IB_SEND_SIGNALED;
> read_wr.rkey = rs_handle;
> read_wr.remote_addr = rs_offset;
> read_wr.wr.sg_list = ctxt->sge;
> read_wr.wr.num_sge = 1;
> if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
> - read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
> - read_wr.wr.wr_id = (unsigned long)ctxt;
> read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
> } else {
> read_wr.wr.opcode = IB_WR_RDMA_READ;
> read_wr.wr.next = &inv_wr;
> /* Prepare invalidate */
> memset(&inv_wr, 0, sizeof(inv_wr));
> - inv_wr.wr_id = (unsigned long)ctxt;
> + ctxt->inv_cqe.done = svc_rdma_inv_wc;
> + inv_wr.wr_cqe = &ctxt->inv_cqe;
> inv_wr.opcode = IB_WR_LOCAL_INV;
> inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
> inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
> }
> - ctxt->wr_op = read_wr.wr.opcode;
>
> /* Post the chain */
> ret = svc_rdma_send(xprt, &reg_wr.wr);
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index 73793dd..9d21205 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -281,8 +281,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
>
> /* Prepare WRITE WR */
> memset(&write_wr, 0, sizeof write_wr);
> - ctxt->wr_op = IB_WR_RDMA_WRITE;
> - write_wr.wr.wr_id = (unsigned long)ctxt;
> + ctxt->cqe.done = svc_rdma_write_wc;
> + write_wr.wr.wr_cqe = &ctxt->cqe;
> write_wr.wr.sg_list = &sge[0];
> write_wr.wr.num_sge = sge_no;
> write_wr.wr.opcode = IB_WR_RDMA_WRITE;
> @@ -538,8 +538,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
> goto err;
> }
> memset(&send_wr, 0, sizeof send_wr);
> - ctxt->wr_op = IB_WR_SEND;
> - send_wr.wr_id = (unsigned long)ctxt;
> + ctxt->cqe.done = svc_rdma_send_wc;
> + send_wr.wr_cqe = &ctxt->cqe;
> send_wr.sg_list = ctxt->sge;
> send_wr.num_sge = sge_no;
> send_wr.opcode = IB_WR_SEND;
> @@ -685,8 +685,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
>
> /* Prepare SEND WR */
> memset(&err_wr, 0, sizeof err_wr);
> - ctxt->wr_op = IB_WR_SEND;
> - err_wr.wr_id = (unsigned long)ctxt;
> + ctxt->cqe.done = svc_rdma_send_wc;
> + err_wr.wr_cqe = &ctxt->cqe;
> err_wr.sg_list = ctxt->sge;
> err_wr.num_sge = 1;
> err_wr.opcode = IB_WR_SEND;
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> index 316df77..bab80f8 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> @@ -63,16 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
> int flags);
> static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
> static void svc_rdma_release_rqst(struct svc_rqst *);
> -static void dto_tasklet_func(unsigned long data);
> static void svc_rdma_detach(struct svc_xprt *xprt);
> static void svc_rdma_free(struct svc_xprt *xprt);
> static int svc_rdma_has_wspace(struct svc_xprt *xprt);
> static int svc_rdma_secure_port(struct svc_rqst *);
> -static void sq_cq_reap(struct svcxprt_rdma *xprt);
> -
> -static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
> -static DEFINE_SPINLOCK(dto_lock);
> -static LIST_HEAD(dto_xprt_q);
>
> static struct svc_xprt_ops svc_rdma_ops = {
> .xpo_create = svc_rdma_create,
> @@ -351,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
> }
> }
>
> -/* ib_cq event handler */
> -static void cq_event_handler(struct ib_event *event, void *context)
> -{
> - struct svc_xprt *xprt = context;
> - dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
> - ib_event_msg(event->event), event->event, context);
> - set_bit(XPT_CLOSE, &xprt->xpt_flags);
> -}
> -
> /* QP event handler */
> static void qp_event_handler(struct ib_event *event, void *context)
> {
> @@ -391,35 +376,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
> }
> }
>
> -/*
> - * Data Transfer Operation Tasklet
> - *
> - * Walks a list of transports with I/O pending, removing entries as
> - * they are added to the server's I/O pending list. Two bits indicate
> - * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
> - * spinlock that serializes access to the transport list with the RQ
> - * and SQ interrupt handlers.
> - */
> -static void dto_tasklet_func(unsigned long data)
> -{
> - struct svcxprt_rdma *xprt;
> - unsigned long flags;
> -
> - spin_lock_irqsave(&dto_lock, flags);
> - while (!list_empty(&dto_xprt_q)) {
> - xprt = list_entry(dto_xprt_q.next,
> - struct svcxprt_rdma, sc_dto_q);
> - list_del_init(&xprt->sc_dto_q);
> - spin_unlock_irqrestore(&dto_lock, flags);
> -
> - sq_cq_reap(xprt);
> -
> - svc_xprt_put(&xprt->sc_xprt);
> - spin_lock_irqsave(&dto_lock, flags);
> - }
> - spin_unlock_irqrestore(&dto_lock, flags);
> -}
> -
> /**
> * svc_rdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
> * @cq: completion queue
> @@ -465,132 +421,125 @@ out:
> svc_xprt_put(&xprt->sc_xprt);
> }
>
> -/*
> - * Process a completion context
> - */
> -static void process_context(struct svcxprt_rdma *xprt,
> - struct svc_rdma_op_ctxt *ctxt)
> +static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
> + struct ib_wc *wc)
> {
> - struct svc_rdma_op_ctxt *read_hdr;
> - int free_pages = 0;
> -
> - svc_rdma_unmap_dma(ctxt);
> -
> - switch (ctxt->wr_op) {
> - case IB_WR_SEND:
> - free_pages = 1;
> - break;
> + if (wc->status != IB_WC_SUCCESS)
> + goto err;
>
> - case IB_WR_RDMA_WRITE:
> - break;
> +out:
> + atomic_dec(&xprt->sc_sq_count);
> + wake_up(&xprt->sc_send_wait);
> + return;
>
> - case IB_WR_RDMA_READ:
> - case IB_WR_RDMA_READ_WITH_INV:
> - svc_rdma_put_frmr(xprt, ctxt->frmr);
> +err:
> + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> + if (wc->status != IB_WC_WR_FLUSH_ERR)
> + pr_warn("svcrdma: Send completion: %s (%u, vendor %u)\n",
> + ib_wc_status_msg(wc->status),
> + wc->status, wc->vendor_err);
> + goto out;
> +}
>
> - if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
> - break;
> +static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc)
> +{
> + struct svcxprt_rdma *xprt = cq->cq_context;
>
> - read_hdr = ctxt->read_hdr;
> - svc_rdma_put_context(ctxt, 0);
> + svc_rdma_send_wc_common(xprt, wc);
> + svc_xprt_put(&xprt->sc_xprt);
> +}
>
> - spin_lock_bh(&xprt->sc_rq_dto_lock);
> - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> - list_add_tail(&read_hdr->dto_q,
> - &xprt->sc_read_complete_q);
> - spin_unlock_bh(&xprt->sc_rq_dto_lock);
> - svc_xprt_enqueue(&xprt->sc_xprt);
> - return;
> +/**
> + * svc_rdma_send_wc - Invoked by RDMA provider for each polled Send WC
> + * @cq: completion queue
> + * @wc: completed WR
> + *
> + */
> +void svc_rdma_send_wc(struct ib_cq *cq, struct ib_wc *wc)
> +{
> + struct ib_cqe *cqe = wc->wr_cqe;
> + struct svc_rdma_op_ctxt *ctxt;
>
> - default:
> - dprintk("svcrdma: unexpected completion opcode=%d\n",
> - ctxt->wr_op);
> - break;
> - }
> + svc_rdma_send_wc_common_put(cq, wc);
>
> - svc_rdma_put_context(ctxt, free_pages);
> + ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
> + svc_rdma_unmap_dma(ctxt);
> + svc_rdma_put_context(ctxt, 1);
> }
>
> -/*
> - * Send Queue Completion Handler - potentially called on interrupt context.
> +/**
> + * svc_rdma_write_wc - Invoked by RDMA provider for each polled Write WC
> + * @cq: completion queue
> + * @wc: completed WR
> *
> - * Note that caller must hold a transport reference.
> */
> -static void sq_cq_reap(struct svcxprt_rdma *xprt)
> +void svc_rdma_write_wc(struct ib_cq *cq, struct ib_wc *wc)
> {
> - struct svc_rdma_op_ctxt *ctxt = NULL;
> - struct ib_wc wc_a[6];
> - struct ib_wc *wc;
> - struct ib_cq *cq = xprt->sc_sq_cq;
> - int ret;
> + struct ib_cqe *cqe = wc->wr_cqe;
> + struct svc_rdma_op_ctxt *ctxt;
>
> - memset(wc_a, 0, sizeof(wc_a));
> + svc_rdma_send_wc_common_put(cq, wc);
>
> - if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
> - return;
> + ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
> + svc_rdma_unmap_dma(ctxt);
> + svc_rdma_put_context(ctxt, 0);
> +}
>
> - ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> - atomic_inc(&rdma_stat_sq_poll);
> - while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
> - int i;
> +/**
> + * svc_rdma_reg_wc - Invoked by RDMA provider for each polled FASTREG WC
> + * @cq: completion queue
> + * @wc: completed WR
> + *
> + */
> +void svc_rdma_reg_wc(struct ib_cq *cq, struct ib_wc *wc)
> +{
> + svc_rdma_send_wc_common_put(cq, wc);
> +}
>
> - for (i = 0; i < ret; i++) {
> - wc = &wc_a[i];
> - if (wc->status != IB_WC_SUCCESS) {
> - dprintk("svcrdma: sq wc err status %s (%d)\n",
> - ib_wc_status_msg(wc->status),
> - wc->status);
> +/**
> + * svc_rdma_read_wc - Invoked by RDMA provider for each polled Read WC
> + * @cq: completion queue
> + * @wc: completed WR
> + *
> + */
> +void svc_rdma_read_wc(struct ib_cq *cq, struct ib_wc *wc)
> +{
> + struct svcxprt_rdma *xprt = cq->cq_context;
> + struct ib_cqe *cqe = wc->wr_cqe;
> + struct svc_rdma_op_ctxt *ctxt;
>
> - /* Close the transport */
> - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> - }
> + svc_rdma_send_wc_common(xprt, wc);
>
> - /* Decrement used SQ WR count */
> - atomic_dec(&xprt->sc_sq_count);
> - wake_up(&xprt->sc_send_wait);
> + ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
> + svc_rdma_unmap_dma(ctxt);
> + svc_rdma_put_frmr(xprt, ctxt->frmr);
>
> - ctxt = (struct svc_rdma_op_ctxt *)
> - (unsigned long)wc->wr_id;
> - if (ctxt)
> - process_context(xprt, ctxt);
> + if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
> + struct svc_rdma_op_ctxt *read_hdr;
>
> - svc_xprt_put(&xprt->sc_xprt);
> - }
> + read_hdr = ctxt->read_hdr;
> + spin_lock(&xprt->sc_rq_dto_lock);
> + list_add_tail(&read_hdr->dto_q,
> + &xprt->sc_read_complete_q);
> + spin_unlock(&xprt->sc_rq_dto_lock);
> +
> + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> + svc_xprt_enqueue(&xprt->sc_xprt);
> }
>
> - if (ctxt)
> - atomic_inc(&rdma_stat_sq_prod);
> + svc_rdma_put_context(ctxt, 0);
> + svc_xprt_put(&xprt->sc_xprt);
> }
>
> -static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
> +/**
> + * svc_rdma_inv_wc - Invoked by RDMA provider for each polled LOCAL_INV WC
> + * @cq: completion queue
> + * @wc: completed WR
> + *
> + */
> +void svc_rdma_inv_wc(struct ib_cq *cq, struct ib_wc *wc)
> {
> - struct svcxprt_rdma *xprt = cq_context;
> - unsigned long flags;
> -
> - /* Guard against unconditional flush call for destroyed QP */
> - if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
> - return;
> -
> - /*
> - * Set the bit regardless of whether or not it's on the list
> - * because it may be on the list already due to an RQ
> - * completion.
> - */
> - set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
> -
> - /*
> - * If this transport is not already on the DTO transport queue,
> - * add it
> - */
> - spin_lock_irqsave(&dto_lock, flags);
> - if (list_empty(&xprt->sc_dto_q)) {
> - svc_xprt_get(&xprt->sc_xprt);
> - list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
> - }
> - spin_unlock_irqrestore(&dto_lock, flags);
> -
> - /* Tasklet does all the work to avoid irqsave locks. */
> - tasklet_schedule(&dto_tasklet);
> + svc_rdma_send_wc_common_put(cq, wc);
> }
>
> static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
> @@ -1011,7 +960,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> struct svcxprt_rdma *listen_rdma;
> struct svcxprt_rdma *newxprt = NULL;
> struct rdma_conn_param conn_param;
> - struct ib_cq_init_attr cq_attr = {};
> struct ib_qp_init_attr qp_attr;
> struct ib_device *dev;
> unsigned int i;
> @@ -1069,12 +1017,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> dprintk("svcrdma: error creating PD for connect request\n");
> goto errout;
> }
> - cq_attr.cqe = newxprt->sc_sq_depth;
> - newxprt->sc_sq_cq = ib_create_cq(dev,
> - sq_comp_handler,
> - cq_event_handler,
> - newxprt,
> - &cq_attr);
> + newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
> + 0, IB_POLL_SOFTIRQ);
> if (IS_ERR(newxprt->sc_sq_cq)) {
> dprintk("svcrdma: error creating SQ CQ for connect request\n");
> goto errout;
> @@ -1171,12 +1115,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> /* Swap out the handler */
> newxprt->sc_cm_id->event_handler = rdma_cma_handler;
>
> - /*
> - * Arm the CQs for the SQ and RQ before accepting so we can't
> - * miss the first message
> - */
> - ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> -
> /* Accept Connection */
> set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
> memset(&conn_param, 0, sizeof conn_param);
> @@ -1316,7 +1254,7 @@ static void __svc_rdma_free(struct work_struct *work)
> ib_destroy_qp(rdma->sc_qp);
>
> if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
> - ib_destroy_cq(rdma->sc_sq_cq);
> + ib_free_cq(rdma->sc_sq_cq);
>
> if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
> ib_free_cq(rdma->sc_rq_cq);
> @@ -1380,9 +1318,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
> spin_unlock_bh(&xprt->sc_lock);
> atomic_inc(&rdma_stat_sq_starve);
>
> - /* See if we can opportunistically reap SQ WR to make room */
> - sq_cq_reap(xprt);
> -
> /* Wait until SQ WR available if SQ still full */
> wait_event(xprt->sc_send_wait,
> atomic_read(&xprt->sc_sq_count) <
>
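
The distinctive part of this patch, compared to the Receive-side conversion,
is that one context can carry more than one ib_cqe, each with its own
completion method, which is what lets the old wr_op switch go away. A hedged
kernel-style sketch of that wiring (names outside the ib_* API are invented
for illustration):

    #include <rdma/ib_verbs.h>

    struct my_op_ctxt {
            struct ib_cqe cqe;      /* Send, Write and Read completions */
            struct ib_cqe reg_cqe;  /* FASTREG completions */
            struct ib_cqe inv_cqe;  /* LOCAL_INV completions */
    };

    static void my_reg_done(struct ib_cq *cq, struct ib_wc *wc)
    {
            /* Registration needs no per-WR cleanup in this sketch. */
    }

    static void my_inv_done(struct ib_cq *cq, struct ib_wc *wc)
    {
            struct my_op_ctxt *ctxt =
                    container_of(wc->wr_cqe, struct my_op_ctxt, inv_cqe);

            /* Release resources tied to the invalidated MR (not shown). */
    }

    static void my_wire_up(struct my_op_ctxt *ctxt, struct ib_reg_wr *reg_wr,
                           struct ib_send_wr *inv_wr)
    {
            ctxt->reg_cqe.done = my_reg_done;
            reg_wr->wr.wr_cqe  = &ctxt->reg_cqe;

            ctxt->inv_cqe.done = my_inv_done;
            inv_wr->wr_cqe     = &ctxt->inv_cqe;
    }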

2016-02-05 10:59:47

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 00/10] NFS/RDMA server patches for v4.6

While some bug fixes for ocrdma are still on the way, I tested
this series with a modified ocrdma driver.
The following tests passed:
iozone, short-duration cable pull, and long-duration cable pull (more than 5 mins).

Series Reviewed-By: Devesh Sharma <[email protected]>
Series Tested-By: Devesh Sharma <[email protected]>




On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
> For review.
>
> - Fix XDR round-up in write chunks (take 3)
> - Pre-allocate FRWRs when accepting a connection
> - Add proper handling of bad header XDR
> - Convert the server to use the new core CQ API added in v4.5.
>
> Also available in the "nfsd-rdma-for-4.6" topic branch of this git
> repo:
>
> git://git.linux-nfs.org/projects/cel/cel-2.6.git
>
> Or for browsing:
>
> http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-for-4.6
>
> ---
>
> Chuck Lever (10):
> svcrdma: Do not send XDR roundup bytes for a write chunk
> svcrdma: Make svc_rdma_get_frmr() not sleep
> svcrdma: svc_rdma_post_recv() should close connection on error
> rpcrdma: Add missing XDR union fields for RDMA errors
> svcrdma: Make RDMA_ERROR messages work
> svcrdma: Use correct XID in error replies
> svcrdma: Hook up the logic to return ERR_CHUNK
> svcrdma: Remove close_out exit path
> svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
> svcrdma: Use new CQ API for RPC-over-RDMA server send CQs
>
>
> include/linux/sunrpc/rpc_rdma.h | 13 +
> include/linux/sunrpc/svc_rdma.h | 18 +
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 15 -
> net/sunrpc/xprtrdma/svc_rdma_marshal.c | 62 ++--
> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 61 +---
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 79 ++++-
> net/sunrpc/xprtrdma/svc_rdma_transport.c | 488 +++++++++++-----------------
> 7 files changed, 346 insertions(+), 390 deletions(-)
>
> --
> Chuck Lever

2016-02-05 15:53:31

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH v1 04/10] rpcrdma: Add missing XDR union fields for RDMA errors


> On Feb 5, 2016, at 5:23 AM, Devesh Sharma <[email protected]> wrote:
>
> On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
>> Add some infrastructure (shared between xprtrdma and svcrdma) for
>> constructing and parsing RPC-over-RDMA error status information.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/rpc_rdma.h | 13 ++++++++++++-
>> 1 file changed, 12 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
>> index f33c5a4..a5a36c0 100644
>> --- a/include/linux/sunrpc/rpc_rdma.h
>> +++ b/include/linux/sunrpc/rpc_rdma.h
>> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
>> __be32 rm_pempty[3]; /* 3 empty chunk lists */
>> } rm_padded;
>>
>> + struct {
>> + __be32 rm_err;
>> + __be32 rm_vers_low;
>> + __be32 rm_vers_hi;
>> + } rm_error;
>> +
>
> I don't see the above members being used or filled with a value; am I
> missing the link?

The patch adds items that are used by patches that will be
submitted separately to the NFS client maintainer and the
server maintainer for 4.6. The same patch appears in both
series to avoid merge conflicts.

The server probably only needs RPCRDMA_HDRLEN_ERR. I can
split that into a separate patch that appears in both
series, and move the rest into a client-only patch.
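
For illustration only, here is a rough sketch of how a sender might
fill the new rm_error fields when rejecting an unsupported protocol
version. The function name and call site are invented, and it assumes
the existing rdma_error constant; the real users of these fields
arrive in the follow-on client and server patches:

/* Hypothetical sketch, not part of this patch */
static void fill_err_vers(struct rpcrdma_msg *rmsgp, __be32 xid)
{
	rmsgp->rm_xid = xid;
	rmsgp->rm_vers = cpu_to_be32(1);	/* RPC-over-RDMA version one */
	rmsgp->rm_credit = cpu_to_be32(1);	/* illustrative credit value */
	rmsgp->rm_type = rdma_error;
	rmsgp->rm_body.rm_error.rm_err = err_vers;
	rmsgp->rm_body.rm_error.rm_vers_low = cpu_to_be32(1);
	rmsgp->rm_body.rm_error.rm_vers_hi = cpu_to_be32(1);
}

As I read the header layout, an ERR_CHUNK reply is just the first
five words (RPCRDMA_HDRLEN_ERR); ERR_VERS adds the two version-range
words on top of that.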


>> __be32 rm_chunks[0]; /* read, write and reply chunks */
>>
>> } rm_body;
>> @@ -102,12 +108,16 @@ struct rpcrdma_msg {
>> * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
>> */
>> #define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
>> +#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)
>>
>> enum rpcrdma_errcode {
>> ERR_VERS = 1,
>> ERR_CHUNK = 2
>> };
>>
>> +#define err_vers cpu_to_be32(ERR_VERS)
>> +#define err_chunk cpu_to_be32(ERR_CHUNK)
>> +
>> struct rpcrdma_err_vers {
>> uint32_t rdma_vers_low; /* Version range supported by peer */
>> uint32_t rdma_vers_high;
>> @@ -118,7 +128,8 @@ enum rpcrdma_proc {
>> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
>> RDMA_MSGP = 2, /* An RPC call or reply msg with padding */
>> RDMA_DONE = 3, /* Client signals reply completion */
>> - RDMA_ERROR = 4 /* An RPC RDMA encoding error */
>> + RDMA_ERROR = 4, /* An RPC RDMA encoding error */
>> + RDMA_LAST
>> };
>>
>> #define rdma_msg cpu_to_be32(RDMA_MSG)
>>

--
Chuck Lever





2016-02-05 16:29:39

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep


> On Feb 5, 2016, at 5:15 AM, Devesh Sharma <[email protected]> wrote:
>
> As I understand it, you are pre-allocating because alloc_mr() can sleep,
> so you separate it out, while the map-frmr-wqe path is by definition a
> non-blocking context. Are we on the same page?

alloc_mr() can sleep, and so can kmalloc(GFP_KERNEL).
Currently svc_rdma_get_frmr() is called in a worker thread,
so it's in a process context and can sleep, but if we were
to move the RDMA Read logic into a softIRQ context (say,
invoked during Receive processing), it would be a problem
to sleep.
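
(Illustration only, not from the patch: the constraint in a nutshell
is GFP_KERNEL versus GFP_ATOMIC.)

/* Sketch: allocation behavior by calling context */
static void *alloc_buf(size_t size, bool may_sleep)
{
	if (!may_sleep)
		/* softirq/atomic context: must not sleep, can fail under pressure */
		return kmalloc(size, GFP_ATOMIC);
	/* process context: may sleep until memory is found */
	return kmalloc(size, GFP_KERNEL);
}

Pre-allocating at accept time sidesteps both the sleep and the
failure-in-the-hot-path problem.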

Another goal is to get rid of memory allocation operations
that can fail in low memory situations in areas of the code
where an error is hard to handle.

So the patch description is misleading and incorrect.

However, the problem with pre-allocating frmrs is that it
is not possible for the server to determine how many will
be needed at accept time. The maximum number depends on the
largest possible ULP payload size, the page list depth of
the server's device, and the maximum chunk size the client
is using (it could be as small as 4KB for some registration
modes). That would mean allocating thousands per connection
up front to cover every contingency.
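
To put rough numbers on that (all values assumed for illustration:
1MB maximum payload with 4KB pages, so RPCSVC_MAXPAGES is about 258,
and a credit limit of 32):

	worst case, one 4KB segment per RDMA Read:
		258 frmrs per RPC x 32 credits	= ~8,256 frmrs per connection

	the patch's sizing, with max_fast_reg_page_list_len = 256:
		(1 + 258 / 256) x 32		= 64 frmrs per connection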

Perhaps a better solution would be to understand how to
deal with an frmr allocation failure in rdma_read_chunk_frmr().
I think just dropping the connection would be reasonable.
If ib_post_recv() fails, for example, this is exactly what
the server does.
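
A minimal sketch of that failure handling, using the names from the
patch above; the surrounding read-chunk logic is omitted and the
exact call site is an assumption:

	frmr = svc_rdma_get_frmr(xprt);
	if (IS_ERR(frmr)) {
		/* No MR available: give up on this RPC and close the
		 * connection; the client will reconnect and retransmit.
		 */
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		return PTR_ERR(frmr);
	}
	/* ... otherwise build and post the fast-reg WR and RDMA Read ... */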

Christoph says he is about to modify or replace the NFS
server's RDMA Read code paths, however.

I'll drop this one.


> On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
>> Want to call it in a context that cannot sleep. So pre-alloc
>> the memory and the MRs.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
>> 1 file changed, 38 insertions(+), 6 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> index 5763825..02eee12 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>> @@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
>> return ERR_PTR(-ENOMEM);
>> }
>>
>> -static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>> +static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
>> {
>> struct svc_rdma_fastreg_mr *frmr;
>>
>> @@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>> }
>> }
>>
>> +static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
>> +{
>> + struct ib_device *dev = xprt->sc_cm_id->device;
>> + unsigned int i;
>> +
>> + /* Pre-allocate based on the maximum amount of payload
>> + * the server's HCA can handle per RDMA Read, to keep
>> + * the number of MRs per connection in check.
>> + *
>> + * If a client sends small Read chunks (eg. it may be
>> + * using physical registration), more RDMA Reads per
>> + * NFS WRITE will be needed. svc_rdma_get_frmr() dips
>> + * into its reserve in that case. Better would be for
>> + * the server to reduce the connection's credit limit.
>> + */
>> + i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
>> + i *= xprt->sc_max_requests;
>> +
>> + while (i--) {
>> + struct svc_rdma_fastreg_mr *frmr;
>> +
>> + frmr = rdma_alloc_frmr(xprt);
>> + if (!frmr) {
>> + dprintk("svcrdma: No memory for request map\n");
>> + return false;
>> + }
>> + list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
>> + }
>> + return true;
>> +}
>> +
>> struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>> {
>> struct svc_rdma_fastreg_mr *frmr = NULL;
>> @@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>> frmr->sg_nents = 0;
>> }
>> spin_unlock_bh(&rdma->sc_frmr_q_lock);
>> - if (frmr)
>> - return frmr;
>> -
>> - return rdma_alloc_frmr(rdma);
>> + if (!frmr)
>> + return ERR_PTR(-ENOMEM);
>> + return frmr;
>> }
>>
>> void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
>> @@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>> dev->attrs.max_fast_reg_page_list_len;
>> newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
>> newxprt->sc_reader = rdma_read_chunk_frmr;
>> + if (!svc_rdma_prealloc_frmrs(newxprt))
>> + goto errout;
>> }
>>
>> /*
>> @@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
>> xprt->xpt_bc_xprt = NULL;
>> }
>>
>> - rdma_dealloc_frmr_q(rdma);
>> + svc_rdma_destroy_frmrs(rdma);
>> svc_rdma_destroy_ctxts(rdma);
>> svc_rdma_destroy_maps(rdma);
>>

--
Chuck Lever





2016-02-05 17:49:57

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep

On Fri, Feb 5, 2016 at 9:59 PM, Chuck Lever <[email protected]> wrote:
>
>> On Feb 5, 2016, at 5:15 AM, Devesh Sharma <[email protected]> wrote:
>>
>> As I understand it, you are pre-allocating because alloc_mr() can sleep,
>> so you separate it out, while the map-frmr-wqe path is by definition a
>> non-blocking context. Are we on the same page?
>
> alloc_mr() can sleep, and so can kmalloc(GFP_KERNEL).
> Currently svc_rdma_get_frmr() is called in a worker thread,
> so it's in a process context and can sleep, but if we were
> to move the RDMA Read logic into a softIRQ context (say,
> invoked during Receive processing), it would be a problem
> to sleep.

Got it.

>
> Another goal is to get rid of memory allocation operations
> that can fail in low memory situations in areas of the code
> where an error is hard to handle.

>
> So the patch description is misleading and incorrect.

Got it

>
> However, the problem with pre-allocating frmrs is that it
> is not possible for the server to determine how many will
> be needed at accept time. The maximum number depends on the
> largest possible ULP payload size, the page list depth of
> the server's device, and the maximum chunk size the client
> is using (it could be as small as 4KB for some registration
> modes). That would mean allocating thousands per connection
> up front to cover every contingency.

Can't it decide on the fly? Start with 'n' smaller frmrs, where 'n'
is derived from some static calculation. Then, if the connection
demands a bigger size, increase the depth of all 'n' frmrs and reuse
them for the life of the connection. If it demands more frmrs, do
nothing and try to manage with 'n'. Does that make sense? I am sure
I am missing a few points here.

>
> Perhaps a better solution would be to understand how to
> deal with an frmr allocation failure in rdma_read_chunk_frmr().
> I think just dropping the connection would be reasonable.
> If ib_post_recv() fails, for example, this is exactly what
> the server does.

Dropping the connection seems logical; however, I think it would be
a harsh decision for any hypothetical device that has fewer IB/RoCE
resources, e.g. a VM in an SR-IOV enabled environment where many
VFs are enabled.

As suggested earlier in this mail, continuing the connection with
whatever resources it has would let a low-resource device still use
it. If the system recovers from memory pressure, the allocations
would eventually succeed. The drawback is a performance penalty.

>
> Christoph says he is about to modify or replace the NFS
> server's RDMA Read code paths, however.
>
> I'll drop this one.
>
>
>> On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
>>> Want to call it in a context that cannot sleep. So pre-alloc
>>> the memory and the MRs.
>>>
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
>>> 1 file changed, 38 insertions(+), 6 deletions(-)
>>>
>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> index 5763825..02eee12 100644
>>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>> @@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
>>> return ERR_PTR(-ENOMEM);
>>> }
>>>
>>> -static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>> +static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
>>> {
>>> struct svc_rdma_fastreg_mr *frmr;
>>>
>>> @@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>> }
>>> }
>>>
>>> +static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
>>> +{
>>> + struct ib_device *dev = xprt->sc_cm_id->device;
>>> + unsigned int i;
>>> +
>>> + /* Pre-allocate based on the maximum amount of payload
>>> + * the server's HCA can handle per RDMA Read, to keep
>>> + * the number of MRs per connection in check.
>>> + *
>>> + * If a client sends small Read chunks (eg. it may be
>>> + * using physical registration), more RDMA Reads per
>>> + * NFS WRITE will be needed. svc_rdma_get_frmr() dips
>>> + * into its reserve in that case. Better would be for
>>> + * the server to reduce the connection's credit limit.
>>> + */
>>> + i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
>>> + i *= xprt->sc_max_requests;
>>> +
>>> + while (i--) {
>>> + struct svc_rdma_fastreg_mr *frmr;
>>> +
>>> + frmr = rdma_alloc_frmr(xprt);
>>> + if (!frmr) {
>>> + dprintk("svcrdma: No memory for request map\n");
>>> + return false;
>>> + }
>>> + list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
>>> + }
>>> + return true;
>>> +}
>>> +
>>> struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>> {
>>> struct svc_rdma_fastreg_mr *frmr = NULL;
>>> @@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>> frmr->sg_nents = 0;
>>> }
>>> spin_unlock_bh(&rdma->sc_frmr_q_lock);
>>> - if (frmr)
>>> - return frmr;
>>> -
>>> - return rdma_alloc_frmr(rdma);
>>> + if (!frmr)
>>> + return ERR_PTR(-ENOMEM);
>>> + return frmr;
>>> }
>>>
>>> void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
>>> @@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>>> dev->attrs.max_fast_reg_page_list_len;
>>> newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
>>> newxprt->sc_reader = rdma_read_chunk_frmr;
>>> + if (!svc_rdma_prealloc_frmrs(newxprt))
>>> + goto errout;
>>> }
>>>
>>> /*
>>> @@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
>>> xprt->xpt_bc_xprt = NULL;
>>> }
>>>
>>> - rdma_dealloc_frmr_q(rdma);
>>> + svc_rdma_destroy_frmrs(rdma);
>>> svc_rdma_destroy_ctxts(rdma);
>>> svc_rdma_destroy_maps(rdma);
>>>
>
> --
> Chuck Lever
>
>
>
>

2016-02-05 18:13:37

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep


> On Feb 5, 2016, at 12:49 PM, Devesh Sharma <[email protected]> wrote:
>
> On Fri, Feb 5, 2016 at 9:59 PM, Chuck Lever <[email protected]> wrote:
>>
>>> On Feb 5, 2016, at 5:15 AM, Devesh Sharma <[email protected]> wrote:
>>>
>>> As I understand it, you are pre-allocating because alloc_mr() can sleep,
>>> so you separate it out, while the map-frmr-wqe path is by definition a
>>> non-blocking context. Are we on the same page?
>>
>> alloc_mr() can sleep, and so can kmalloc(GFP_KERNEL).
>> Currently svc_rdma_get_frmr() is called in a worker thread,
>> so it's in a process context and can sleep, but if we were
>> to move the RDMA Read logic into a softIRQ context (say,
>> invoked during Receive processing), it would be a problem
>> to sleep.
>
> Got it.
>
>>
>> Another goal is to get rid of memory allocation operations
>> that can fail in low memory situations in areas of the code
>> where an error is hard to handle.
>
>>
>> So the patch description is misleading and incorrect.
>
> Got it
>
>>
>> However, the problem with pre-allocating frmrs is that it
>> is not possible for the server to determine how many will
>> be needed at accept time. The maximum number depends on the
>> largest possible ULP payload size, the page list depth of
>> the server's device, and the maximum chunk size the client
>> is using (it could be as small as 4KB for some registration
>> modes). That would mean allocating thousands per connection
>> up front to cover every contingency.
>
> Can't it decide on the fly? Start with 'n' smaller frmrs, where 'n'
> is derived from some static calculation. Then, if the connection
> demands a bigger size, increase the depth of all 'n' frmrs and reuse
> them for the life of the connection. If it demands more frmrs, do
> nothing and try to manage with 'n'. Does that make sense? I am sure
> I am missing a few points here.

Currently, all svc_rdma_frmrs are the same. They are allocated
on-demand then cached after an RDMA Read operation is complete.


>> Perhaps a better solution would be to understand how to
>> deal with an frmr allocation failure in rdma_read_chunk_frmr().
>> I think just dropping the connection would be reasonable.
>> If ib_post_recv() fails, for example, this is exactly what
>> the server does.
>
> Dropping the connection seems logical; however, I think it would be
> a harsh decision for any hypothetical device that has fewer IB/RoCE
> resources, e.g. a VM in an SR-IOV enabled environment where many
> VFs are enabled.
>
> As suggested earlier in this mail, continuing the connection with
> whatever resources it has would let a low-resource device still use
> it. If the system recovers from memory pressure, the allocations
> would eventually succeed. The drawback is a performance penalty.

When an frmr allocation fails, the RDMA Read to pull the
RPC data over can't be done, so there's no recourse but for
the server to discard that RPC request. NFSv4.x requires a
connection drop when an RPC is lost. The client will retransmit
that RPC.

Dropping the connection frees all resources for that connection.
Clients will typically reconnect pretty quickly when they have
more work to do. The server will drop the connection repeatedly
until enough resources are available to handle the client's
workload.

Eventually we could add a retry mechanism to the RDMA Read
logic. Wait a bit and try the allocation again. That's probably
not worth the effort, this really should be very very rare.
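
Just to show the shape of such a mechanism (a hypothetical helper,
and only valid while this code still runs in process context):

static struct svc_rdma_fastreg_mr *
get_frmr_with_retry(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;
	int tries;

	for (tries = 0; tries < 3; tries++) {
		frmr = svc_rdma_get_frmr(xprt);
		if (!IS_ERR(frmr))
			return frmr;
		msleep(20);	/* wait a bit, then try the reserve again */
	}
	return ERR_PTR(-ENOMEM);
}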

If a device is ultimately resource-starved, that's a different
problem. The server may never be able to allocate enough MRs
even if the client tries again later.

That kind of issue should be obvious at accept time. The server
should adjust the connection's credit limit downward to fit the
device's available resource budget.
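
A sketch of that accept-time adjustment; which device attribute to
budget against (max_mr here) and the divisor are assumptions:

static void clamp_credits_to_device(struct svcxprt_rdma *newxprt,
				    struct ib_device *dev)
{
	unsigned int per_rpc = 1 + RPCSVC_MAXPAGES /
				   dev->attrs.max_fast_reg_page_list_len;
	unsigned int budget = dev->attrs.max_mr / 4;	/* leave MRs for other users */

	/* halve the credit limit until the frmr reserve fits the budget */
	while (newxprt->sc_max_requests > 1 &&
	       newxprt->sc_max_requests * per_rpc > budget)
		newxprt->sc_max_requests >>= 1;
}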

Alternatively, if the device is not an iWARP device, RDMA Reads
can use physical addressing, and no frmrs are needed.


>> Christoph says he is about to modify or replace the NFS
>> server's RDMA Read code paths, however.
>>
>> I'll drop this one.
>>
>>
>>> On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
>>>> Want to call it in a context that cannot sleep. So pre-alloc
>>>> the memory and the MRs.
>>>>
>>>> Signed-off-by: Chuck Lever <[email protected]>
>>>> ---
>>>> net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
>>>> 1 file changed, 38 insertions(+), 6 deletions(-)
>>>>
>>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>> index 5763825..02eee12 100644
>>>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>> @@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
>>>> return ERR_PTR(-ENOMEM);
>>>> }
>>>>
>>>> -static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>>> +static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
>>>> {
>>>> struct svc_rdma_fastreg_mr *frmr;
>>>>
>>>> @@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>>> }
>>>> }
>>>>
>>>> +static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
>>>> +{
>>>> + struct ib_device *dev = xprt->sc_cm_id->device;
>>>> + unsigned int i;
>>>> +
>>>> + /* Pre-allocate based on the maximum amount of payload
>>>> + * the server's HCA can handle per RDMA Read, to keep
>>>> + * the number of MRs per connection in check.
>>>> + *
>>>> + * If a client sends small Read chunks (eg. it may be
>>>> + * using physical registration), more RDMA Reads per
>>>> + * NFS WRITE will be needed. svc_rdma_get_frmr() dips
>>>> + * into its reserve in that case. Better would be for
>>>> + * the server to reduce the connection's credit limit.
>>>> + */
>>>> + i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
>>>> + i *= xprt->sc_max_requests;
>>>> +
>>>> + while (i--) {
>>>> + struct svc_rdma_fastreg_mr *frmr;
>>>> +
>>>> + frmr = rdma_alloc_frmr(xprt);
>>>> + if (!frmr) {
>>>> + dprintk("svcrdma: No memory for request map\n");
>>>> + return false;
>>>> + }
>>>> + list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
>>>> + }
>>>> + return true;
>>>> +}
>>>> +
>>>> struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>>> {
>>>> struct svc_rdma_fastreg_mr *frmr = NULL;
>>>> @@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>>> frmr->sg_nents = 0;
>>>> }
>>>> spin_unlock_bh(&rdma->sc_frmr_q_lock);
>>>> - if (frmr)
>>>> - return frmr;
>>>> -
>>>> - return rdma_alloc_frmr(rdma);
>>>> + if (!frmr)
>>>> + return ERR_PTR(-ENOMEM);
>>>> + return frmr;
>>>> }
>>>>
>>>> void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
>>>> @@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>>>> dev->attrs.max_fast_reg_page_list_len;
>>>> newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
>>>> newxprt->sc_reader = rdma_read_chunk_frmr;
>>>> + if (!svc_rdma_prealloc_frmrs(newxprt))
>>>> + goto errout;
>>>> }
>>>>
>>>> /*
>>>> @@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
>>>> xprt->xpt_bc_xprt = NULL;
>>>> }
>>>>
>>>> - rdma_dealloc_frmr_q(rdma);
>>>> + svc_rdma_destroy_frmrs(rdma);
>>>> svc_rdma_destroy_ctxts(rdma);
>>>> svc_rdma_destroy_maps(rdma);
>>>>
>>
>> --
>> Chuck Lever
>>
>>
>>
>>

--
Chuck Lever





2016-02-06 15:57:12

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 02/10] svcrdma: Make svc_rdma_get_frmr() not sleep

On Fri, Feb 5, 2016 at 11:43 PM, Chuck Lever <[email protected]> wrote:
>
>> On Feb 5, 2016, at 12:49 PM, Devesh Sharma <[email protected]> wrote:
>>
>> On Fri, Feb 5, 2016 at 9:59 PM, Chuck Lever <[email protected]> wrote:
>>>
>>>> On Feb 5, 2016, at 5:15 AM, Devesh Sharma <[email protected]> wrote:
>>>>
>>>> As I understand it, you are pre-allocating because alloc_mr() can sleep,
>>>> so you separate it out, while the map-frmr-wqe path is by definition a
>>>> non-blocking context. Are we on the same page?
>>>
>>> alloc_mr() can sleep, and so can kmalloc(GFP_KERNEL).
>>> Currently svc_rdma_get_frmr() is called in a worker thread,
>>> so it's in a process context and can sleep, but if we were
>>> to move the RDMA Read logic into a softIRQ context (say,
>>> invoked during Receive processing), it would be a problem
>>> to sleep.
>>
>> Got it.
>>
>>>
>>> Another goal is to get rid of memory allocation operations
>>> that can fail in low memory situations in areas of the code
>>> where an error is hard to handle.
>>
>>>
>>> So the patch description is misleading and incorrect.
>>
>> Got it
>>
>>>
>>> However, the problem with pre-allocating frmrs is that it
>>> is not possible for the server to determine how many will
>>> be needed at accept time. The maximum number depends on the
>>> largest possible ULP payload size, the page list depth of
>>> the server's device, and the maximum chunk size the client
>>> is using (it could be as small as 4KB for some registration
>>> modes). That would mean allocating thousands per connection
>>> up front to cover every contingency.
>>
>> Can't it decide on the fly? Start with 'n' smaller frmrs, where 'n'
>> is derived from some static calculation. Then, if the connection
>> demands a bigger size, increase the depth of all 'n' frmrs and reuse
>> them for the life of the connection. If it demands more frmrs, do
>> nothing and try to manage with 'n'. Does that make sense? I am sure
>> I am missing a few points here.
>
> Currently, all svc_rdma_frmrs are the same. They are allocated
> on-demand then cached after an RDMA Read operation is complete.

Yes, it does the same.

>
>
>>> Perhaps a better solution would be to understand how to
>>> deal with an frmr allocation failure in rdma_read_chunk_frmr().
>>> I think just dropping the connection would be reasonable.
>>> If ib_post_recv() fails, for example, this is exactly what
>>> the server does.
>>
>> Dropping the connection seems logical; however, I think it would be
>> a harsh decision for any hypothetical device that has fewer IB/RoCE
>> resources, e.g. a VM in an SR-IOV enabled environment where many
>> VFs are enabled.
>>
>> As suggested earlier in this mail, continuing the connection with
>> whatever resources it has would let a low-resource device still use
>> it. If the system recovers from memory pressure, the allocations
>> would eventually succeed. The drawback is a performance penalty.
>
> When an frmr allocation fails, the RDMA Read to pull the
> RPC data over can't be done, so there's no recourse but for
> the server to discard that RPC request. NFSv4.x requires a
> connection drop when an RPC is lost. The client will retransmit
> that RPC.
>
> Dropping the connection frees all resources for that connection.
> Clients will typically reconnect pretty quickly when they have
> more work to do. The server will drop the connection repeatedly
> until enough resources are available to handle the client's
> workload.

OK, so effectively the user will see "failed to reconnect" messages in
syslog until the reconnect succeeds. I think if the syslog message
displays a valid reason, then we are covered here.

>
> Eventually we could add a retry mechanism to the RDMA Read
> logic. Wait a bit and try the allocation again. That's probably
> not worth the effort, this really should be very very rare.

Makes sense, I agree with you here.

>
> If a device is ultimately resource-starved, that's a different
> problem. The server may never be able to allocate enough MRs
> even if the client tries again later.

Yes

>
> That kind of issue should be obvious at accept time. The server
> should adjust the connection's credit limit downward to fit the
> device's available resource budget.
>
> Alternatively, if the device is not an iWARP device, RDMA Reads
> can use physical addressing, and no frmrs are needed.
>
>

OK, let's wait for Christoph's changes to the NFS server. Until then,
I agree with your decision on this patch.

>>> Christoph says he is about to modify or replace the NFS
>>> server's RDMA Read code paths, however.
>>>
>>> I'll drop this one.
>>>
>>>
>>>> On Wed, Feb 3, 2016 at 9:21 PM, Chuck Lever <[email protected]> wrote:
>>>>> Want to call it in a context that cannot sleep. So pre-alloc
>>>>> the memory and the MRs.
>>>>>
>>>>> Signed-off-by: Chuck Lever <[email protected]>
>>>>> ---
>>>>> net/sunrpc/xprtrdma/svc_rdma_transport.c | 44 ++++++++++++++++++++++++++----
>>>>> 1 file changed, 38 insertions(+), 6 deletions(-)
>>>>>
>>>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>>> index 5763825..02eee12 100644
>>>>> --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
>>>>> @@ -949,7 +949,7 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
>>>>> return ERR_PTR(-ENOMEM);
>>>>> }
>>>>>
>>>>> -static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>>>> +static void svc_rdma_destroy_frmrs(struct svcxprt_rdma *xprt)
>>>>> {
>>>>> struct svc_rdma_fastreg_mr *frmr;
>>>>>
>>>>> @@ -963,6 +963,37 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
>>>>> }
>>>>> }
>>>>>
>>>>> +static bool svc_rdma_prealloc_frmrs(struct svcxprt_rdma *xprt)
>>>>> +{
>>>>> + struct ib_device *dev = xprt->sc_cm_id->device;
>>>>> + unsigned int i;
>>>>> +
>>>>> + /* Pre-allocate based on the maximum amount of payload
>>>>> + * the server's HCA can handle per RDMA Read, to keep
>>>>> + * the number of MRs per connection in check.
>>>>> + *
>>>>> + * If a client sends small Read chunks (eg. it may be
>>>>> + * using physical registration), more RDMA Reads per
>>>>> + * NFS WRITE will be needed. svc_rdma_get_frmr() dips
>>>>> + * into its reserve in that case. Better would be for
>>>>> + * the server to reduce the connection's credit limit.
>>>>> + */
>>>>> + i = 1 + RPCSVC_MAXPAGES / dev->attrs.max_fast_reg_page_list_len;
>>>>> + i *= xprt->sc_max_requests;
>>>>> +
>>>>> + while (i--) {
>>>>> + struct svc_rdma_fastreg_mr *frmr;
>>>>> +
>>>>> + frmr = rdma_alloc_frmr(xprt);
>>>>> + if (!frmr) {
>>>>> + dprintk("svcrdma: No memory for request map\n");
>>>>> + return false;
>>>>> + }
>>>>> + list_add(&frmr->frmr_list, &xprt->sc_frmr_q);
>>>>> + }
>>>>> + return true;
>>>>> +}
>>>>> +
>>>>> struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>>>> {
>>>>> struct svc_rdma_fastreg_mr *frmr = NULL;
>>>>> @@ -975,10 +1006,9 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
>>>>> frmr->sg_nents = 0;
>>>>> }
>>>>> spin_unlock_bh(&rdma->sc_frmr_q_lock);
>>>>> - if (frmr)
>>>>> - return frmr;
>>>>> -
>>>>> - return rdma_alloc_frmr(rdma);
>>>>> + if (!frmr)
>>>>> + return ERR_PTR(-ENOMEM);
>>>>> + return frmr;
>>>>> }
>>>>>
>>>>> void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
>>>>> @@ -1149,6 +1179,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
>>>>> dev->attrs.max_fast_reg_page_list_len;
>>>>> newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
>>>>> newxprt->sc_reader = rdma_read_chunk_frmr;
>>>>> + if (!svc_rdma_prealloc_frmrs(newxprt))
>>>>> + goto errout;
>>>>> }
>>>>>
>>>>> /*
>>>>> @@ -1310,7 +1342,7 @@ static void __svc_rdma_free(struct work_struct *work)
>>>>> xprt->xpt_bc_xprt = NULL;
>>>>> }
>>>>>
>>>>> - rdma_dealloc_frmr_q(rdma);
>>>>> + svc_rdma_destroy_frmrs(rdma);
>>>>> svc_rdma_destroy_ctxts(rdma);
>>>>> svc_rdma_destroy_maps(rdma);
>>>>>
>>>
>>> --
>>> Chuck Lever
>>>
>>>
>>>
>>>
>
> --
> Chuck Lever
>
>
>
>