2015-01-13 16:02:30

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 00/10] NFS/RDMA server for 3.20

In addition to miscellaneous clean up, the following series of
patches for the Linux NFS server introduces support in the
server’s RPC/RDMA transport implementation for RDMA_NOMSG type
messages, and fixes a bug that prevents the server from handling
RPC/RDMA messages with inline content following the read list.

These patches are contained in the topic branch "nfsd-rdma-for-3.20"
at:

git://git.linux-nfs.org/projects/cel/cel-2.6.git

Changes since v1:
- Rebased on 3.19-rc4
- Patch descriptions clarified
- Bug addressed in "svcrdma: Handle additional inline content"

---

Chuck Lever (10):
svcrdma: Handle additional inline content
svcrdma: Move read list XDR round-up logic
svcrdma: Support RDMA_NOMSG requests
svcrdma: rc_position sanity checking
svcrdma: Plant reader function in struct svcxprt_rdma
svcrdma: Find rmsgp more reliably
svcrdma: Scrub BUG_ON() and WARN_ON() call sites
svcrdma: Clean up read chunk counting
svcrdma: Remove unused variable
svcrdma: Clean up dprintk


include/linux/sunrpc/svc_rdma.h | 13 +-
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 16 --
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 244 +++++++++++++++++++-----------
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 46 +++---
net/sunrpc/xprtrdma/svc_rdma_transport.c | 47 ++++--
5 files changed, 217 insertions(+), 149 deletions(-)

--
Chuck Lever


2015-01-13 16:02:41

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 01/10] svcrdma: Clean up dprintk

Nit: Fix inconsistent white space in dprintk messages.

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 8 ++++----
1 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index e011027..2c67de0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -501,8 +501,8 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
ret = rqstp->rq_arg.head[0].iov_len
+ rqstp->rq_arg.page_len
+ rqstp->rq_arg.tail[0].iov_len;
- dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
- "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
rqstp->rq_arg.head[0].iov_len);

@@ -591,8 +591,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+ rqstp->rq_arg.tail[0].iov_len;
svc_rdma_put_context(ctxt, 0);
out:
- dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
- "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
ret, rqstp->rq_arg.len,
rqstp->rq_arg.head[0].iov_base,
rqstp->rq_arg.head[0].iov_len);


2015-01-13 16:02:48

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 02/10] svcrdma: Remove unused variable

Nit: remove an unused variable to squelch a compiler warning.

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_transport.c | 2 --
1 files changed, 0 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 4e61880..4ba11d0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -687,7 +687,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
{
struct rdma_cm_id *listen_id;
struct svcxprt_rdma *cma_xprt;
- struct svc_xprt *xprt;
int ret;

dprintk("svcrdma: Creating RDMA socket\n");
@@ -698,7 +697,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
cma_xprt = rdma_create_xprt(serv, 1);
if (!cma_xprt)
return ERR_PTR(-ENOMEM);
- xprt = &cma_xprt->sc_xprt;

listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
IB_QPT_RC);


2015-01-13 16:02:57

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 03/10] svcrdma: Clean up read chunk counting

The byte_count argument is not used, and the function is called
only from one place.

Signed-off-by: Chuck Lever <[email protected]>
---

include/linux/sunrpc/svc_rdma.h | 2 --
net/sunrpc/xprtrdma/svc_rdma_marshal.c | 16 ----------------
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 15 ++++++++++++---
3 files changed, 12 insertions(+), 21 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 975da75..2280325 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -178,8 +178,6 @@ struct svcxprt_rdma {
#define RPCRDMA_MAX_REQ_SIZE 4096

/* svc_rdma_marshal.c */
-extern void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *,
- int *, int *);
extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
extern int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *);
extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index 65b1462..b681855 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -71,22 +71,6 @@ static u32 *decode_read_list(u32 *va, u32 *vaend)
}

/*
- * Determine number of chunks and total bytes in chunk list. The chunk
- * list has already been verified to fit within the RPCRDMA header.
- */
-void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
- int *ch_count, int *byte_count)
-{
- /* compute the number of bytes represented by read chunks */
- *byte_count = 0;
- *ch_count = 0;
- for (; ch->rc_discrim != 0; ch++) {
- *byte_count = *byte_count + ntohl(ch->rc_target.rs_length);
- *ch_count = *ch_count + 1;
- }
-}
-
-/*
* Decodes a write chunk list. The expected format is as follows:
* descrim : xdr_one
* nchunks : <count>
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 2c67de0..b3b7bb8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -365,12 +365,22 @@ static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
return ret;
}

+static unsigned int
+rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
+{
+ unsigned int count;
+
+ for (count = 0; ch->rc_discrim != xdr_zero; ch++)
+ count++;
+ return count;
+}
+
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
struct rpcrdma_msg *rmsgp,
struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head)
{
- int page_no, ch_count, ret;
+ int page_no, ret;
struct rpcrdma_read_chunk *ch;
u32 page_offset, byte_count;
u64 rs_offset;
@@ -381,8 +391,7 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
if (!ch)
return 0;

- svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
- if (ch_count > RPCSVC_MAXPAGES)
+ if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
return -EINVAL;

/* The request is completed when the RDMA_READs complete. The


2015-01-13 16:03:06

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 04/10] svcrdma: Scrub BUG_ON() and WARN_ON() call sites

Current convention is to avoid using BUG_ON() in places where an
oops could cause complete system failure.

Replace BUG_ON() call sites in svcrdma with an assertion error
message and allow execution to continue safely.

Some BUG_ON() calls are removed because they have never fired in
production (that we are aware of).

Some WARN_ON() calls are also replaced where a back trace is not
helpful; e.g., in a workqueue task.

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 11 --------
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 28 +++++++++++++++-----
net/sunrpc/xprtrdma/svc_rdma_transport.c | 43 +++++++++++++++++++-----------
3 files changed, 49 insertions(+), 33 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index b3b7bb8..577f865 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -95,14 +95,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_respages = &rqstp->rq_pages[sge_no];
rqstp->rq_next_page = rqstp->rq_respages + 1;

- /* We should never run out of SGE because the limit is defined to
- * support the max allowed RPC data length
- */
- BUG_ON(bc && (sge_no == ctxt->count));
- BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
- != byte_count);
- BUG_ON(rqstp->rq_arg.len != byte_count);
-
/* If not all pages were used from the SGL, free the remaining ones */
bc = sge_no;
while (sge_no < ctxt->count) {
@@ -477,8 +469,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
int page_no;
int ret;

- BUG_ON(!head);
-
/* Copy RPC pages */
for (page_no = 0; page_no < head->count; page_no++) {
put_page(rqstp->rq_pages[page_no]);
@@ -567,7 +557,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
}
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
ctxt, rdma_xprt, rqstp, ctxt->wc_status);
- BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
atomic_inc(&rdma_stat_recv);

/* Build up the XDR from the receive buffers. */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 9f1b506..7d79897 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -60,8 +60,11 @@ static int map_xdr(struct svcxprt_rdma *xprt,
u32 page_off;
int page_no;

- BUG_ON(xdr->len !=
- (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
+ if (xdr->len !=
+ (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
+ pr_err("svcrdma: map_xdr: XDR buffer length error\n");
+ return -EIO;
+ }

/* Skip the first sge, this is for the RPCRDMA header */
sge_no = 1;
@@ -150,7 +153,11 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
int bc;
struct svc_rdma_op_ctxt *ctxt;

- BUG_ON(vec->count > RPCSVC_MAXPAGES);
+ if (vec->count > RPCSVC_MAXPAGES) {
+ pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
+ return -EIO;
+ }
+
dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
"write_len=%d, vec->sge=%p, vec->count=%lu\n",
rmr, (unsigned long long)to, xdr_off,
@@ -190,7 +197,10 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
sge_off = 0;
sge_no++;
xdr_sge_no++;
- BUG_ON(xdr_sge_no > vec->count);
+ if (xdr_sge_no > vec->count) {
+ pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
+ goto err;
+ }
bc -= sge_bytes;
if (sge_no == xprt->sc_max_sge)
break;
@@ -421,7 +431,10 @@ static int send_reply(struct svcxprt_rdma *rdma,
ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
ctxt->sge[sge_no].length = sge_bytes;
}
- BUG_ON(byte_count != 0);
+ if (byte_count != 0) {
+ pr_err("svcrdma: Could not map %d bytes\n", byte_count);
+ goto err;
+ }

/* Save all respages in the ctxt and remove them from the
* respages array. They are our pages until the I/O
@@ -442,7 +455,10 @@ static int send_reply(struct svcxprt_rdma *rdma,
}
rqstp->rq_next_page = rqstp->rq_respages + 1;

- BUG_ON(sge_no > rdma->sc_max_sge);
+ if (sge_no > rdma->sc_max_sge) {
+ pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+ goto err;
+ }
memset(&send_wr, 0, sizeof send_wr);
ctxt->wr_op = IB_WR_SEND;
send_wr.wr_id = (unsigned long)ctxt;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 4ba11d0..f2e059b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -139,7 +139,6 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
struct svcxprt_rdma *xprt;
int i;

- BUG_ON(!ctxt);
xprt = ctxt->xprt;
if (free_pages)
for (i = 0; i < ctxt->count; i++)
@@ -339,12 +338,14 @@ static void process_context(struct svcxprt_rdma *xprt,

switch (ctxt->wr_op) {
case IB_WR_SEND:
- BUG_ON(ctxt->frmr);
+ if (ctxt->frmr)
+ pr_err("svcrdma: SEND: ctxt->frmr != NULL\n");
svc_rdma_put_context(ctxt, 1);
break;

case IB_WR_RDMA_WRITE:
- BUG_ON(ctxt->frmr);
+ if (ctxt->frmr)
+ pr_err("svcrdma: WRITE: ctxt->frmr != NULL\n");
svc_rdma_put_context(ctxt, 0);
break;

@@ -353,19 +354,21 @@ static void process_context(struct svcxprt_rdma *xprt,
svc_rdma_put_frmr(xprt, ctxt->frmr);
if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
- BUG_ON(!read_hdr);
- spin_lock_bh(&xprt->sc_rq_dto_lock);
- set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
- list_add_tail(&read_hdr->dto_q,
- &xprt->sc_read_complete_q);
- spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ if (read_hdr) {
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ list_add_tail(&read_hdr->dto_q,
+ &xprt->sc_read_complete_q);
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ } else {
+ pr_err("svcrdma: ctxt->read_hdr == NULL\n");
+ }
svc_xprt_enqueue(&xprt->sc_xprt);
}
svc_rdma_put_context(ctxt, 0);
break;

default:
- BUG_ON(1);
printk(KERN_ERR "svcrdma: unexpected completion type, "
"opcode=%d\n",
ctxt->wr_op);
@@ -513,7 +516,10 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
buflen = 0;
ctxt->direction = DMA_FROM_DEVICE;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
- BUG_ON(sge_no >= xprt->sc_max_sge);
+ if (sge_no >= xprt->sc_max_sge) {
+ pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+ goto err_put_ctxt;
+ }
page = svc_rdma_get_page();
ctxt->pages[sge_no] = page;
pa = ib_dma_map_page(xprt->sc_cm_id->device,
@@ -820,7 +826,7 @@ void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
if (frmr) {
frmr_unmap_dma(rdma, frmr);
spin_lock_bh(&rdma->sc_frmr_q_lock);
- BUG_ON(!list_empty(&frmr->frmr_list));
+ WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
spin_unlock_bh(&rdma->sc_frmr_q_lock);
}
@@ -1123,7 +1129,9 @@ static void __svc_rdma_free(struct work_struct *work)
dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);

/* We should only be called from kref_put */
- BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
+ if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+ pr_err("svcrdma: sc_xprt still in use? (%d)\n",
+ atomic_read(&rdma->sc_xprt.xpt_ref.refcount));

/*
* Destroy queued, but not processed read completions. Note
@@ -1151,8 +1159,12 @@ static void __svc_rdma_free(struct work_struct *work)
}

/* Warn if we leaked a resource or under-referenced */
- WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
- WARN_ON(atomic_read(&rdma->sc_dma_used) != 0);
+ if (atomic_read(&rdma->sc_ctxt_used) != 0)
+ pr_err("svcrdma: ctxt still in use? (%d)\n",
+ atomic_read(&rdma->sc_ctxt_used));
+ if (atomic_read(&rdma->sc_dma_used) != 0)
+ pr_err("svcrdma: dma still in use? (%d)\n",
+ atomic_read(&rdma->sc_dma_used));

/* De-allocate fastreg mr */
rdma_dealloc_frmr_q(rdma);
@@ -1252,7 +1264,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
return -ENOTCONN;

- BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
wr_count = 1;
for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
wr_count++;


2015-01-13 16:03:14

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 05/10] svcrdma: Find rmsgp more reliably

xdr_start() can return the wrong rmsgp address if an assumption
about how the xdr_buf was constructed changes. When it gets it
wrong, the client receives a reply that has gibberish in the
RPC/RDMA header, preventing it from matching a waiting RPC request.

Instead, make (and document) just one assumption: that the RDMA
header for the client's RPC call is at the start of the first page
in rq_pages.

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_sendto.c | 18 ++++--------------
1 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7d79897..7de33d1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -483,18 +483,6 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

-/*
- * Return the start of an xdr buffer.
- */
-static void *xdr_start(struct xdr_buf *xdr)
-{
- return xdr->head[0].iov_base -
- (xdr->len -
- xdr->page_len -
- xdr->tail[0].iov_len -
- xdr->head[0].iov_len);
-}
-
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
struct svc_xprt *xprt = rqstp->rq_xprt;
@@ -512,8 +500,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)

dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

- /* Get the RDMA request header. */
- rdma_argp = xdr_start(&rqstp->rq_arg);
+ /* Get the RDMA request header. The receive logic always
+ * places this at the start of page 0.
+ */
+ rdma_argp = page_address(rqstp->rq_pages[0]);

/* Build an req vec for the XDR */
ctxt = svc_rdma_get_context(rdma);


2015-01-13 16:03:22

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 06/10] svcrdma: Plant reader function in struct svcxprt_rdma

The RDMA reader function doesn't change once an svcxprt_rdma is
instantiated. Instead of checking sc_devcap during every incoming
RPC, set the reader function once when the connection is accepted.

Signed-off-by: Chuck Lever <[email protected]>
---

include/linux/sunrpc/svc_rdma.h | 10 ++++
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 71 +++++++++++-------------------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 2 +
3 files changed, 39 insertions(+), 44 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 2280325..f161e30 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -150,6 +150,10 @@ struct svcxprt_rdma {
struct ib_cq *sc_rq_cq;
struct ib_cq *sc_sq_cq;
struct ib_mr *sc_phys_mr; /* MR for server memory */
+ int (*sc_reader)(struct svcxprt_rdma *,
+ struct svc_rqst *,
+ struct svc_rdma_op_ctxt *,
+ int *, u32 *, u32, u32, u64, bool);
u32 sc_dev_caps; /* distilled device caps */
u32 sc_dma_lkey; /* local dma key */
unsigned int sc_frmr_pg_list_len;
@@ -195,6 +199,12 @@ extern int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *);

/* svc_rdma_recvfrom.c */
extern int svc_rdma_recvfrom(struct svc_rqst *);
+extern int rdma_read_chunk_lcl(struct svcxprt_rdma *, struct svc_rqst *,
+ struct svc_rdma_op_ctxt *, int *, u32 *,
+ u32, u32, u64, bool);
+extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
+ struct svc_rdma_op_ctxt *, int *, u32 *,
+ u32, u32, u64, bool);

/* svc_rdma_sendto.c */
extern int svc_rdma_sendto(struct svc_rqst *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 577f865..c3aebc1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -117,26 +117,16 @@ static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
return min_t(int, sge_count, xprt->sc_max_sge);
}

-typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- int last);
-
/* Issue an RDMA_READ using the local lkey to map the data sink */
-static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- int last)
+int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ bool last)
{
struct ib_send_wr read_wr;
int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
@@ -221,15 +211,15 @@ static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
}

/* Issue an RDMA_READ using an FRMR to map the data sink */
-static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- int *page_no,
- u32 *page_offset,
- u32 rs_handle,
- u32 rs_length,
- u64 rs_offset,
- int last)
+int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ bool last)
{
struct ib_send_wr read_wr;
struct ib_send_wr inv_wr;
@@ -374,9 +364,9 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
{
int page_no, ret;
struct rpcrdma_read_chunk *ch;
- u32 page_offset, byte_count;
+ u32 handle, page_offset, byte_count;
u64 rs_offset;
- rdma_reader_fn reader;
+ bool last;

/* If no read list is present, return 0 */
ch = svc_rdma_get_read_chunk(rmsgp);
@@ -399,27 +389,20 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
head->arg.len = rqstp->rq_arg.len;
head->arg.buflen = rqstp->rq_arg.buflen;

- /* Use FRMR if supported */
- if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
- reader = rdma_read_chunk_frmr;
- else
- reader = rdma_read_chunk_lcl;
-
page_no = 0; page_offset = 0;
for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
ch->rc_discrim != 0; ch++) {
-
+ handle = be32_to_cpu(ch->rc_target.rs_handle);
+ byte_count = be32_to_cpu(ch->rc_target.rs_length);
xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
&rs_offset);
- byte_count = ntohl(ch->rc_target.rs_length);

while (byte_count > 0) {
- ret = reader(xprt, rqstp, head,
- &page_no, &page_offset,
- ntohl(ch->rc_target.rs_handle),
- byte_count, rs_offset,
- ((ch+1)->rc_discrim == 0) /* last */
- );
+ last = (ch + 1)->rc_discrim == xdr_zero;
+ ret = xprt->sc_reader(xprt, rqstp, head,
+ &page_no, &page_offset,
+ handle, byte_count,
+ rs_offset, last);
if (ret < 0)
goto err;
byte_count -= ret;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index f2e059b..f609c1c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -974,10 +974,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* NB: iWARP requires remote write access for the data sink
* of an RDMA_READ. IB does not.
*/
+ newxprt->sc_reader = rdma_read_chunk_lcl;
if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
newxprt->sc_frmr_pg_list_len =
devattr.max_fast_reg_page_list_len;
newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
+ newxprt->sc_reader = rdma_read_chunk_frmr;
}

/*


2015-01-13 16:03:30

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 07/10] svcrdma: rc_position sanity checking

An RPC/RDMA client may send large RPC arguments via a read
list. This is a list of scatter/gather elements which convey
RPC call arguments too large to fit in a small RDMA SEND.

Each entry in the read list has a "position" field, whose value is
the byte offset in the XDR stream where the data in that entry is to
be inserted. Entries which share the same "position" value make up
the same RPC argument. The receiver inserts entries with the same
position field value in list order into the XDR stream.

Currently the Linux NFS/RDMA server cannot handle receiving read
chunks in more than one position, mostly because no current client
sends read lists with elements in more than one position. As a
sanity check, ensure that all received chunks have the same
"rc_position."

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 16 ++++++++++++----
1 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c3aebc1..a67dd1a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -365,6 +365,7 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
int page_no, ret;
struct rpcrdma_read_chunk *ch;
u32 handle, page_offset, byte_count;
+ u32 position;
u64 rs_offset;
bool last;

@@ -389,10 +390,17 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
head->arg.len = rqstp->rq_arg.len;
head->arg.buflen = rqstp->rq_arg.buflen;

- page_no = 0; page_offset = 0;
- for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
- ch->rc_discrim != 0; ch++) {
- handle = be32_to_cpu(ch->rc_target.rs_handle);
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ position = be32_to_cpu(ch->rc_position);
+
+ ret = 0;
+ page_no = 0;
+ page_offset = 0;
+ for (; ch->rc_discrim != xdr_zero; ch++) {
+ if (be32_to_cpu(ch->rc_position) != position)
+ goto err;
+
+ handle = be32_to_cpu(ch->rc_target.rs_handle),
byte_count = be32_to_cpu(ch->rc_target.rs_length);
xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
&rs_offset);


2015-01-13 16:03:39

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 08/10] svcrdma: Support RDMA_NOMSG requests

Currently the Linux server can not decode RDMA_NOMSG type requests.
Operations whose length exceeds the fixed size of RDMA SEND buffers,
like large NFSv4 CREATE(NF4LNK) operations, must be conveyed via
RDMA_NOMSG.

For an RDMA_MSG type request, the client sends the RPC/RDMA, RPC
headers, and some or all of the NFS arguments via RDMA SEND.

For an RDMA_NOMSG type request, the client sends just the RPC/RDMA
header via RDMA SEND. The request's read list contains elements for
the entire RPC message, including the RPC header.

NFSD expects the RPC/RMDA header and RPC header to be contiguous in
page zero of the XDR buffer. Add logic in the RDMA READ path to make
the read list contents land where the server prefers, when the
incoming message is a type RDMA_NOMSG message.

Signed-off-by: Chuck Lever <[email protected]>
---

include/linux/sunrpc/svc_rdma.h | 1 +
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 39 +++++++++++++++++++++++++++++--
2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f161e30..c343a94 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -79,6 +79,7 @@ struct svc_rdma_op_ctxt {
enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status;
u32 byte_len;
+ u32 position;
struct svcxprt_rdma *xprt;
unsigned long flags;
enum dma_data_direction direction;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index a67dd1a..36cf51a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -60,6 +60,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *ctxt,
u32 byte_count)
{
+ struct rpcrdma_msg *rmsgp;
struct page *page;
u32 bc;
int sge_no;
@@ -82,7 +83,14 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
/* If data remains, store it in the pagelist */
rqstp->rq_arg.page_len = bc;
rqstp->rq_arg.page_base = 0;
- rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+
+ /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ if (be32_to_cpu(rmsgp->rm_type) == RDMA_NOMSG)
+ rqstp->rq_arg.pages = &rqstp->rq_pages[0];
+ else
+ rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+
sge_no = 1;
while (bc && sge_no < ctxt->count) {
page = ctxt->pages[sge_no];
@@ -383,7 +391,6 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
*/
head->arg.head[0] = rqstp->rq_arg.head[0];
head->arg.tail[0] = rqstp->rq_arg.tail[0];
- head->arg.pages = &head->pages[head->count];
head->hdr_count = head->count;
head->arg.page_base = 0;
head->arg.page_len = 0;
@@ -393,9 +400,17 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
position = be32_to_cpu(ch->rc_position);

+ /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
+ if (position == 0) {
+ head->arg.pages = &head->pages[0];
+ page_offset = head->byte_len;
+ } else {
+ head->arg.pages = &head->pages[head->count];
+ page_offset = 0;
+ }
+
ret = 0;
page_no = 0;
- page_offset = 0;
for (; ch->rc_discrim != xdr_zero; ch++) {
if (be32_to_cpu(ch->rc_position) != position)
goto err;
@@ -418,7 +433,10 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
head->arg.buflen += ret;
}
}
+
ret = 1;
+ head->position = position;
+
err:
/* Detach arg pages. svc_recv will replenish them */
for (page_no = 0;
@@ -465,6 +483,21 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
put_page(rqstp->rq_pages[page_no]);
rqstp->rq_pages[page_no] = head->pages[page_no];
}
+
+ /* Adjustments made for RDMA_NOMSG type requests */
+ if (head->position == 0) {
+ if (head->arg.len <= head->sge[0].length) {
+ head->arg.head[0].iov_len = head->arg.len -
+ head->byte_len;
+ head->arg.page_len = 0;
+ } else {
+ head->arg.head[0].iov_len = head->sge[0].length -
+ head->byte_len;
+ head->arg.page_len = head->arg.len -
+ head->sge[0].length;
+ }
+ }
+
/* Point rq_arg.pages past header */
rdma_fix_xdr_pad(&head->arg);
rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];


2015-01-13 16:03:47

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 09/10] svcrdma: Move read list XDR round-up logic

This is a pre-requisite for a subsequent patch.

Read list XDR round-up needs to be done _before_ additional inline
content is copied to the end of the XDR buffer's page list. Move
the logic added by commit e560e3b510d2 ("svcrdma: Add zero padding
if the client doesn't send it").

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 37 ++++++++-----------------------
1 files changed, 9 insertions(+), 28 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 36cf51a..a345cad 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -43,7 +43,6 @@
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
-#include <linux/highmem.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
@@ -434,6 +433,15 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
}
}

+ /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
+ if (page_offset & 3) {
+ u32 pad = 4 - (page_offset & 3);
+
+ head->arg.page_len += pad;
+ head->arg.len += pad;
+ head->arg.buflen += pad;
+ }
+
ret = 1;
head->position = position;

@@ -446,32 +454,6 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
return ret;
}

-/*
- * To avoid a separate RDMA READ just for a handful of zero bytes,
- * RFC 5666 section 3.7 allows the client to omit the XDR zero pad
- * in chunk lists.
- */
-static void
-rdma_fix_xdr_pad(struct xdr_buf *buf)
-{
- unsigned int page_len = buf->page_len;
- unsigned int size = (XDR_QUADLEN(page_len) << 2) - page_len;
- unsigned int offset, pg_no;
- char *p;
-
- if (size == 0)
- return;
-
- pg_no = page_len >> PAGE_SHIFT;
- offset = page_len & ~PAGE_MASK;
- p = page_address(buf->pages[pg_no]);
- memset(p + offset, 0, size);
-
- buf->page_len += size;
- buf->buflen += size;
- buf->len += size;
-}
-
static int rdma_read_complete(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head)
{
@@ -499,7 +481,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
}

/* Point rq_arg.pages past header */
- rdma_fix_xdr_pad(&head->arg);
rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
rqstp->rq_arg.page_len = head->arg.page_len;
rqstp->rq_arg.page_base = head->arg.page_base;


2015-01-13 16:03:55

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 10/10] svcrdma: Handle additional inline content

Most NFS RPCs place their large payload argument at the end of the
RPC header (eg, NFSv3 WRITE). For NFSv3 WRITE and SYMLINK, RPC/RDMA
sends the complete RPC header inline, and the payload argument in
the read list. Data in the read list is the last part of the XDR
stream.

One important case is not like this, however. NFSv4 COMPOUND is a
counted array of operations. A WRITE operation, with its large data
payload, can appear in the middle of the compound's operations
array. Thus NFSv4 WRITE compounds can have header content after the
WRITE payload.

The Linux client, for example, performs an NFSv4 WRITE like this:

{ PUTFH, WRITE, GETATTR }

Though RFC 5667 is not precise about this, the proper way to convey
this compound is to place the GETATTR inline, _after_ the front of
the RPC header. The receiver inserts the read list payload into the
XDR stream after the initial WRITE arguments, and before the GETATTR
operation, thanks to the value of the read list "position" field.

The Linux client currently sends the GETATTR at the end of the
RPC/RDMA read list, which is incorrect. It will be corrected in the
future.

The Linux server currently rejects NFSv4 compounds with inline
content after the read list. For the above NFSv4 WRITE compound, the
NFS compound header indicates there are three operations, but the
server finds nonsense when it looks in the XDR stream for the third
operation, and the compound fails with OP_ILLEGAL.

Move trailing inline content to the end of the XDR buffer's page
list. This presents incoming NFSv4 WRITE compounds to NFSD in the
same way the socket transport does.

Signed-off-by: Chuck Lever <[email protected]>
---

net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 55 +++++++++++++++++++++++++++++++
1 files changed, 55 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index a345cad..f9f13a3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -364,6 +364,56 @@ rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
return count;
}

+/* If there was additional inline content, append it to the end of arg.pages.
+ * Tail copy has to be done after the reader function has determined how many
+ * pages are needed for RDMA READ.
+ */
+static int
+rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
+ u32 position, u32 byte_count, u32 page_offset, int page_no)
+{
+ char *srcp, *destp;
+ int ret;
+
+ ret = 0;
+ srcp = head->arg.head[0].iov_base + position;
+ byte_count = head->arg.head[0].iov_len - position;
+ if (byte_count > PAGE_SIZE) {
+ dprintk("svcrdma: large tail unsupported\n");
+ return 0;
+ }
+
+ /* Fit as much of the tail on the current page as possible */
+ if (page_offset != PAGE_SIZE) {
+ destp = page_address(rqstp->rq_arg.pages[page_no]);
+ destp += page_offset;
+ while (byte_count--) {
+ *destp++ = *srcp++;
+ page_offset++;
+ if (page_offset == PAGE_SIZE && byte_count)
+ goto more;
+ }
+ goto done;
+ }
+
+more:
+ /* Fit the rest on the next page */
+ page_no++;
+ destp = page_address(rqstp->rq_arg.pages[page_no]);
+ while (byte_count--)
+ *destp++ = *srcp++;
+
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+done:
+ byte_count = head->arg.head[0].iov_len - position;
+ head->arg.page_len += byte_count;
+ head->arg.len += byte_count;
+ head->arg.buflen += byte_count;
+ return 1;
+}
+
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
struct rpcrdma_msg *rmsgp,
struct svc_rqst *rqstp,
@@ -440,9 +490,14 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt,
head->arg.page_len += pad;
head->arg.len += pad;
head->arg.buflen += pad;
+ page_offset += pad;
}

ret = 1;
+ if (position && position < head->arg.head[0].iov_len)
+ ret = rdma_copy_tail(rqstp, head, position,
+ byte_count, page_offset, page_no);
+ head->arg.head[0].iov_len = position;
head->position = position;

err:


2015-01-13 17:31:14

by Steve Wise

[permalink] [raw]

2015-01-15 19:01:12

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH v2 00/10] NFS/RDMA server for 3.20

On Tue, Jan 13, 2015 at 11:31:16AM -0600, Steve Wise wrote:
>
> Reviewed-by: Steve Wise <[email protected]>

Thanks, applying for 3.20.

--b.