2016-02-12 21:05:55

by Chuck Lever

Subject: [PATCH v1 0/8] NFS/RDMA client patches for v4.6

There continues to be some fallout from enabling NFSv4.1/RDMA, and
from converting the reply handler to use a work queue. This series
includes some bug fixes for those issues.

Logic to handle the RPC-over-RDMA RDMA_ERROR message type is also
introduced into the RPC reply handler.

Also included is a patch set to convert xprtrdma to use the new core
CQ API.

Available in the "nfs-rdma-for-4.6" topic branch of this git repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git

Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.6


Chuck Lever (8):
xprtrdma: Segment head and tail XDR buffers on page boundaries
xprtrdma: Invalidate memory when a signal is caught
rpcrdma: Add RPCRDMA_HDRLEN_ERR
xprtrdma: Properly handle RDMA_ERROR replies
xprtrdma: Serialize credit accounting again
xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
xprtrdma: Use an anonymous union in struct rpcrdma_mw
xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs


include/linux/sunrpc/rpc_rdma.h | 12 +-
net/sunrpc/xprtrdma/fmr_ops.c | 28 +++---
net/sunrpc/xprtrdma/frwr_ops.c | 137 ++++++++++++++++++---------
net/sunrpc/xprtrdma/rpc_rdma.c | 89 +++++++++++++-----
net/sunrpc/xprtrdma/transport.c | 41 +++++++-
net/sunrpc/xprtrdma/verbs.c | 198 ++++++++++++---------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 15 +--
7 files changed, 279 insertions(+), 241 deletions(-)

--
Chuck Lever


2016-02-12 21:06:04

by Chuck Lever

Subject: [PATCH v1 1/8] xprtrdma: Segment head and tail XDR buffers on page boundaries

A single memory allocation is used for the pair of buffers wherein
the RPC client builds an RPC call message and decodes its matching
reply. These buffers are sized based on the maximum possible size
of the RPC call and reply messages for the operation in progress.

This means that as the call buffer increases in size, the start of
the reply buffer is pushed farther into the memory allocation.

RPC requests are growing in size. It used to be that both the call
and reply buffers fit inside a single page.

But these days, thanks to NFSv4 (and especially security labels in
NFSv4.2) the maximum call and reply sizes are large. NFSv4.0 OPEN,
for example, now requires a 6KB allocation for a pair of call and
reply buffers, and NFSv4 LOOKUP is not far behind.

As the maximum size of a call increases, the reply buffer is pushed
far enough into the buffer's memory allocation that a page boundary
can appear in the middle of it.

When the maximum possible reply size is larger than the client's
RDMA receive buffers (currently 1KB), the client has to register a
Reply chunk for the server to RDMA Write the reply into.

The logic in rpcrdma_convert_iovs() assumes that the xdr_buf head and
tail buffers are always contained within a single page, so it supplies
just one segment for the head and one for the tail.

FMR, for example, registers up to a page boundary (only a portion of
the reply buffer in the OPEN case above). But without additional
segments, it doesn't register the rest of the buffer.

When the server tries to write the OPEN reply, the RDMA Write fails
with a remote access error since the client registered only part of
the Reply chunk.

rpcrdma_convert_iovs() must split the XDR buffer into multiple
segments, each of which is guaranteed not to contain a page
boundary. That way fmr_op_map() is given the proper number of segments
to register the whole reply buffer.
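
(Not part of the patch, just an illustration: a minimal userspace
sketch of the splitting arithmetic, assuming 4KB pages. The
count_segments() helper is a made-up name that mirrors the loop
rpcrdma_convert_kvec() uses in the diff below.)

#include <stdio.h>
#include <stddef.h>

#define PAGE_SIZE 4096UL

/* Count how many page-bounded segments a buffer needs; this mirrors
 * the loop in rpcrdma_convert_kvec() below, using plain types.
 */
static int count_segments(size_t page_offset, size_t len)
{
	int n = 0;

	while (len) {
		size_t seg = PAGE_SIZE - page_offset;

		if (seg > len)
			seg = len;
		len -= seg;
		page_offset = 0;
		n++;
	}
	return n;
}

int main(void)
{
	/* A 6144-byte reply buffer that starts 3000 bytes into a page
	 * crosses two page boundaries and therefore needs three
	 * segments, where the old code always supplied just one.
	 */
	printf("%d segments\n", count_segments(3000, 6144));
	return 0;
}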

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 42 ++++++++++++++++++++++++++++++----------
1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0f28f2d..add1f98 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
return tlen;
}

+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+ int n, int nsegs)
+{
+ size_t page_offset;
+ u32 remaining;
+ char *base;
+
+ base = vec->iov_base;
+ page_offset = offset_in_page(base);
+ remaining = vec->iov_len;
+ while (remaining && n < nsegs) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = base;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+ remaining -= seg[n].mr_len;
+ base += seg[n].mr_len;
+ ++n;
+ page_offset = 0;
+ }
+ return n;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
int page_base;
struct page **ppages;

- if (pos == 0 && xdrbuf->head[0].iov_len) {
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->head[0].iov_base;
- seg[n].mr_len = xdrbuf->head[0].iov_len;
- ++n;
+ if (pos == 0) {
+ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+ if (n == nsegs)
+ return -EIO;
}

len = xdrbuf->page_len;
@@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
+ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
if (n == nsegs)
- /* Tail remains, but we're out of segments */
return -EIO;
- seg[n].mr_page = NULL;
- seg[n].mr_offset = xdrbuf->tail[0].iov_base;
- seg[n].mr_len = xdrbuf->tail[0].iov_len;
- ++n;
}

return n;


2016-02-12 21:06:13

by Chuck Lever

Subject: [PATCH v1 2/8] xprtrdma: Invalidate memory when a signal is caught

When a signal occurs during a synchronous RPC, the RPC scheduler
causes the RPC task to exit immediately.

However, the RPC transaction is still running on the server, which
may read arguments from the client or deliver a reply into client
memory that is still registered.

Ensure that xprt_release() invalidates the signaled RPC's chunks
and waits for the invalidation to complete in order to fence the
memory from the server before the client re-allocates it.

Note it should be safe to sleep in this case. An asynchronous
RPC is never awoken by a signal.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 41 ++++++++++++++++++++++++++++++++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b1b009f..ceb7140 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -508,6 +508,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
+ req->rl_task = task;
return req->rl_sendbuf->rg_base;

out_rdmabuf:
@@ -555,6 +556,37 @@ out_fail:
return NULL;
}

+/* Invalidate registered memory still associated with this req.
+ *
+ * Normally, the RPC reply handler invalidates chunks.
+ *
+ * If we're here because a signal fired, this is an exiting
+ * synchronous RPC and the reply handler has not run, leaving
+ * the RPC's chunks registered. The server is still processing
+ * this RPC and can still read or update those chunks.
+ *
+ * Synchronously fence the chunks before this RPC terminates
+ * to ensure the server doesn't write into memory that has
+ * been reallocated.
+ */
+static void xprt_rdma_free_chunks(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req)
+{
+ unsigned int i;
+
+ if (!RPC_IS_ASYNC(req->rl_task)) {
+ r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
+ return;
+ }
+
+ pr_warn("rpcrdma: freeing async RPC task with registered chunks\n");
+ for (i = 0; req->rl_nchunks;) {
+ --req->rl_nchunks;
+ i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+ &req->rl_segments[i]);
+ }
+}
+
/*
* This function returns all RDMA resources to the pool.
*/
@@ -564,7 +596,6 @@ xprt_rdma_free(void *buffer)
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
struct rpcrdma_regbuf *rb;
- int i;

if (buffer == NULL)
return;
@@ -578,12 +609,8 @@ xprt_rdma_free(void *buffer)

dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

- for (i = 0; req->rl_nchunks;) {
- --req->rl_nchunks;
- i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
- &req->rl_segments[i]);
- }
-
+ if (req->rl_nchunks)
+ xprt_rdma_free_chunks(r_xprt, req);
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 38fe11b..bf98c67 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -280,6 +280,7 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+ struct rpc_task *rl_task;

struct list_head rl_all;
bool rl_backchannel;


2016-02-12 21:06:20

by Chuck Lever

Subject: [PATCH v1 3/8] rpcrdma: Add RPCRDMA_HDRLEN_ERR

Error headers are shorter than either RDMA_MSG or RDMA_NOMSG.

Since HDRLEN_MIN is already used in several other places that would
be annoying to change, add RPCRDMA_HDRLEN_ERR for the one or two
spots where the shorter length is needed.
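
(For illustration only, not part of the patch: the two lengths count
32-bit XDR words in the transport header. The first four words
(rm_xid, rm_vers, rm_credit, rm_type) are common to every message;
RDMA_MSG and RDMA_NOMSG then carry three, possibly empty, chunk-list
words, while an RDMA_ERROR body starts with a single rm_err word.)

#include <stdio.h>
#include <stdint.h>

/* Standalone sketch of how the two lengths are derived; the real
 * definitions live in include/linux/sunrpc/rpc_rdma.h.
 */
#define XDR_UNIT	sizeof(uint32_t)	/* 4 bytes */
#define HDRLEN_MIN	(XDR_UNIT * 7)	/* 4 fixed words + 3 chunk-list words */
#define HDRLEN_ERR	(XDR_UNIT * 5)	/* 4 fixed words + rm_err */

int main(void)
{
	printf("HDRLEN_MIN = %zu bytes, HDRLEN_ERR = %zu bytes\n",
	       HDRLEN_MIN, HDRLEN_ERR);
	return 0;
}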

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 1 +
1 file changed, 1 insertion(+)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4..8c6d23c 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -102,6 +102,7 @@ struct rpcrdma_msg {
* Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
*/
#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)

enum rpcrdma_errcode {
ERR_VERS = 1,


2016-02-12 21:06:37

by Chuck Lever

Subject: [PATCH v1 5/8] xprtrdma: Serialize credit accounting again

Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA
replies") replaced the reply tasklet with a workqueue that allows
RPC replies to be processed in parallel. Thus the credit values in
RPC-over-RDMA replies can be applied in a different order than the
one in which the server sent them.

To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit
update to RPC reply handler"). Done by hand to accommodate code
changes that have occurred since then.

Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 9 +--------
net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
3 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c341225..0c45288 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
__be32 *iptr;
int rdmalen, status, rmerr;
unsigned long cwnd;
- u32 credits;

dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

@@ -930,15 +929,9 @@ out:
if (req->rl_nchunks)
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

- credits = be32_to_cpu(headerp->rm_credit);
- if (credits == 0)
- credits = 1; /* don't deadlock */
- else if (credits > r_xprt->rx_buf.rb_max_requests)
- credits = r_xprt->rx_buf.rb_max_requests;
-
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
- xprt->cwnd = credits << RPC_CWNDSHIFT;
+ xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 878f1bf..fc1ef5f 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work)
rpcrdma_reply_handler(rep);
}

+/* Perform basic sanity checking to avoid using garbage
+ * to update the credit grant value.
+ */
+static void
+rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
+ struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+ u32 credits;
+
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+ return;
+
+ credits = be32_to_cpu(rmsgp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > buffer->rb_max_requests)
+ credits = buffer->rb_max_requests;
+
+ atomic_set(&buffer->rb_credits, credits);
+}
+
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
@@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
- prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+ rpcrdma_update_granted_credits(rep);

out_schedule:
queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connected:
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
+ atomic_set(&xprt->rx_buf.rb_credits, 1);
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
@@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
+ atomic_set(&buf->rb_credits, 1);

rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index bf98c67..efd6fa7 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -312,6 +312,7 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
+ atomic_t rb_credits; /* most recent credit grant */

u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */


2016-02-12 21:06:45

by Chuck Lever

Subject: [PATCH v1 6/8] xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.

xprtrdma's receive processing is already handled in a worker thread,
but there is some initial order-dependent processing that is done
in the soft IRQ context before the worker thread is scheduled.
IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma
receive code path.
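
(Rough sketch, not part of the patch: the consumer-side pattern the
new API expects. The CQ is created with ib_alloc_cq(device, NULL,
depth, 0, IB_POLL_SOFTIRQ); each work request embeds an ib_cqe whose
.done method the core invokes for the matching completion. my_ctx,
my_done and my_post_recv are invented names.)

#include <rdma/ib_verbs.h>

struct my_ctx {
	struct ib_cqe	cqe;	/* embedded completion context */
	/* ... per-WR state ... */
};

/* Called by the IB core for each polled WC whose wr_cqe points here */
static void my_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct my_ctx *ctx = container_of(wc->wr_cqe, struct my_ctx, cqe);

	/* On error, only wr_cqe and status are reliable */
	if (wc->status != IB_WC_SUCCESS &&
	    wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("my_ctx %p: %s\n", ctx, ib_wc_status_msg(wc->status));
}

static int my_post_recv(struct ib_qp *qp, struct my_ctx *ctx,
			struct ib_sge *sge)
{
	struct ib_recv_wr wr = { }, *bad_wr;

	ctx->cqe.done = my_done;
	wr.wr_cqe = &ctx->cqe;	/* replaces the old wr_id cookie */
	wr.sg_list = sge;
	wr.num_sge = 1;
	return ib_post_recv(qp, &wr, &bad_wr);
}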

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 68 ++++++++++-----------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 19 insertions(+), 50 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index fc1ef5f..53c30e2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -215,8 +215,9 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
- struct rpcrdma_rep *rep =
- (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+ rr_cqe);

/* WARNING: Only wr_id and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS)
@@ -242,46 +243,23 @@ out_schedule:

out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: rep %p: %s\n",
- __func__, rep, ib_wc_status_msg(wc->status));
+ pr_err("RPC: %s: Recv: %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}

-/* The wc array is on stack: automatic memory is always CPU-local.
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
- struct ib_wc *pos, wcs[4];
- int count, rc;
-
- do {
- pos = wcs;
-
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
-
- count = rc;
- while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
*/
static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
- do {
- rpcrdma_recvcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
+ rpcrdma_recvcq_process_wc(wc);
}

static void
@@ -655,9 +633,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}

- cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
- recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ recvcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -665,14 +643,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}

- rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- ib_destroy_cq(recvcq);
- goto out2;
- }
-
ep->rep_attr.send_cq = sendcq;
ep->rep_attr.recv_cq = recvcq;

@@ -735,10 +705,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}

- rc = ib_destroy_cq(ep->rep_attr.recv_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
+ ib_free_cq(ep->rep_attr.recv_cq);

rc = ib_destroy_cq(ep->rep_attr.send_cq);
if (rc)
@@ -947,6 +914,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}

rep->rr_device = ia->ri_device;
+ rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
@@ -1322,7 +1290,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
int rc;

recv_wr.next = NULL;
- recv_wr.wr_id = (u64) (unsigned long) rep;
+ recv_wr.wr_cqe = &rep->rr_cqe;
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index efd6fa7..7d87cdc 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
struct rpcrdma_buffer;

struct rpcrdma_rep {
+ struct ib_cqe rr_cqe;
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;


2016-02-12 21:06:29

by Chuck Lever

Subject: [PATCH v1 4/8] xprtrdma: Properly handle RDMA_ERROR replies

RDMA_ERROR replies are shorter than RPCRDMA_HDRLEN_MIN, and they
need to complete the waiting RPC.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 11 ++++++-----
net/sunrpc/xprtrdma/rpc_rdma.c | 38 +++++++++++++++++++++++++++++++++-----
2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 8c6d23c..3b1ff38 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -93,6 +93,12 @@ struct rpcrdma_msg {
__be32 rm_pempty[3]; /* 3 empty chunk lists */
} rm_padded;

+ struct {
+ __be32 rm_err;
+ __be32 rm_vers_low;
+ __be32 rm_vers_high;
+ } rm_error;
+
__be32 rm_chunks[0]; /* read, write and reply chunks */

} rm_body;
@@ -109,11 +115,6 @@ enum rpcrdma_errcode {
ERR_CHUNK = 2
};

-struct rpcrdma_err_vers {
- uint32_t rdma_vers_low; /* Version range supported by peer */
- uint32_t rdma_vers_high;
-};
-
enum rpcrdma_proc {
RDMA_MSG = 0, /* An RPC call or reply msg */
RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index add1f98..c341225 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
- int rdmalen, status;
+ int rdmalen, status, rmerr;
unsigned long cwnd;
u32 credits;

@@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)

if (rep->rr_len == RPCRDMA_BAD_LEN)
goto out_badstatus;
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
goto out_shortreply;

headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version)
- goto out_badversion;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (rpcrdma_is_bcall(headerp))
goto out_bcall;
@@ -840,6 +838,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
req->rl_reply = rep;
xprt->reestablish_timeout = 0;

+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
@@ -900,6 +901,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
status = rdmalen;
break;

+ case rdma_error:
+ goto out_rdmaerr;
+
badheader:
default:
dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -915,6 +919,7 @@ badheader:
break;
}

+out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
@@ -957,6 +962,27 @@ out_bcall:
return;
#endif

+out_rdmaerr:
+ rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+ switch (rmerr) {
+ case ERR_VERS:
+ pr_err("%s: server reports header version error (%u-%u)\n",
+ __func__,
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+ be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+ break;
+ case ERR_CHUNK:
+ pr_err("%s: server reports header decoding error\n",
+ __func__);
+ break;
+ default:
+ pr_err("%s: server reports unknown error %d\n",
+ __func__, rmerr);
+ }
+ status = -EREMOTEIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;
+
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
@@ -964,7 +990,9 @@ out_shortreply:
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
- goto repost;
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ goto out;

out_nomatch:
spin_unlock_bh(&xprt->transport_lock);


2016-02-12 21:06:56

by Chuck Lever

Subject: [PATCH v1 7/8] xprtrdma: Use an anonymous union in struct rpcrdma_mw

Clean up: Make code more readable.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 28 +++++++++++++-------------
net/sunrpc/xprtrdma/frwr_ops.c | 42 ++++++++++++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index c14f3a4..b289e10 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
goto out;

- r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
- sizeof(u64), GFP_KERNEL);
- if (!r->r.fmr.physaddrs)
+ r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+ sizeof(u64), GFP_KERNEL);
+ if (!r->fmr.physaddrs)
goto out_free;

- r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->r.fmr.fmr))
+ r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->fmr.fmr))
goto out_fmr_err;

list_add(&r->mw_list, &buf->rb_mws);
@@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;

out_fmr_err:
- rc = PTR_ERR(r->r.fmr.fmr);
+ rc = PTR_ERR(r->fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);
out_free:
kfree(r);
out:
@@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r)
{
LIST_HEAD(l);

- list_add(&r->r.fmr.fmr->list, &l);
+ list_add(&r->fmr.fmr->list, &l);
return ib_unmap_fmr(&l);
}

@@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
- mw->r.fmr.physaddrs[i] = seg->mr_dma;
+ mw->fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
@@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}

- rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+ rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
i, seg1->mr_dma);
if (rc)
goto out_maperr;

seg1->rl_mw = mw;
- seg1->mr_rkey = mw->r.fmr.fmr->rkey;
+ seg1->mr_rkey = mw->fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;

- list_add(&mw->r.fmr.fmr->list, &unmap_list);
+ list_add(&mw->fmr.fmr->list, &unmap_list);

i += seg->mr_nsegs;
}
@@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- kfree(r->r.fmr.physaddrs);
+ kfree(r->fmr.physaddrs);

- rc = ib_dealloc_fmr(r->r.fmr.fmr);
+ rc = ib_dealloc_fmr(r->fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e165673..23382fa 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -109,20 +109,20 @@ static void
__frwr_recovery_worker(struct work_struct *work)
{
struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- r.frmr.fr_work);
- struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+ frmr.fr_work);
+ struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;

- if (ib_dereg_mr(r->r.frmr.fr_mr))
+ if (ib_dereg_mr(r->frmr.fr_mr))
goto out_fail;

- r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
- if (IS_ERR(r->r.frmr.fr_mr))
+ r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
+ if (IS_ERR(r->frmr.fr_mr))
goto out_fail;

dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
- r->r.frmr.fr_state = FRMR_IS_INVALID;
+ r->frmr.fr_state = FRMR_IS_INVALID;
rpcrdma_put_mw(r_xprt, r);
return;

@@ -137,15 +137,15 @@ out_fail:
static void
__frwr_queue_recovery(struct rpcrdma_mw *r)
{
- INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+ INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
+ queue_work(frwr_recovery_wq, &r->frmr.fr_work);
}

static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
{
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *f = &r->frmr;
int rc;

f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
@@ -179,11 +179,11 @@ __frwr_release(struct rpcrdma_mw *r)
{
int rc;

- rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ rc = ib_dereg_mr(r->frmr.fr_mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
- kfree(r->r.frmr.sg);
+ kfree(r->frmr.sg);
}

static int
@@ -263,14 +263,14 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
__func__, r, ib_wc_status_msg(wc->status), wc->status);

- r->r.frmr.fr_state = FRMR_IS_STALE;
+ r->frmr.fr_state = FRMR_IS_STALE;
}

static void
frwr_sendcompletion(struct ib_wc *wc)
{
struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- struct rpcrdma_frmr *f = &r->r.frmr;
+ struct rpcrdma_frmr *f = &r->frmr;

if (unlikely(wc->status != IB_WC_SUCCESS))
__frwr_sendcompletion_flush(wc, r);
@@ -314,7 +314,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
r->mw_sendcompletion = frwr_sendcompletion;
- r->r.frmr.fr_xprt = r_xprt;
+ r->frmr.fr_xprt = r_xprt;
}

return 0;
@@ -347,8 +347,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
- } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
- frmr = &mw->r.frmr;
+ } while (mw->frmr.fr_state != FRMR_IS_INVALID);
+ frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
frmr->fr_waiter = false;
mr = frmr->fr_mr;
@@ -434,7 +434,7 @@ static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
+ struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;

f->fr_waiter = false;
@@ -455,7 +455,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
{
struct ib_device *device = r_xprt->rx_ia.ri_device;
struct rpcrdma_mw *mw = seg->rl_mw;
- struct rpcrdma_frmr *f = &mw->r.frmr;
+ struct rpcrdma_frmr *f = &mw->frmr;

seg->rl_mw = NULL;

@@ -504,7 +504,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)

i += seg->mr_nsegs;
}
- f = &seg->rl_mw->r.frmr;
+ f = &seg->rl_mw->frmr;

/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
@@ -549,7 +549,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw = seg1->rl_mw;
- struct rpcrdma_frmr *frmr = &mw->r.frmr;
+ struct rpcrdma_frmr *frmr = &mw->frmr;
struct ib_send_wr *invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;

@@ -557,7 +557,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)

seg1->rl_mw = NULL;
frmr->fr_state = FRMR_IS_INVALID;
- invalidate_wr = &mw->r.frmr.fr_invwr;
+ invalidate_wr = &mw->frmr.fr_invwr;

memset(invalidate_wr, 0, sizeof(*invalidate_wr));
invalidate_wr->wr_id = (uintptr_t)mw;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 7d87cdc..4be4c9b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -225,7 +225,7 @@ struct rpcrdma_mw {
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
- } r;
+ };
void (*mw_sendcompletion)(struct ib_wc *);
struct list_head mw_list;
struct list_head mw_all;


2016-02-12 21:07:01

by Chuck Lever

Subject: [PATCH v1 8/8] xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.

Send completions were previously handled entirely in the completion
upcall handler. Thus IB_POLL_SOFTIRQ is a direct replacement for the
current xprtrdma send code path.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 99 +++++++++++++++++++++++++----------
net/sunrpc/xprtrdma/verbs.c | 109 ++++++++-------------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 10 +---
3 files changed, 94 insertions(+), 124 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 23382fa..3040fda 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -244,39 +244,80 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
}

-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
- * to be reset.
+static void
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
+ const char *wr)
+{
+ frmr->fr_state = FRMR_IS_STALE;
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: %s %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wr, wc->status, wc->vendor_err);
+}
+
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * WARNING: Only wr_id and status are reliable at this point
+ * FastReg WRs are typically unsignaled, but flushed WRs still need to
+ * be handled.
*/
static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
+frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
- if (likely(wc->status == IB_WC_SUCCESS))
- return;
-
- /* WARNING: Only wr_id and status are reliable at this point */
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- if (wc->status == IB_WC_WR_FLUSH_ERR)
- dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
- else
- pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
- __func__, r, ib_wc_status_msg(wc->status), wc->status);
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;

- r->frmr.fr_state = FRMR_IS_STALE;
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "fastreg");
+ }
}

+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * LocalInv WRs are typically unsignaled, but flushed WRs still need to
+ * be handled.
+ */
static void
-frwr_sendcompletion(struct ib_wc *wc)
+frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
- struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- struct rpcrdma_frmr *f = &r->frmr;
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;

- if (unlikely(wc->status != IB_WC_SUCCESS))
- __frwr_sendcompletion_flush(wc, r);
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS) {
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ }
+}

- if (f->fr_waiter)
- complete(&f->fr_linv_done);
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void
+frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct rpcrdma_frmr *frmr;
+ struct ib_cqe *cqe;
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ cqe = wc->wr_cqe;
+ frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+ if (wc->status != IB_WC_SUCCESS)
+ __frwr_sendcompletion_flush(wc, frmr, "localinv");
+ complete(&frmr->fr_linv_done);
}

static int
@@ -313,7 +354,6 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)

list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
- r->mw_sendcompletion = frwr_sendcompletion;
r->frmr.fr_xprt = r_xprt;
}

@@ -350,7 +390,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
} while (mw->frmr.fr_state != FRMR_IS_INVALID);
frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
- frmr->fr_waiter = false;
mr = frmr->fr_mr;
reg_wr = &frmr->fr_regwr;

@@ -400,7 +439,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,

reg_wr->wr.next = NULL;
reg_wr->wr.opcode = IB_WR_REG_MR;
- reg_wr->wr.wr_id = (uintptr_t)mw;
+ frmr->fr_cqe.done = frwr_wc_fastreg;
+ reg_wr->wr.wr_cqe = &frmr->fr_cqe;
reg_wr->wr.num_sge = 0;
reg_wr->wr.send_flags = 0;
reg_wr->mr = mr;
@@ -437,12 +477,12 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;

- f->fr_waiter = false;
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;

memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (unsigned long)(void *)mw;
+ f->fr_cqe.done = frwr_wc_localinv;
+ invalidate_wr->wr_cqe = &f->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;

@@ -511,7 +551,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* are complete.
*/
f->fr_invwr.send_flags = IB_SEND_SIGNALED;
- f->fr_waiter = true;
+ f->fr_cqe.done = frwr_wc_localinv_wake;
init_completion(&f->fr_linv_done);
INIT_CQCOUNT(&r_xprt->rx_ep);

@@ -560,7 +600,8 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
invalidate_wr = &mw->frmr.fr_invwr;

memset(invalidate_wr, 0, sizeof(*invalidate_wr));
- invalidate_wr->wr_id = (uintptr_t)mw;
+ frmr->fr_cqe.done = frwr_wc_localinv;
+ invalidate_wr->wr_cqe = &frmr->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 53c30e2..2f38388 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -112,73 +112,22 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
}
}

-static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
-{
- struct rpcrdma_ep *ep = context;
-
- pr_err("RPC: %s: %s on device %s ep %p\n",
- __func__, ib_event_msg(event->event),
- event->device->name, context);
- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
- wake_up_all(&ep->rep_connect_wait);
- }
-}
-
-static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
-{
- /* WARNING: Only wr_id and status are reliable at this point */
- if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
- if (wc->status != IB_WC_SUCCESS &&
- wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("RPC: %s: SEND: %s\n",
- __func__, ib_wc_status_msg(wc->status));
- } else {
- struct rpcrdma_mw *r;
-
- r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- r->mw_sendcompletion(wc);
- }
-}
-
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
- */
-static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
-{
- struct ib_wc *pos, wcs[2];
- int count, rc;
-
- do {
- pos = wcs;
-
- rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
- if (rc < 0)
- break;
-
- count = rc;
- while (count-- > 0)
- rpcrdma_sendcq_process_wc(pos++);
- } while (rc == ARRAY_SIZE(wcs));
- return;
-}
-
-/* Handle provider send completion upcalls.
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ * Send WRs are typically unsignaled, but flushed WRs still need to
+ * be handled.
*/
static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
- do {
- rpcrdma_sendcq_poll(cq);
- } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS) > 0);
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: Send: %s (%u, vendor %u)\n",
+ __func__, ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
}

static void
@@ -269,8 +218,6 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep)

while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
rpcrdma_recvcq_process_wc(&wc);
- while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
- rpcrdma_sendcq_process_wc(&wc);
}

static int
@@ -562,9 +509,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_cq *sendcq, *recvcq;
- struct ib_cq_init_attr cq_attr = {};
unsigned int max_qp_wr;
- int rc, err;
+ int rc;

if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
dprintk("RPC: %s: insufficient sge's available\n",
@@ -616,9 +562,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

- cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
- sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+ sendcq = ib_alloc_cq(ia->ri_device, NULL,
+ ep->rep_attr.cap.max_send_wr + 1,
+ 0, IB_POLL_SOFTIRQ);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -626,13 +572,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out1;
}

- rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
- if (rc) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- goto out2;
- }
-
recvcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_recv_wr + 1,
0, IB_POLL_SOFTIRQ);
@@ -667,10 +606,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return 0;

out2:
- err = ib_destroy_cq(sendcq);
- if (err)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, err);
+ ib_free_cq(sendcq);
out1:
if (ia->ri_dma_mr)
ib_dereg_mr(ia->ri_dma_mr);
@@ -706,11 +642,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}

ib_free_cq(ep->rep_attr.recv_cq);
-
- rc = ib_destroy_cq(ep->rep_attr.send_cq);
- if (rc)
- dprintk("RPC: %s: ib_destroy_cq returned %i\n",
- __func__, rc);
+ ib_free_cq(ep->rep_attr.send_cq);

if (ia->ri_dma_mr) {
rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -889,6 +821,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
+ req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
@@ -1252,7 +1185,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
}

send_wr.next = NULL;
- send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+ send_wr.wr_cqe = &req->rl_cqe;
send_wr.sg_list = iov;
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 4be4c9b..fcfd4c0 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -95,10 +95,6 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)

-/* Force completion handler to ignore the signal
- */
-#define RPCRDMA_IGNORE_COMPLETION (0ULL)
-
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
* allocated when the forward channel is set up.
@@ -205,11 +201,11 @@ struct rpcrdma_frmr {
struct scatterlist *sg;
int sg_nents;
struct ib_mr *fr_mr;
+ struct ib_cqe fr_cqe;
enum rpcrdma_frmr_state fr_state;
+ struct completion fr_linv_done;
struct work_struct fr_work;
struct rpcrdma_xprt *fr_xprt;
- bool fr_waiter;
- struct completion fr_linv_done;;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
@@ -226,7 +222,6 @@ struct rpcrdma_mw {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
- void (*mw_sendcompletion)(struct ib_wc *);
struct list_head mw_list;
struct list_head mw_all;
};
@@ -283,6 +278,7 @@ struct rpcrdma_req {
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
struct rpc_task *rl_task;

+ struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
};


2016-02-15 14:28:32

by Devesh Sharma

Subject: Re: [PATCH v1 1/8] xprtrdma: Segment head and tail XDR buffers on page boundaries

Looks good.

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> A single memory allocation is used for the pair of buffers wherein
> the RPC client builds an RPC call message and decodes its matching
> reply. These buffers are sized based on the maximum possible size
> of the RPC call and reply messages for the operation in progress.
>
> This means that as the call buffer increases in size, the start of
> the reply buffer is pushed farther into the memory allocation.
>
> RPC requests are growing in size. It used to be that both the call
> and reply buffers fit inside a single page.
>
> But these days, thanks to NFSv4 (and especially security labels in
> NFSv4.2) the maximum call and reply sizes are large. NFSv4.0 OPEN,
> for example, now requires a 6KB allocation for a pair of call and
> reply buffers, and NFSv4 LOOKUP is not far behind.
>
> As the maximum size of a call increases, the reply buffer is pushed
> far enough into the buffer's memory allocation that a page boundary
> can appear in the middle of it.
>
> When the maximum possible reply size is larger than the client's
> RDMA receive buffers (currently 1KB), the client has to register a
> Reply chunk for the server to RDMA Write the reply into.
>
> The logic in rpcrdma_convert_iovs() assumes that xdr_buf head and
> tail buffers would always be contained on a single page. It supplies
> just one segment for the head and one for the tail.
>
> FMR, for example, registers up to a page boundary (only a portion of
> the reply buffer in the OPEN case above). But without additional
> segments, it doesn't register the rest of the buffer.
>
> When the server tries to write the OPEN reply, the RDMA Write fails
> with a remote access error since the client registered only part of
> the Reply chunk.
>
> rpcrdma_convert_iovs() must split the XDR buffer into multiple
> segments, each of which are guaranteed not to contain a page
> boundary. That way fmr_op_map is given the proper number of segments
> to register the whole reply buffer.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 42 ++++++++++++++++++++++++++++++----------
> 1 file changed, 32 insertions(+), 10 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index 0f28f2d..add1f98 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
> return tlen;
> }
>
> +/* Split "vec" on page boundaries into segments. FMR registers pages,
> + * not a byte range. Other modes coalesce these segments into a single
> + * MR when they can.
> + */
> +static int
> +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
> + int n, int nsegs)
> +{
> + size_t page_offset;
> + u32 remaining;
> + char *base;
> +
> + base = vec->iov_base;
> + page_offset = offset_in_page(base);
> + remaining = vec->iov_len;
> + while (remaining && n < nsegs) {
> + seg[n].mr_page = NULL;
> + seg[n].mr_offset = base;
> + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
> + remaining -= seg[n].mr_len;
> + base += seg[n].mr_len;
> + ++n;
> + page_offset = 0;
> + }
> + return n;
> +}
> +
> /*
> * Chunk assembly from upper layer xdr_buf.
> *
> @@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
> int page_base;
> struct page **ppages;
>
> - if (pos == 0 && xdrbuf->head[0].iov_len) {
> - seg[n].mr_page = NULL;
> - seg[n].mr_offset = xdrbuf->head[0].iov_base;
> - seg[n].mr_len = xdrbuf->head[0].iov_len;
> - ++n;
> + if (pos == 0) {
> + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
> + if (n == nsegs)
> + return -EIO;
> }
>
> len = xdrbuf->page_len;
> @@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
> * xdr pad bytes, saving the server an RDMA operation. */
> if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
> return n;
> + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
> if (n == nsegs)
> - /* Tail remains, but we're out of segments */
> return -EIO;
> - seg[n].mr_page = NULL;
> - seg[n].mr_offset = xdrbuf->tail[0].iov_base;
> - seg[n].mr_len = xdrbuf->tail[0].iov_len;
> - ++n;
> }
>
> return n;
>

2016-02-15 14:28:40

by Devesh Sharma

Subject: Re: [PATCH v1 2/8] xprtrdma: Invalidate memory when a signal is caught

Looks good.

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> When a signal occurs during a synchronous RPC, the RPC scheduler
> causes the RPC task to exit immediately.
>
> However, the RPC transaction is still running on the server, which
> may read arguments from the client or deliver a reply into client
> memory that is still registered.
>
> Ensure that xprt_release() invalidates the signaled RPC's chunks
> and waits for the invalidation to complete in order to fence the
> memory from the server before the client re-allocates it.
>
> Note it should be safe to sleep in this case. An asynchronous
> RPC is never awoken by a signal.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/transport.c | 41 ++++++++++++++++++++++++++++++++-------
> net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
> 2 files changed, 35 insertions(+), 7 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index b1b009f..ceb7140 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -508,6 +508,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
> out:
> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
> req->rl_connect_cookie = 0; /* our reserved value */
> + req->rl_task = task;
> return req->rl_sendbuf->rg_base;
>
> out_rdmabuf:
> @@ -555,6 +556,37 @@ out_fail:
> return NULL;
> }
>
> +/* Invalidate registered memory still associated with this req.
> + *
> + * Normally, the RPC reply handler invalidates chunks.
> + *
> + * If we're here because a signal fired, this is an exiting
> + * synchronous RPC and the reply handler has not run, leaving
> + * the RPC's chunks registered. The server is still processing
> + * this RPC and can still read or update those chunks.
> + *
> + * Synchronously fence the chunks before this RPC terminates
> + * to ensure the server doesn't write into memory that has
> + * been reallocated.
> + */
> +static void xprt_rdma_free_chunks(struct rpcrdma_xprt *r_xprt,
> + struct rpcrdma_req *req)
> +{
> + unsigned int i;
> +
> + if (!RPC_IS_ASYNC(req->rl_task)) {
> + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
> + return;
> + }
> +
> + pr_warn("rpcrdma: freeing async RPC task with registered chunks\n");
> + for (i = 0; req->rl_nchunks;) {
> + --req->rl_nchunks;
> + i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
> + &req->rl_segments[i]);
> + }
> +}
> +
> /*
> * This function returns all RDMA resources to the pool.
> */
> @@ -564,7 +596,6 @@ xprt_rdma_free(void *buffer)
> struct rpcrdma_req *req;
> struct rpcrdma_xprt *r_xprt;
> struct rpcrdma_regbuf *rb;
> - int i;
>
> if (buffer == NULL)
> return;
> @@ -578,12 +609,8 @@ xprt_rdma_free(void *buffer)
>
> dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
>
> - for (i = 0; req->rl_nchunks;) {
> - --req->rl_nchunks;
> - i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
> - &req->rl_segments[i]);
> - }
> -
> + if (req->rl_nchunks)
> + xprt_rdma_free_chunks(r_xprt, req);
> rpcrdma_buffer_put(req);
> }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 38fe11b..bf98c67 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -280,6 +280,7 @@ struct rpcrdma_req {
> struct rpcrdma_regbuf *rl_rdmabuf;
> struct rpcrdma_regbuf *rl_sendbuf;
> struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
> + struct rpc_task *rl_task;
>
> struct list_head rl_all;
> bool rl_backchannel;
>

2016-02-15 14:28:47

by Devesh Sharma

Subject: Re: [PATCH v1 3/8] rpcrdma: Add RPCRDMA_HDRLEN_ERR

Looks good.

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> Error headers are shorter than either RDMA_MSG or RDMA_NOMSG.
>
> Since HDRLEN_MIN is already used in several other places that would
> be annoying to change, add RPCRDMA_HDRLEN_ERR for the one or two
> spots where the shorter length is needed.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/rpc_rdma.h | 1 +
> 1 file changed, 1 insertion(+)
>
> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
> index f33c5a4..8c6d23c 100644
> --- a/include/linux/sunrpc/rpc_rdma.h
> +++ b/include/linux/sunrpc/rpc_rdma.h
> @@ -102,6 +102,7 @@ struct rpcrdma_msg {
> * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
> */
> #define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
> +#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)
>
> enum rpcrdma_errcode {
> ERR_VERS = 1,
>

2016-02-15 14:29:26

by Devesh Sharma

Subject: Re: [PATCH v1 4/8] xprtrdma: Properly handle RDMA_ERROR replies

Hi Chuck

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> These are shorter than RPCRDMA_HDRLEN_MIN, and they need to
> complete the waiting RPC.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/rpc_rdma.h | 11 ++++++-----
> net/sunrpc/xprtrdma/rpc_rdma.c | 38 +++++++++++++++++++++++++++++++++-----
> 2 files changed, 39 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
> index 8c6d23c..3b1ff38 100644
> --- a/include/linux/sunrpc/rpc_rdma.h
> +++ b/include/linux/sunrpc/rpc_rdma.h
> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
> __be32 rm_pempty[3]; /* 3 empty chunk lists */
> } rm_padded;
>
> + struct {
> + __be32 rm_err;
> + __be32 rm_vers_low;
> + __be32 rm_vers_high;
> + } rm_error;
> +
> __be32 rm_chunks[0]; /* read, write and reply chunks */
>
> } rm_body;
> @@ -109,11 +115,6 @@ enum rpcrdma_errcode {
> ERR_CHUNK = 2
> };
>
> -struct rpcrdma_err_vers {
> - uint32_t rdma_vers_low; /* Version range supported by peer */
> - uint32_t rdma_vers_high;
> -};
> -
> enum rpcrdma_proc {
> RDMA_MSG = 0, /* An RPC call or reply msg */
> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index add1f98..c341225 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
> struct rpc_xprt *xprt = &r_xprt->rx_xprt;
> __be32 *iptr;
> - int rdmalen, status;
> + int rdmalen, status, rmerr;
> unsigned long cwnd;
> u32 credits;
>
> @@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>
> if (rep->rr_len == RPCRDMA_BAD_LEN)
> goto out_badstatus;
> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
> goto out_shortreply;
>
> headerp = rdmab_to_msg(rep->rr_rdmabuf);
> - if (headerp->rm_vers != rpcrdma_version)
> - goto out_badversion;
> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
> if (rpcrdma_is_bcall(headerp))
> goto out_bcall;
> @@ -840,6 +838,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> req->rl_reply = rep;
> xprt->reestablish_timeout = 0;
>
> + if (headerp->rm_vers != rpcrdma_version)
> + goto out_badversion;
> +
> /* check for expected message types */
> /* The order of some of these tests is important. */
> switch (headerp->rm_type) {
> @@ -900,6 +901,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> status = rdmalen;
> break;
>
> + case rdma_error:
> + goto out_rdmaerr;
> +
> badheader:
> default:
> dprintk("%s: invalid rpcrdma reply header (type %d):"
> @@ -915,6 +919,7 @@ badheader:
> break;
> }
>
> +out:
> /* Invalidate and flush the data payloads before waking the
> * waiting application. This guarantees the memory region is
> * properly fenced from the server before the application
> @@ -957,6 +962,27 @@ out_bcall:
> return;
> #endif
>
> +out_rdmaerr:
> + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
> + switch (rmerr) {
> + case ERR_VERS:
> + pr_err("%s: server reports header version error (%u-%u)\n",
> + __func__,
> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
> + break;
> + case ERR_CHUNK:
> + pr_err("%s: server reports header decoding error\n",
> + __func__);
> + break;
> + default:
> + pr_err("%s: server reports unknown error %d\n",
> + __func__, rmerr);
> + }
> + status = -EREMOTEIO;
> + r_xprt->rx_stats.bad_reply_count++;

Maybe put the above switch-case and surrounding statements into a helper function?
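
For illustration only, such a helper might look something like the sketch
below (hypothetical name and placement, not part of the posted patch; the
caller would still set status = -EREMOTEIO and goto out):

static void
rpcrdma_report_rdmaerr(struct rpcrdma_xprt *r_xprt,
                       struct rpcrdma_msg *headerp)
{
        u32 rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);

        switch (rmerr) {
        case ERR_VERS:
                pr_err("%s: server reports header version error (%u-%u)\n",
                       __func__,
                       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
                       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
                break;
        case ERR_CHUNK:
                pr_err("%s: server reports header decoding error\n",
                       __func__);
                break;
        default:
                pr_err("%s: server reports unknown error %u\n",
                       __func__, rmerr);
        }

        /* sketch only: keep the stats bump with the reporting */
        r_xprt->rx_stats.bad_reply_count++;
}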

> + goto out;
> +
> out_shortreply:
> dprintk("RPC: %s: short/invalid reply\n", __func__);
> goto repost;
> @@ -964,7 +990,9 @@ out_shortreply:
> out_badversion:
> dprintk("RPC: %s: invalid version %d\n",
> __func__, be32_to_cpu(headerp->rm_vers));
> - goto repost;
> + status = -EIO;
> + r_xprt->rx_stats.bad_reply_count++;
> + goto out;
>
> out_nomatch:
> spin_unlock_bh(&xprt->transport_lock);
>

2016-02-15 14:30:13

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 5/8] xprtrdma: Serialize credit accounting again

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA
> replies") replaced the reply tasklet with a workqueue that allows
> RPC replies to be processed in parallel. Thus the credit values in
> RPC-over-RDMA replies can be applied in a different order than
> the one in which the server sent them.
>
> To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit
> update to RPC reply handler"). Done by hand to accommodate code
> changes that have occurred since then.
>
> Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .")
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 9 +--------
> net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++-
> net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
> 3 files changed, 28 insertions(+), 9 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index c341225..0c45288 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> __be32 *iptr;
> int rdmalen, status, rmerr;
> unsigned long cwnd;
> - u32 credits;
>
> dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

You may also want to remove the extra header length checks from here.
Header length validity is already checked in the
rpcrdma_update_granted_credits() call before the workqueue is scheduled.

>
> @@ -930,15 +929,9 @@ out:
> if (req->rl_nchunks)
> r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
>
> - credits = be32_to_cpu(headerp->rm_credit);
> - if (credits == 0)
> - credits = 1; /* don't deadlock */
> - else if (credits > r_xprt->rx_buf.rb_max_requests)
> - credits = r_xprt->rx_buf.rb_max_requests;
> -
> spin_lock_bh(&xprt->transport_lock);
> cwnd = xprt->cwnd;
> - xprt->cwnd = credits << RPC_CWNDSHIFT;
> + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
> if (xprt->cwnd > cwnd)
> xprt_release_rqst_cong(rqst->rq_task);
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 878f1bf..fc1ef5f 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work)
> rpcrdma_reply_handler(rep);
> }
>
> +/* Perform basic sanity checking to avoid using garbage
> + * to update the credit grant value.
> + */
> +static void
> +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
> +{
> + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
> + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
> + u32 credits;
> +
> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
> + return;
> +
> + credits = be32_to_cpu(rmsgp->rm_credit);
> + if (credits == 0)
> + credits = 1; /* don't deadlock */
> + else if (credits > buffer->rb_max_requests)
> + credits = buffer->rb_max_requests;
> +
> + atomic_set(&buffer->rb_credits, credits);
> +}
> +
> static void
> rpcrdma_recvcq_process_wc(struct ib_wc *wc)
> {
> @@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
> ib_dma_sync_single_for_cpu(rep->rr_device,
> rdmab_addr(rep->rr_rdmabuf),
> rep->rr_len, DMA_FROM_DEVICE);
> - prefetch(rdmab_to_msg(rep->rr_rdmabuf));

Do you really want to remove prefetch()?

> +
> + rpcrdma_update_granted_credits(rep);
>
> out_schedule:
> queue_work(rpcrdma_receive_wq, &rep->rr_work);
> @@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
> connected:
> dprintk("RPC: %s: %sconnected\n",
> __func__, connstate > 0 ? "" : "dis");
> + atomic_set(&xprt->rx_buf.rb_credits, 1);
> ep->rep_connected = connstate;
> rpcrdma_conn_func(ep);
> wake_up_all(&ep->rep_connect_wait);
> @@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> buf->rb_max_requests = r_xprt->rx_data.max_requests;
> buf->rb_bc_srv_max_requests = 0;
> spin_lock_init(&buf->rb_lock);
> + atomic_set(&buf->rb_credits, 1);

Will this give a slow start to the server initially? Should it be rb_max_requests?
I am not sure, just raising a flag to bring this to your notice.

>
> rc = ia->ri_ops->ro_init(r_xprt);
> if (rc)
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index bf98c67..efd6fa7 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -312,6 +312,7 @@ struct rpcrdma_buffer {
> struct list_head rb_send_bufs;
> struct list_head rb_recv_bufs;
> u32 rb_max_requests;
> + atomic_t rb_credits; /* most recent credit grant */
>
> u32 rb_bc_srv_max_requests;
> spinlock_t rb_reqslock; /* protect rb_allreqs */
>

2016-02-15 14:30:36

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 6/8] xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> Calling ib_poll_cq() to sort through WCs during a completion is a
> common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
> ("IB: add a proper completion queue abstraction"), WC sorting can
> be handled by the IB core.
>
> By converting to this new API, xprtrdma is made a better neighbor to
> other RDMA consumers, as it allows the core to schedule the delivery
> of completions more fairly amongst all active consumers.
>
> Because each ib_cqe carries a pointer to a completion method, the
> core can now post its own operations on a consumer's QP, and handle
> the completions itself, without changes to the consumer.
>
> xprtrdma's receive processing is already handled in a worker thread,
> but there is some initial order-dependent processing that is done
> in the soft IRQ context before the worker thread is scheduled.
> IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma
> receive code path.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 68 ++++++++++-----------------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
> 2 files changed, 19 insertions(+), 50 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index fc1ef5f..53c30e2 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -215,8 +215,9 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
> static void
> rpcrdma_recvcq_process_wc(struct ib_wc *wc)
> {
> - struct rpcrdma_rep *rep =
> - (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
> + struct ib_cqe *cqe = wc->wr_cqe;
> + struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
> + rr_cqe);
>
> /* WARNING: Only wr_id and status are reliable at this point */
> if (wc->status != IB_WC_SUCCESS)
> @@ -242,46 +243,23 @@ out_schedule:
>
> out_fail:
> if (wc->status != IB_WC_WR_FLUSH_ERR)
> - pr_err("RPC: %s: rep %p: %s\n",
> - __func__, rep, ib_wc_status_msg(wc->status));
> + pr_err("RPC: %s: Recv: %s (%u, vendor %u)\n",
> + __func__, ib_wc_status_msg(wc->status),
> + wc->status, wc->vendor_err);
> rep->rr_len = RPCRDMA_BAD_LEN;
> goto out_schedule;
> }
>
> -/* The wc array is on stack: automatic memory is always CPU-local.
> +/**
> + * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
> + * @cq: completion queue (ignored)
> + * @wc: completed WR
> *
> - * struct ib_wc is 64 bytes, making the poll array potentially
> - * large. But this is at the bottom of the call chain. Further
> - * substantial work is done in another thread.
> - */
> -static void
> -rpcrdma_recvcq_poll(struct ib_cq *cq)
> -{
> - struct ib_wc *pos, wcs[4];
> - int count, rc;
> -
> - do {
> - pos = wcs;
> -
> - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
> - if (rc < 0)
> - break;
> -
> - count = rc;
> - while (count-- > 0)
> - rpcrdma_recvcq_process_wc(pos++);
> - } while (rc == ARRAY_SIZE(wcs));
> -}
> -
> -/* Handle provider receive completion upcalls.
> */
> static void
> -rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> +rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)

Maybe we can get rid of rpcrdma_receive_wc() and directly use
rpcrdma_recvcq_process_wc()?
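
For what it's worth, the ->done callback registered through struct ib_cqe
must have the (struct ib_cq *, struct ib_wc *) signature, so folding the
two together would mean moving the body of rpcrdma_recvcq_process_wc()
into the callback itself. A minimal sketch (an assumption, not the posted
code):

static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
        /* the ->done member of struct ib_cqe dictates this signature;
         * cq is unused here */
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
                                               rr_cqe);

        /* ... body of rpcrdma_recvcq_process_wc() would move here:
         * status check, DMA sync, credit update, queue_work ... */
}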

> {
> - do {
> - rpcrdma_recvcq_poll(cq);
> - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> - IB_CQ_REPORT_MISSED_EVENTS) > 0);
> + rpcrdma_recvcq_process_wc(wc);
> }
>
> static void
> @@ -655,9 +633,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
> goto out2;
> }
>
> - cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
> - recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
> - rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
> + recvcq = ib_alloc_cq(ia->ri_device, NULL,
> + ep->rep_attr.cap.max_recv_wr + 1,
> + 0, IB_POLL_SOFTIRQ);
> if (IS_ERR(recvcq)) {
> rc = PTR_ERR(recvcq);
> dprintk("RPC: %s: failed to create recv CQ: %i\n",
> @@ -665,14 +643,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
> goto out2;
> }
>
> - rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
> - if (rc) {
> - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
> - __func__, rc);
> - ib_destroy_cq(recvcq);
> - goto out2;
> - }
> -
> ep->rep_attr.send_cq = sendcq;
> ep->rep_attr.recv_cq = recvcq;
>
> @@ -735,10 +705,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
> ia->ri_id->qp = NULL;
> }
>
> - rc = ib_destroy_cq(ep->rep_attr.recv_cq);
> - if (rc)
> - dprintk("RPC: %s: ib_destroy_cq returned %i\n",
> - __func__, rc);
> + ib_free_cq(ep->rep_attr.recv_cq);
>
> rc = ib_destroy_cq(ep->rep_attr.send_cq);
> if (rc)
> @@ -947,6 +914,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
> }
>
> rep->rr_device = ia->ri_device;
> + rep->rr_cqe.done = rpcrdma_receive_wc;
> rep->rr_rxprt = r_xprt;
> INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
> return rep;
> @@ -1322,7 +1290,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
> int rc;
>
> recv_wr.next = NULL;
> - recv_wr.wr_id = (u64) (unsigned long) rep;
> + recv_wr.wr_cqe = &rep->rr_cqe;
> recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
> recv_wr.num_sge = 1;
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index efd6fa7..7d87cdc 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
> struct rpcrdma_buffer;
>
> struct rpcrdma_rep {
> + struct ib_cqe rr_cqe;
> unsigned int rr_len;
> struct ib_device *rr_device;
> struct rpcrdma_xprt *rr_rxprt;
>

2016-02-15 14:30:42

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 7/8] xprtrdma: Use an anonymous union in struct rpcrdma_mw

Looks good.

On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
> Clean up: Make code more readable.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/fmr_ops.c | 28 +++++++++++++-------------
> net/sunrpc/xprtrdma/frwr_ops.c | 42 ++++++++++++++++++++-------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
> 3 files changed, 36 insertions(+), 36 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
> index c14f3a4..b289e10 100644
> --- a/net/sunrpc/xprtrdma/fmr_ops.c
> +++ b/net/sunrpc/xprtrdma/fmr_ops.c
> @@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
> if (!r)
> goto out;
>
> - r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
> - sizeof(u64), GFP_KERNEL);
> - if (!r->r.fmr.physaddrs)
> + r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
> + sizeof(u64), GFP_KERNEL);
> + if (!r->fmr.physaddrs)
> goto out_free;
>
> - r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
> - if (IS_ERR(r->r.fmr.fmr))
> + r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
> + if (IS_ERR(r->fmr.fmr))
> goto out_fmr_err;
>
> list_add(&r->mw_list, &buf->rb_mws);
> @@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
> return 0;
>
> out_fmr_err:
> - rc = PTR_ERR(r->r.fmr.fmr);
> + rc = PTR_ERR(r->fmr.fmr);
> dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
> - kfree(r->r.fmr.physaddrs);
> + kfree(r->fmr.physaddrs);
> out_free:
> kfree(r);
> out:
> @@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r)
> {
> LIST_HEAD(l);
>
> - list_add(&r->r.fmr.fmr->list, &l);
> + list_add(&r->fmr.fmr->list, &l);
> return ib_unmap_fmr(&l);
> }
>
> @@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
> nsegs = RPCRDMA_MAX_FMR_SGES;
> for (i = 0; i < nsegs;) {
> rpcrdma_map_one(device, seg, direction);
> - mw->r.fmr.physaddrs[i] = seg->mr_dma;
> + mw->fmr.physaddrs[i] = seg->mr_dma;
> len += seg->mr_len;
> ++seg;
> ++i;
> @@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
> break;
> }
>
> - rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
> + rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
> i, seg1->mr_dma);
> if (rc)
> goto out_maperr;
>
> seg1->rl_mw = mw;
> - seg1->mr_rkey = mw->r.fmr.fmr->rkey;
> + seg1->mr_rkey = mw->fmr.fmr->rkey;
> seg1->mr_base = seg1->mr_dma + pageoff;
> seg1->mr_nsegs = i;
> seg1->mr_len = len;
> @@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
> seg = &req->rl_segments[i];
> mw = seg->rl_mw;
>
> - list_add(&mw->r.fmr.fmr->list, &unmap_list);
> + list_add(&mw->fmr.fmr->list, &unmap_list);
>
> i += seg->mr_nsegs;
> }
> @@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
> while (!list_empty(&buf->rb_all)) {
> r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
> list_del(&r->mw_all);
> - kfree(r->r.fmr.physaddrs);
> + kfree(r->fmr.physaddrs);
>
> - rc = ib_dealloc_fmr(r->r.fmr.fmr);
> + rc = ib_dealloc_fmr(r->fmr.fmr);
> if (rc)
> dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
> __func__, rc);
> diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
> index e165673..23382fa 100644
> --- a/net/sunrpc/xprtrdma/frwr_ops.c
> +++ b/net/sunrpc/xprtrdma/frwr_ops.c
> @@ -109,20 +109,20 @@ static void
> __frwr_recovery_worker(struct work_struct *work)
> {
> struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
> - r.frmr.fr_work);
> - struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
> + frmr.fr_work);
> + struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
> unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
> struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
>
> - if (ib_dereg_mr(r->r.frmr.fr_mr))
> + if (ib_dereg_mr(r->frmr.fr_mr))
> goto out_fail;
>
> - r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
> - if (IS_ERR(r->r.frmr.fr_mr))
> + r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
> + if (IS_ERR(r->frmr.fr_mr))
> goto out_fail;
>
> dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
> - r->r.frmr.fr_state = FRMR_IS_INVALID;
> + r->frmr.fr_state = FRMR_IS_INVALID;
> rpcrdma_put_mw(r_xprt, r);
> return;
>
> @@ -137,15 +137,15 @@ out_fail:
> static void
> __frwr_queue_recovery(struct rpcrdma_mw *r)
> {
> - INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
> - queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
> + INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
> + queue_work(frwr_recovery_wq, &r->frmr.fr_work);
> }
>
> static int
> __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
> unsigned int depth)
> {
> - struct rpcrdma_frmr *f = &r->r.frmr;
> + struct rpcrdma_frmr *f = &r->frmr;
> int rc;
>
> f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
> @@ -179,11 +179,11 @@ __frwr_release(struct rpcrdma_mw *r)
> {
> int rc;
>
> - rc = ib_dereg_mr(r->r.frmr.fr_mr);
> + rc = ib_dereg_mr(r->frmr.fr_mr);
> if (rc)
> dprintk("RPC: %s: ib_dereg_mr status %i\n",
> __func__, rc);
> - kfree(r->r.frmr.sg);
> + kfree(r->frmr.sg);
> }
>
> static int
> @@ -263,14 +263,14 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
> pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
> __func__, r, ib_wc_status_msg(wc->status), wc->status);
>
> - r->r.frmr.fr_state = FRMR_IS_STALE;
> + r->frmr.fr_state = FRMR_IS_STALE;
> }
>
> static void
> frwr_sendcompletion(struct ib_wc *wc)
> {
> struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
> - struct rpcrdma_frmr *f = &r->r.frmr;
> + struct rpcrdma_frmr *f = &r->frmr;
>
> if (unlikely(wc->status != IB_WC_SUCCESS))
> __frwr_sendcompletion_flush(wc, r);
> @@ -314,7 +314,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
> list_add(&r->mw_list, &buf->rb_mws);
> list_add(&r->mw_all, &buf->rb_all);
> r->mw_sendcompletion = frwr_sendcompletion;
> - r->r.frmr.fr_xprt = r_xprt;
> + r->frmr.fr_xprt = r_xprt;
> }
>
> return 0;
> @@ -347,8 +347,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
> mw = rpcrdma_get_mw(r_xprt);
> if (!mw)
> return -ENOMEM;
> - } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
> - frmr = &mw->r.frmr;
> + } while (mw->frmr.fr_state != FRMR_IS_INVALID);
> + frmr = &mw->frmr;
> frmr->fr_state = FRMR_IS_VALID;
> frmr->fr_waiter = false;
> mr = frmr->fr_mr;
> @@ -434,7 +434,7 @@ static struct ib_send_wr *
> __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
> {
> struct rpcrdma_mw *mw = seg->rl_mw;
> - struct rpcrdma_frmr *f = &mw->r.frmr;
> + struct rpcrdma_frmr *f = &mw->frmr;
> struct ib_send_wr *invalidate_wr;
>
> f->fr_waiter = false;
> @@ -455,7 +455,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
> {
> struct ib_device *device = r_xprt->rx_ia.ri_device;
> struct rpcrdma_mw *mw = seg->rl_mw;
> - struct rpcrdma_frmr *f = &mw->r.frmr;
> + struct rpcrdma_frmr *f = &mw->frmr;
>
> seg->rl_mw = NULL;
>
> @@ -504,7 +504,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
>
> i += seg->mr_nsegs;
> }
> - f = &seg->rl_mw->r.frmr;
> + f = &seg->rl_mw->frmr;
>
> /* Strong send queue ordering guarantees that when the
> * last WR in the chain completes, all WRs in the chain
> @@ -549,7 +549,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
> struct rpcrdma_mr_seg *seg1 = seg;
> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> struct rpcrdma_mw *mw = seg1->rl_mw;
> - struct rpcrdma_frmr *frmr = &mw->r.frmr;
> + struct rpcrdma_frmr *frmr = &mw->frmr;
> struct ib_send_wr *invalidate_wr, *bad_wr;
> int rc, nsegs = seg->mr_nsegs;
>
> @@ -557,7 +557,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
>
> seg1->rl_mw = NULL;
> frmr->fr_state = FRMR_IS_INVALID;
> - invalidate_wr = &mw->r.frmr.fr_invwr;
> + invalidate_wr = &mw->frmr.fr_invwr;
>
> memset(invalidate_wr, 0, sizeof(*invalidate_wr));
> invalidate_wr->wr_id = (uintptr_t)mw;
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 7d87cdc..4be4c9b 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -225,7 +225,7 @@ struct rpcrdma_mw {
> union {
> struct rpcrdma_fmr fmr;
> struct rpcrdma_frmr frmr;
> - } r;
> + };
> void (*mw_sendcompletion)(struct ib_wc *);
> struct list_head mw_list;
> struct list_head mw_all;
>

2016-02-15 14:31:46

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 0/8] NFS/RDMA client patches for v4.6

I shall test this series with ocrdma and update the testing status.

Series Reviewed-by: Devesh Sharma <[email protected]>

On Sat, Feb 13, 2016 at 2:35 AM, Chuck Lever <[email protected]> wrote:
> There continues to be some fallout from enabling NFSv4.1/RDMA, and
> from converting the reply handler to use a work queue. This series
> includes some bug fixes for those issues.
>
> Logic to handle the RPC-over-RDMA RDMA_ERROR message type is also
> introduced into the RPC reply handler.
>
> Also included is a patch set to convert xprtrdma to use the new core
> CQ API.
>
> Available in the "nfs-rdma-for-4.6" topic branch of this git repo:
>
> git://git.linux-nfs.org/projects/cel/cel-2.6.git
>
> Or for browsing:
>
> http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.6
>
>
> Chuck Lever (8):
> xprtrdma: Segment head and tail XDR buffers on page boundaries
> xprtrdma: Invalidate memory when a signal is caught
> rpcrdma: Add RPCRDMA_HDRLEN_ERR
> xprtrdma: Properly handle RDMA_ERROR replies
> xprtrdma: Serialize credit accounting again
> xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
> xprtrdma: Use an anonymous union in struct rpcrdma_mw
> xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
>
>
> include/linux/sunrpc/rpc_rdma.h | 12 +-
> net/sunrpc/xprtrdma/fmr_ops.c | 28 +++---
> net/sunrpc/xprtrdma/frwr_ops.c | 137 ++++++++++++++++++---------
> net/sunrpc/xprtrdma/rpc_rdma.c | 89 +++++++++++++-----
> net/sunrpc/xprtrdma/transport.c | 41 +++++++-
> net/sunrpc/xprtrdma/verbs.c | 198 ++++++++++++---------------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 15 +--
> 7 files changed, 279 insertions(+), 241 deletions(-)
>
> --
> Chuck Lever

2016-02-15 15:00:20

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH v1 5/8] xprtrdma: Serialize credit accounting again


> On Feb 15, 2016, at 9:29 AM, Devesh Sharma <[email protected]> wrote:
>
> On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
>> Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA
>> replies") replaced the reply tasklet with a workqueue that allows
>> RPC replies to be processed in parallel. Thus the credit values in
>> RPC-over-RDMA replies can be applied in a different order than
>> the one in which the server sent them.
>>
>> To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit
>> update to RPC reply handler"). Done by hand to accommodate code
>> changes that have occurred since then.
>>
>> Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .")
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/rpc_rdma.c | 9 +--------
>> net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++-
>> net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
>> 3 files changed, 28 insertions(+), 9 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
>> index c341225..0c45288 100644
>> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
>> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
>> @@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>> __be32 *iptr;
>> int rdmalen, status, rmerr;
>> unsigned long cwnd;
>> - u32 credits;
>>
>> dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
>
> You may also want to remove the extra header length checks from here.
> Header length validity is already checked in the
> rpcrdma_update_granted_credits() call before the workqueue is scheduled.

Actually we need to repost a receive buffer for these
error cases, and it doesn't look like this is done
consistently in the current logic.
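
For reference, "reposting" here means re-issuing the same Receive WR that
rpcrdma_ep_post_recv() builds. A rough sketch, reusing the field names
from this series and assuming "rep" and "ia" are in scope as elsewhere in
verbs.c:

        struct ib_recv_wr recv_wr, *bad_wr;
        int rc;

        recv_wr.next = NULL;
        recv_wr.wr_cqe = &rep->rr_cqe;
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;

        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &bad_wr);

Without that step, each error reply permanently consumes one of the
client's posted receive buffers.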


>> @@ -930,15 +929,9 @@ out:
>> if (req->rl_nchunks)
>> r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
>>
>> - credits = be32_to_cpu(headerp->rm_credit);
>> - if (credits == 0)
>> - credits = 1; /* don't deadlock */
>> - else if (credits > r_xprt->rx_buf.rb_max_requests)
>> - credits = r_xprt->rx_buf.rb_max_requests;
>> -
>> spin_lock_bh(&xprt->transport_lock);
>> cwnd = xprt->cwnd;
>> - xprt->cwnd = credits << RPC_CWNDSHIFT;
>> + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
>> if (xprt->cwnd > cwnd)
>> xprt_release_rqst_cong(rqst->rq_task);
>>
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index 878f1bf..fc1ef5f 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work)
>> rpcrdma_reply_handler(rep);
>> }
>>
>> +/* Perform basic sanity checking to avoid using garbage
>> + * to update the credit grant value.
>> + */
>> +static void
>> +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
>> +{
>> + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
>> + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
>> + u32 credits;
>> +
>> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
>> + return;
>> +
>> + credits = be32_to_cpu(rmsgp->rm_credit);
>> + if (credits == 0)
>> + credits = 1; /* don't deadlock */
>> + else if (credits > buffer->rb_max_requests)
>> + credits = buffer->rb_max_requests;
>> +
>> + atomic_set(&buffer->rb_credits, credits);
>> +}
>> +
>> static void
>> rpcrdma_recvcq_process_wc(struct ib_wc *wc)
>> {
>> @@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
>> ib_dma_sync_single_for_cpu(rep->rr_device,
>> rdmab_addr(rep->rr_rdmabuf),
>> rep->rr_len, DMA_FROM_DEVICE);
>> - prefetch(rdmab_to_msg(rep->rr_rdmabuf));
>
> Do you really want to remove prefetch()?

Yes. Parsing the credits field in the header amounts to
the same thing: the header content is pulled into the
CPU cache.
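
Illustrative comparison (not taken from the patch):

        /* before: prefetch() is only a hint; the cache may or may
         * not actually be warmed */
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));

        /* after: be32_to_cpu() dereferences rm_credit, so the first
         * cache line of the header is loaded for real */
        credits = be32_to_cpu(rmsgp->rm_credit);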


>> +
>> + rpcrdma_update_granted_credits(rep);
>>
>> out_schedule:
>> queue_work(rpcrdma_receive_wq, &rep->rr_work);
>> @@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
>> connected:
>> dprintk("RPC: %s: %sconnected\n",
>> __func__, connstate > 0 ? "" : "dis");
>> + atomic_set(&xprt->rx_buf.rb_credits, 1);
>> ep->rep_connected = connstate;
>> rpcrdma_conn_func(ep);
>> wake_up_all(&ep->rep_connect_wait);
>> @@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>> buf->rb_max_requests = r_xprt->rx_data.max_requests;
>> buf->rb_bc_srv_max_requests = 0;
>> spin_lock_init(&buf->rb_lock);
>> + atomic_set(&buf->rb_credits, 1);
>
> Will this give a slow start to the server initially? Should it be rb_max_requests?
> I am not sure, just raising a flag to bring this to your notice.

Starting at 1 is required by RFC 5666.

It's not slow start. The first server reply should contain
a large credit value, which takes effect as soon as the
receive WC is processed.
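
As a worked example (sketch only; 32 is just an assumed grant from the
server's first reply):

        /* connect time */
        atomic_set(&buf->rb_credits, 1);        /* one RPC may be in flight */

        /* first Receive completion arrives with rm_credit == 32 */
        atomic_set(&buf->rb_credits, 32);

        /* the reply handler then widens the congestion window */
        xprt->cwnd = 32 << RPC_CWNDSHIFT;       /* 32 concurrent RPCs allowed */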


>> rc = ia->ri_ops->ro_init(r_xprt);
>> if (rc)
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index bf98c67..efd6fa7 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -312,6 +312,7 @@ struct rpcrdma_buffer {
>> struct list_head rb_send_bufs;
>> struct list_head rb_recv_bufs;
>> u32 rb_max_requests;
>> + atomic_t rb_credits; /* most recent credit grant */
>>
>> u32 rb_bc_srv_max_requests;
>> spinlock_t rb_reqslock; /* protect rb_allreqs */
>>

--
Chuck Lever





2016-02-16 05:16:15

by Devesh Sharma

[permalink] [raw]
Subject: Re: [PATCH v1 5/8] xprtrdma: Serialize credit accounting again

On Mon, Feb 15, 2016 at 8:30 PM, Chuck Lever <[email protected]> wrote:
>
>> On Feb 15, 2016, at 9:29 AM, Devesh Sharma <[email protected]> wrote:
>>
>> On Sat, Feb 13, 2016 at 2:36 AM, Chuck Lever <[email protected]> wrote:
>>> Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA
>>> replies") replaced the reply tasklet with a workqueue that allows
>>> RPC replies to be processed in parallel. Thus the credit values in
>>> RPC-over-RDMA replies can be applied in a different order than
>>> the one in which the server sent them.
>>>
>>> To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit
>>> update to RPC reply handler"). Done by hand to accommodate code
>>> changes that have occurred since then.
>>>
>>> Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .")
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> net/sunrpc/xprtrdma/rpc_rdma.c | 9 +--------
>>> net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++-
>>> net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
>>> 3 files changed, 28 insertions(+), 9 deletions(-)
>>>
>>> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
>>> index c341225..0c45288 100644
>>> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
>>> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
>>> @@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>> __be32 *iptr;
>>> int rdmalen, status, rmerr;
>>> unsigned long cwnd;
>>> - u32 credits;
>>>
>>> dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
>>
>> You may also want to remove the extra header length checks from here.
>> Header length validity is already checked in the
>> rpcrdma_update_granted_credits() call before the workqueue is scheduled.
>
> Actually we need to repost a receive buffer for these
> error cases, and it doesn't look like this is done
> consistently in the current logic.

Okay, so this needs a fix.

>
>
>>> @@ -930,15 +929,9 @@ out:
>>> if (req->rl_nchunks)
>>> r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
>>>
>>> - credits = be32_to_cpu(headerp->rm_credit);
>>> - if (credits == 0)
>>> - credits = 1; /* don't deadlock */
>>> - else if (credits > r_xprt->rx_buf.rb_max_requests)
>>> - credits = r_xprt->rx_buf.rb_max_requests;
>>> -
>>> spin_lock_bh(&xprt->transport_lock);
>>> cwnd = xprt->cwnd;
>>> - xprt->cwnd = credits << RPC_CWNDSHIFT;
>>> + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
>>> if (xprt->cwnd > cwnd)
>>> xprt_release_rqst_cong(rqst->rq_task);
>>>
>>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>>> index 878f1bf..fc1ef5f 100644
>>> --- a/net/sunrpc/xprtrdma/verbs.c
>>> +++ b/net/sunrpc/xprtrdma/verbs.c
>>> @@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work)
>>> rpcrdma_reply_handler(rep);
>>> }
>>>
>>> +/* Perform basic sanity checking to avoid using garbage
>>> + * to update the credit grant value.
>>> + */
>>> +static void
>>> +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
>>> +{
>>> + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
>>> + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
>>> + u32 credits;
>>> +
>>> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
>>> + return;
>>> +
>>> + credits = be32_to_cpu(rmsgp->rm_credit);
>>> + if (credits == 0)
>>> + credits = 1; /* don't deadlock */
>>> + else if (credits > buffer->rb_max_requests)
>>> + credits = buffer->rb_max_requests;
>>> +
>>> + atomic_set(&buffer->rb_credits, credits);
>>> +}
>>> +
>>> static void
>>> rpcrdma_recvcq_process_wc(struct ib_wc *wc)
>>> {
>>> @@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
>>> ib_dma_sync_single_for_cpu(rep->rr_device,
>>> rdmab_addr(rep->rr_rdmabuf),
>>> rep->rr_len, DMA_FROM_DEVICE);
>>> - prefetch(rdmab_to_msg(rep->rr_rdmabuf));
>>
>> Do you really want to remove prefetch()?
>
> Yes. Parsing the credits field in the header amounts to
> the same thing: the header content is pulled into the
> CPU cache.

Okay got it.

>
>
>>> +
>>> + rpcrdma_update_granted_credits(rep);
>>>
>>> out_schedule:
>>> queue_work(rpcrdma_receive_wq, &rep->rr_work);
>>> @@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
>>> connected:
>>> dprintk("RPC: %s: %sconnected\n",
>>> __func__, connstate > 0 ? "" : "dis");
>>> + atomic_set(&xprt->rx_buf.rb_credits, 1);
>>> ep->rep_connected = connstate;
>>> rpcrdma_conn_func(ep);
>>> wake_up_all(&ep->rep_connect_wait);
>>> @@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>> buf->rb_max_requests = r_xprt->rx_data.max_requests;
>>> buf->rb_bc_srv_max_requests = 0;
>>> spin_lock_init(&buf->rb_lock);
>>> + atomic_set(&buf->rb_credits, 1);
>>
>> Will this give a slow start to the server initially? Should it be rb_max_requests?
>> I am not sure, just raising a flag to bring this to your notice.
>
> Starting at 1 is required by RFC 5666.
>
> It's not slow start. The first server reply should contain
> a large credit value, which takes effect as soon as the
> receive WC is processed.

Okay got it

>
>
>>> rc = ia->ri_ops->ro_init(r_xprt);
>>> if (rc)
>>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>>> index bf98c67..efd6fa7 100644
>>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>>> @@ -312,6 +312,7 @@ struct rpcrdma_buffer {
>>> struct list_head rb_send_bufs;
>>> struct list_head rb_recv_bufs;
>>> u32 rb_max_requests;
>>> + atomic_t rb_credits; /* most recent credit grant */
>>>
>>> u32 rb_bc_srv_max_requests;
>>> spinlock_t rb_reqslock; /* protect rb_allreqs */
>>>
>
> --
> Chuck Lever
>
>
>
>

2016-02-17 21:19:25

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v1 4/8] xprtrdma: Properly handle RDMA_ERROR replies

Hi Chuck,

Can you combine this patch with 3/8, so that we're using RPCRDMA_HDRLEN_ERR in the same patch where it's introduced? It's only a one-line difference :)

Thanks,
Anna

On 02/12/2016 04:06 PM, Chuck Lever wrote:
> These are shorter than RPCRDMA_HDRLEN_MIN, and they need to
> complete the waiting RPC.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/rpc_rdma.h | 11 ++++++-----
> net/sunrpc/xprtrdma/rpc_rdma.c | 38 +++++++++++++++++++++++++++++++++-----
> 2 files changed, 39 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
> index 8c6d23c..3b1ff38 100644
> --- a/include/linux/sunrpc/rpc_rdma.h
> +++ b/include/linux/sunrpc/rpc_rdma.h
> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
> __be32 rm_pempty[3]; /* 3 empty chunk lists */
> } rm_padded;
>
> + struct {
> + __be32 rm_err;
> + __be32 rm_vers_low;
> + __be32 rm_vers_high;
> + } rm_error;
> +
> __be32 rm_chunks[0]; /* read, write and reply chunks */
>
> } rm_body;
> @@ -109,11 +115,6 @@ enum rpcrdma_errcode {
> ERR_CHUNK = 2
> };
>
> -struct rpcrdma_err_vers {
> - uint32_t rdma_vers_low; /* Version range supported by peer */
> - uint32_t rdma_vers_high;
> -};
> -
> enum rpcrdma_proc {
> RDMA_MSG = 0, /* An RPC call or reply msg */
> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index add1f98..c341225 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
> struct rpc_xprt *xprt = &r_xprt->rx_xprt;
> __be32 *iptr;
> - int rdmalen, status;
> + int rdmalen, status, rmerr;
> unsigned long cwnd;
> u32 credits;
>
> @@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>
> if (rep->rr_len == RPCRDMA_BAD_LEN)
> goto out_badstatus;
> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
> goto out_shortreply;
>
> headerp = rdmab_to_msg(rep->rr_rdmabuf);
> - if (headerp->rm_vers != rpcrdma_version)
> - goto out_badversion;
> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
> if (rpcrdma_is_bcall(headerp))
> goto out_bcall;
> @@ -840,6 +838,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> req->rl_reply = rep;
> xprt->reestablish_timeout = 0;
>
> + if (headerp->rm_vers != rpcrdma_version)
> + goto out_badversion;
> +
> /* check for expected message types */
> /* The order of some of these tests is important. */
> switch (headerp->rm_type) {
> @@ -900,6 +901,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> status = rdmalen;
> break;
>
> + case rdma_error:
> + goto out_rdmaerr;
> +
> badheader:
> default:
> dprintk("%s: invalid rpcrdma reply header (type %d):"
> @@ -915,6 +919,7 @@ badheader:
> break;
> }
>
> +out:
> /* Invalidate and flush the data payloads before waking the
> * waiting application. This guarantees the memory region is
> * properly fenced from the server before the application
> @@ -957,6 +962,27 @@ out_bcall:
> return;
> #endif
>
> +out_rdmaerr:
> + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
> + switch (rmerr) {
> + case ERR_VERS:
> + pr_err("%s: server reports header version error (%u-%u)\n",
> + __func__,
> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
> + break;
> + case ERR_CHUNK:
> + pr_err("%s: server reports header decoding error\n",
> + __func__);
> + break;
> + default:
> + pr_err("%s: server reports unknown error %d\n",
> + __func__, rmerr);
> + }
> + status = -EREMOTEIO;
> + r_xprt->rx_stats.bad_reply_count++;
> + goto out;
> +
> out_shortreply:
> dprintk("RPC: %s: short/invalid reply\n", __func__);
> goto repost;
> @@ -964,7 +990,9 @@ out_shortreply:
> out_badversion:
> dprintk("RPC: %s: invalid version %d\n",
> __func__, be32_to_cpu(headerp->rm_vers));
> - goto repost;
> + status = -EIO;
> + r_xprt->rx_stats.bad_reply_count++;
> + goto out;
>
> out_nomatch:
> spin_unlock_bh(&xprt->transport_lock);
>
>


2016-02-17 21:21:18

by Chuck Lever

[permalink] [raw]
Subject: Re: [PATCH v1 4/8] xprtrdma: Properly handle RDMA_ERROR replies


> On Feb 17, 2016, at 4:19 PM, Anna Schumaker <[email protected]> wrote:
>
> Hi Chuck,
>
> Can you combine this patch with 3/8, so that we're using RPCRDMA_HDRLEN_ERR in the same patch where it's introduced? It's only a one-line difference :)

I have to submit 3/8 also in the server series, to prevent
merge conflicts, and only RPCRDMA_HDRLEN_ERR is needed there.

So 3/8 has to remain separate.


> Thanks,
> Anna
>
> On 02/12/2016 04:06 PM, Chuck Lever wrote:
>> These are shorter than RPCRDMA_HDRLEN_MIN, and they need to
>> complete the waiting RPC.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/rpc_rdma.h | 11 ++++++-----
>> net/sunrpc/xprtrdma/rpc_rdma.c | 38 +++++++++++++++++++++++++++++++++-----
>> 2 files changed, 39 insertions(+), 10 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
>> index 8c6d23c..3b1ff38 100644
>> --- a/include/linux/sunrpc/rpc_rdma.h
>> +++ b/include/linux/sunrpc/rpc_rdma.h
>> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
>> __be32 rm_pempty[3]; /* 3 empty chunk lists */
>> } rm_padded;
>>
>> + struct {
>> + __be32 rm_err;
>> + __be32 rm_vers_low;
>> + __be32 rm_vers_high;
>> + } rm_error;
>> +
>> __be32 rm_chunks[0]; /* read, write and reply chunks */
>>
>> } rm_body;
>> @@ -109,11 +115,6 @@ enum rpcrdma_errcode {
>> ERR_CHUNK = 2
>> };
>>
>> -struct rpcrdma_err_vers {
>> - uint32_t rdma_vers_low; /* Version range supported by peer */
>> - uint32_t rdma_vers_high;
>> -};
>> -
>> enum rpcrdma_proc {
>> RDMA_MSG = 0, /* An RPC call or reply msg */
>> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
>> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
>> index add1f98..c341225 100644
>> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
>> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
>> @@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>> struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
>> struct rpc_xprt *xprt = &r_xprt->rx_xprt;
>> __be32 *iptr;
>> - int rdmalen, status;
>> + int rdmalen, status, rmerr;
>> unsigned long cwnd;
>> u32 credits;
>>
>> @@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>
>> if (rep->rr_len == RPCRDMA_BAD_LEN)
>> goto out_badstatus;
>> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
>> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
>> goto out_shortreply;
>>
>> headerp = rdmab_to_msg(rep->rr_rdmabuf);
>> - if (headerp->rm_vers != rpcrdma_version)
>> - goto out_badversion;
>> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
>> if (rpcrdma_is_bcall(headerp))
>> goto out_bcall;
>> @@ -840,6 +838,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>> req->rl_reply = rep;
>> xprt->reestablish_timeout = 0;
>>
>> + if (headerp->rm_vers != rpcrdma_version)
>> + goto out_badversion;
>> +
>> /* check for expected message types */
>> /* The order of some of these tests is important. */
>> switch (headerp->rm_type) {
>> @@ -900,6 +901,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>> status = rdmalen;
>> break;
>>
>> + case rdma_error:
>> + goto out_rdmaerr;
>> +
>> badheader:
>> default:
>> dprintk("%s: invalid rpcrdma reply header (type %d):"
>> @@ -915,6 +919,7 @@ badheader:
>> break;
>> }
>>
>> +out:
>> /* Invalidate and flush the data payloads before waking the
>> * waiting application. This guarantees the memory region is
>> * properly fenced from the server before the application
>> @@ -957,6 +962,27 @@ out_bcall:
>> return;
>> #endif
>>
>> +out_rdmaerr:
>> + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
>> + switch (rmerr) {
>> + case ERR_VERS:
>> + pr_err("%s: server reports header version error (%u-%u)\n",
>> + __func__,
>> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
>> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
>> + break;
>> + case ERR_CHUNK:
>> + pr_err("%s: server reports header decoding error\n",
>> + __func__);
>> + break;
>> + default:
>> + pr_err("%s: server reports unknown error %d\n",
>> + __func__, rmerr);
>> + }
>> + status = -EREMOTEIO;
>> + r_xprt->rx_stats.bad_reply_count++;
>> + goto out;
>> +
>> out_shortreply:
>> dprintk("RPC: %s: short/invalid reply\n", __func__);
>> goto repost;
>> @@ -964,7 +990,9 @@ out_shortreply:
>> out_badversion:
>> dprintk("RPC: %s: invalid version %d\n",
>> __func__, be32_to_cpu(headerp->rm_vers));
>> - goto repost;
>> + status = -EIO;
>> + r_xprt->rx_stats.bad_reply_count++;
>> + goto out;
>>
>> out_nomatch:
>> spin_unlock_bh(&xprt->transport_lock);
>>
>>
>

--
Chuck Lever





2016-02-17 21:24:07

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v1 4/8] xprtrdma: Properly handle RDMA_ERROR replies

On 02/17/2016 04:21 PM, Chuck Lever wrote:
>
>> On Feb 17, 2016, at 4:19 PM, Anna Schumaker <[email protected]> wrote:
>>
>> Hi Chuck,
>>
>> Can you combine this patch with 3/8, so that we're using RPCRDMA_HDRLEN_ERR in the same patch where it's introduced? It's only a one-line difference :)
>
> I have to submit 3/8 also in the server series, to prevent
> merge conflicts, and only RPCRDMA_HDRLEN_ERR is needed there.
>
> So 3/8 has to remain separate.

Got it. Thanks for the explanation!

Anna
>
>
>> Thanks,
>> Anna
>>
>> On 02/12/2016 04:06 PM, Chuck Lever wrote:
>>> These are shorter than RPCRDMA_HDRLEN_MIN, and they need to
>>> complete the waiting RPC.
>>>
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> include/linux/sunrpc/rpc_rdma.h | 11 ++++++-----
>>> net/sunrpc/xprtrdma/rpc_rdma.c | 38 +++++++++++++++++++++++++++++++++-----
>>> 2 files changed, 39 insertions(+), 10 deletions(-)
>>>
>>> diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
>>> index 8c6d23c..3b1ff38 100644
>>> --- a/include/linux/sunrpc/rpc_rdma.h
>>> +++ b/include/linux/sunrpc/rpc_rdma.h
>>> @@ -93,6 +93,12 @@ struct rpcrdma_msg {
>>> __be32 rm_pempty[3]; /* 3 empty chunk lists */
>>> } rm_padded;
>>>
>>> + struct {
>>> + __be32 rm_err;
>>> + __be32 rm_vers_low;
>>> + __be32 rm_vers_high;
>>> + } rm_error;
>>> +
>>> __be32 rm_chunks[0]; /* read, write and reply chunks */
>>>
>>> } rm_body;
>>> @@ -109,11 +115,6 @@ enum rpcrdma_errcode {
>>> ERR_CHUNK = 2
>>> };
>>>
>>> -struct rpcrdma_err_vers {
>>> - uint32_t rdma_vers_low; /* Version range supported by peer */
>>> - uint32_t rdma_vers_high;
>>> -};
>>> -
>>> enum rpcrdma_proc {
>>> RDMA_MSG = 0, /* An RPC call or reply msg */
>>> RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
>>> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
>>> index add1f98..c341225 100644
>>> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
>>> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
>>> @@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>> struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
>>> struct rpc_xprt *xprt = &r_xprt->rx_xprt;
>>> __be32 *iptr;
>>> - int rdmalen, status;
>>> + int rdmalen, status, rmerr;
>>> unsigned long cwnd;
>>> u32 credits;
>>>
>>> @@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>>
>>> if (rep->rr_len == RPCRDMA_BAD_LEN)
>>> goto out_badstatus;
>>> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
>>> + if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
>>> goto out_shortreply;
>>>
>>> headerp = rdmab_to_msg(rep->rr_rdmabuf);
>>> - if (headerp->rm_vers != rpcrdma_version)
>>> - goto out_badversion;
>>> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
>>> if (rpcrdma_is_bcall(headerp))
>>> goto out_bcall;
>>> @@ -840,6 +838,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>> req->rl_reply = rep;
>>> xprt->reestablish_timeout = 0;
>>>
>>> + if (headerp->rm_vers != rpcrdma_version)
>>> + goto out_badversion;
>>> +
>>> /* check for expected message types */
>>> /* The order of some of these tests is important. */
>>> switch (headerp->rm_type) {
>>> @@ -900,6 +901,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
>>> status = rdmalen;
>>> break;
>>>
>>> + case rdma_error:
>>> + goto out_rdmaerr;
>>> +
>>> badheader:
>>> default:
>>> dprintk("%s: invalid rpcrdma reply header (type %d):"
>>> @@ -915,6 +919,7 @@ badheader:
>>> break;
>>> }
>>>
>>> +out:
>>> /* Invalidate and flush the data payloads before waking the
>>> * waiting application. This guarantees the memory region is
>>> * properly fenced from the server before the application
>>> @@ -957,6 +962,27 @@ out_bcall:
>>> return;
>>> #endif
>>>
>>> +out_rdmaerr:
>>> + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
>>> + switch (rmerr) {
>>> + case ERR_VERS:
>>> + pr_err("%s: server reports header version error (%u-%u)\n",
>>> + __func__,
>>> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
>>> + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
>>> + break;
>>> + case ERR_CHUNK:
>>> + pr_err("%s: server reports header decoding error\n",
>>> + __func__);
>>> + break;
>>> + default:
>>> + pr_err("%s: server reports unknown error %d\n",
>>> + __func__, rmerr);
>>> + }
>>> + status = -EREMOTEIO;
>>> + r_xprt->rx_stats.bad_reply_count++;
>>> + goto out;
>>> +
>>> out_shortreply:
>>> dprintk("RPC: %s: short/invalid reply\n", __func__);
>>> goto repost;
>>> @@ -964,7 +990,9 @@ out_shortreply:
>>> out_badversion:
>>> dprintk("RPC: %s: invalid version %d\n",
>>> __func__, be32_to_cpu(headerp->rm_vers));
>>> - goto repost;
>>> + status = -EIO;
>>> + r_xprt->rx_stats.bad_reply_count++;
>>> + goto out;
>>>
>>> out_nomatch:
>>> spin_unlock_bh(&xprt->transport_lock);
>>>
>>>
>>
>
> --
> Chuck Lever
>
>
>
>