2014-07-09 16:56:32

by Chuck Lever III

Subject: [PATCH v2 00/21] NFS/RDMA client patches for 3.17

The main purpose of this series is to address connection-drop
recovery issues: it fixes FRMR re-use so that the client is less
likely to deadlock after a memory management operation error.

Some clean-ups and other fixes are present as well.

See topic branch nfs-rdma-for-3.17 in

git://git.linux-nfs.org/projects/cel/cel-2.6.git

I tested with NFSv3 and NFSv4 on all three supported memory
registration modes. Used cthon04, iozone, and dbench with both
Solaris and Linux NFS/RDMA servers. Used xfstests with Linux.

v2:
Many patches from v1 have been rewritten or replaced.

The MW ref counting approach in v1 is abandoned. Instead, I've
eliminated signaling for FAST_REG_MR and LOCAL_INV Work Requests,
and added appropriate recovery mechanisms after a transport
reconnect that should prevent rkey dis-synchrony entirely.

A couple of optimizations have been added, including:

- Allocating each MW separately rather than carving each out of a
large piece of contiguous memory

- Now that the receive CQ upcall handler dequeues a bundle of CQEs
at once, fire off the reply handler tasklet just once per upcall,
reducing context switches and the number of times hard IRQs are
disabled

Jury is still out on the latter.
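
For reference, the receive-side batching amounts to roughly the
following shape (condensed from patch 18 below; the poll budget
handling is omitted here):

	static int
	rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
	{
		struct list_head sched_list;
		struct ib_wc *wcs;
		unsigned long flags;
		int count, rc;

		INIT_LIST_HEAD(&sched_list);
		do {
			wcs = ep->rep_recv_wcs;
			rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
			if (rc <= 0)
				break;
			for (count = rc; count > 0; count--)
				rpcrdma_recvcq_process_wc(wcs++, &sched_list);
		} while (rc == RPCRDMA_POLLSIZE);

		/* One pass through the global lock, and one tasklet
		 * kick, per upcall. */
		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
		list_splice_tail(&sched_list, &rpcrdma_tasklets_g);
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
		tasklet_schedule(&rpcrdma_tasklet_g);
		return rc < 0 ? rc : 0;
	}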

---

Chuck Lever (21):
xprtrdma: Fix panic in rpcrdma_register_frmr_external()
xprtrdma: Protect ia->ri_id when unmapping/invalidating MRs
xprtrdma: Limit data payload size for ALLPHYSICAL
xprtrdma: Update rkeys after transport reconnect
xprtrdma: On disconnect, don't ignore pending CQEs
xprtrdma: Don't invalidate FRMRs if registration fails
xprtrdma: Unclutter struct rpcrdma_mr_seg
xprtrdma: Back off rkey when FAST_REG_MR fails
xprtrdma: Chain together all MWs in same buffer pool
xprtrdma: Properly handle exhaustion of the rb_mws list
xprtrdma: Reset FRMRs when FAST_REG_MR is flushed by a disconnect
xprtrdma: Reset FRMRs after a flushed LOCAL_INV Work Request
xprtrdma: Don't post a LOCAL_INV in rpcrdma_register_frmr_external()
xprtrdma: Disable completions for FAST_REG_MR Work Requests
xprtrdma: Disable completions for LOCAL_INV Work Requests
xprtrdma: Rename frmr_wr
xprtrdma: Allocate each struct rpcrdma_mw separately
xprtrdma: Schedule reply tasklet once per upcall
xprtrdma: Make rpcrdma_ep_disconnect() return void
xprtrdma: Remove RPCRDMA_PERSISTENT_REGISTRATION macro
xprtrdma: Handle additional connection events


include/linux/sunrpc/xprtrdma.h | 2
net/sunrpc/xprtrdma/rpc_rdma.c | 83 ++--
net/sunrpc/xprtrdma/transport.c | 17 +
net/sunrpc/xprtrdma/verbs.c | 758 +++++++++++++++++++++++++++------------
net/sunrpc/xprtrdma/xprt_rdma.h | 61 +++
5 files changed, 618 insertions(+), 303 deletions(-)

--
Chuck Lever


2014-07-09 16:58:57

by Chuck Lever III

Subject: [PATCH v2 17/21] xprtrdma: Allocate each struct rpcrdma_mw separately

Currently rpcrdma_buffer_create() allocates struct rpcrdma_mw's as
a single contiguous area of memory. It amounts to quite a bit of
memory, and there's no requirement for these to be carved from a
single piece of contiguous memory.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 242 +++++++++++++++++++++++++------------------
1 file changed, 143 insertions(+), 99 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index d0a893d..7c49a55 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1005,9 +1005,91 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
return rc;
}

-/*
- * Initialize buffer memory
- */
+static int
+rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
+{
+ int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
+ struct ib_fmr_attr fmr_attr = {
+ .max_pages = RPCRDMA_MAX_DATA_SEGS,
+ .max_maps = 1,
+ .page_shift = PAGE_SHIFT
+ };
+ struct rpcrdma_mw *r;
+ int i, rc;
+
+ i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
+ dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i);
+
+ while (i--) {
+ r = kzalloc(sizeof(*r), GFP_KERNEL);
+ if (r == NULL)
+ return -ENOMEM;
+
+ r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->r.fmr)) {
+ rc = PTR_ERR(r->r.fmr);
+ dprintk("RPC: %s: ib_alloc_fmr failed %i\n",
+ __func__, rc);
+ goto out_free;
+ }
+
+ list_add(&r->mw_list, &buf->rb_mws);
+ list_add(&r->mw_all, &buf->rb_all);
+ }
+ return 0;
+
+out_free:
+ kfree(r);
+ return rc;
+}
+
+static int
+rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_frmr *f;
+ struct rpcrdma_mw *r;
+ int i, rc;
+
+ i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
+ dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i);
+
+ while (i--) {
+ r = kzalloc(sizeof(*r), GFP_KERNEL);
+ if (r == NULL)
+ return -ENOMEM;
+ f = &r->r.frmr;
+
+ f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
+ ia->ri_max_frmr_depth);
+ if (IS_ERR(f->fr_mr)) {
+ rc = PTR_ERR(f->fr_mr);
+ dprintk("RPC: %s: ib_alloc_fast_reg_mr "
+ "failed %i\n", __func__, rc);
+ goto out_free;
+ }
+
+ f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
+ ia->ri_max_frmr_depth);
+ if (IS_ERR(f->fr_pgl)) {
+ rc = PTR_ERR(f->fr_pgl);
+ dprintk("RPC: %s: ib_alloc_fast_reg_page_list "
+ "failed %i\n", __func__, rc);
+
+ ib_dereg_mr(f->fr_mr);
+ goto out_free;
+ }
+
+ list_add(&r->mw_list, &buf->rb_mws);
+ list_add(&r->mw_all, &buf->rb_all);
+ }
+
+ return 0;
+
+out_free:
+ kfree(r);
+ return rc;
+}
+
int
rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
@@ -1015,7 +1097,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
char *p;
size_t len, rlen, wlen;
int i, rc;
- struct rpcrdma_mw *r;

buf->rb_max_requests = cdata->max_requests;
spin_lock_init(&buf->rb_lock);
@@ -1026,28 +1107,12 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
* 2. arrays of struct rpcrdma_req to fill in pointers
* 3. array of struct rpcrdma_rep for replies
* 4. padding, if any
- * 5. mw's, fmr's or frmr's, if any
* Send/recv buffers in req/rep need to be registered
*/
-
len = buf->rb_max_requests *
(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
len += cdata->padding;
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
- sizeof(struct rpcrdma_mw);
- break;
- case RPCRDMA_MTHCAFMR:
- /* TBD we are perhaps overallocating here */
- len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
- sizeof(struct rpcrdma_mw);
- break;
- default:
- break;
- }

- /* allocate 1, 4 and 5 in one shot */
p = kzalloc(len, GFP_KERNEL);
if (p == NULL) {
dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
@@ -1075,53 +1140,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,

INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
- r = (struct rpcrdma_mw *)p;
switch (ia->ri_memreg_strategy) {
case RPCRDMA_FRMR:
- for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
- r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
- ia->ri_max_frmr_depth);
- if (IS_ERR(r->r.frmr.fr_mr)) {
- rc = PTR_ERR(r->r.frmr.fr_mr);
- dprintk("RPC: %s: ib_alloc_fast_reg_mr"
- " failed %i\n", __func__, rc);
- goto out;
- }
- r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
- ia->ri_id->device,
- ia->ri_max_frmr_depth);
- if (IS_ERR(r->r.frmr.fr_pgl)) {
- rc = PTR_ERR(r->r.frmr.fr_pgl);
- dprintk("RPC: %s: "
- "ib_alloc_fast_reg_page_list "
- "failed %i\n", __func__, rc);
-
- ib_dereg_mr(r->r.frmr.fr_mr);
- goto out;
- }
- list_add(&r->mw_all, &buf->rb_all);
- list_add(&r->mw_list, &buf->rb_mws);
- ++r;
- }
+ rc = rpcrdma_init_frmrs(ia, buf);
+ if (rc)
+ goto out;
break;
case RPCRDMA_MTHCAFMR:
- /* TBD we are perhaps overallocating here */
- for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
- static struct ib_fmr_attr fa =
- { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
- r->r.fmr = ib_alloc_fmr(ia->ri_pd,
- IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
- &fa);
- if (IS_ERR(r->r.fmr)) {
- rc = PTR_ERR(r->r.fmr);
- dprintk("RPC: %s: ib_alloc_fmr"
- " failed %i\n", __func__, rc);
- goto out;
- }
- list_add(&r->mw_all, &buf->rb_all);
- list_add(&r->mw_list, &buf->rb_mws);
- ++r;
- }
+ rc = rpcrdma_init_fmrs(ia, buf);
+ if (rc)
+ goto out;
break;
default:
break;
@@ -1189,24 +1217,57 @@ out:
return rc;
}

-/*
- * Unregister and destroy buffer memory. Need to deal with
- * partial initialization, so it's callable from failed create.
- * Must be called before destroying endpoint, as registrations
- * reference it.
- */
+static void
+rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+ int rc;
+
+ while (!list_empty(&buf->rb_all)) {
+ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&r->mw_all);
+ list_del(&r->mw_list);
+
+ rc = ib_dealloc_fmr(r->r.fmr);
+ if (rc)
+ dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
+ __func__, rc);
+
+ kfree(r);
+ }
+}
+
+static void
+rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+ int rc;
+
+ while (!list_empty(&buf->rb_all)) {
+ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&r->mw_all);
+ list_del(&r->mw_list);
+
+ rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dereg_mr failed %i\n",
+ __func__, rc);
+ ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+
+ kfree(r);
+ }
+}
+
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- int rc, i;
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- struct rpcrdma_mw *r;
+ int i;

/* clean up in reverse order from create
* 1. recv mr memory (mr free, then kfree)
* 2. send mr memory (mr free, then kfree)
- * 3. padding (if any) [moved to rpcrdma_ep_destroy]
- * 4. arrays
+ * 3. MWs
*/
dprintk("RPC: %s: entering\n", __func__);

@@ -1225,32 +1286,15 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
}
}

- while (!list_empty(&buf->rb_mws)) {
- r = list_entry(buf->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_all);
- list_del(&r->mw_list);
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- rc = ib_dereg_mr(r->r.frmr.fr_mr);
- if (rc)
- dprintk("RPC: %s:"
- " ib_dereg_mr"
- " failed %i\n",
- __func__, rc);
- ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
- break;
- case RPCRDMA_MTHCAFMR:
- rc = ib_dealloc_fmr(r->r.fmr);
- if (rc)
- dprintk("RPC: %s:"
- " ib_dealloc_fmr"
- " failed %i\n",
- __func__, rc);
- break;
- default:
- break;
- }
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_FRMR:
+ rpcrdma_destroy_frmrs(buf);
+ break;
+ case RPCRDMA_MTHCAFMR:
+ rpcrdma_destroy_fmrs(buf);
+ break;
+ default:
+ break;
}

kfree(buf->rb_pool);


2014-07-09 16:59:08

by Chuck Lever III

Subject: [PATCH v2 18/21] xprtrdma: Schedule reply tasklet once per upcall

Minor optimization: grab rpcrdma_tk_lock_g and disable hard IRQs
just once after clearing the receive completion queue.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 31 +++++++++++++++----------------
1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7c49a55..a8a86a0 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -105,17 +105,6 @@ rpcrdma_run_tasklet(unsigned long data)

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);

-static inline void
-rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
-{
- unsigned long flags;
-
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
- tasklet_schedule(&rpcrdma_tasklet_g);
-}
-
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
@@ -214,7 +203,7 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
}

static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -245,28 +234,38 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
}

out_schedule:
- rpcrdma_schedule_tasklet(rep);
+ list_add_tail(&rep->rr_list, sched_list);
}

static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
+ struct list_head sched_list;
struct ib_wc *wcs;
int budget, count, rc;
+ unsigned long flags;

+ INIT_LIST_HEAD(&sched_list);
budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
wcs = ep->rep_recv_wcs;

rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
if (rc <= 0)
- return rc;
+ goto out_schedule;

count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(wcs++);
+ rpcrdma_recvcq_process_wc(wcs++, &sched_list);
} while (rc == RPCRDMA_POLLSIZE && --budget);
- return 0;
+ rc = 0;
+
+out_schedule:
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ list_splice_tail(&sched_list, &rpcrdma_tasklets_g);
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+ tasklet_schedule(&rpcrdma_tasklet_g);
+ return rc;
}

/*


2014-07-09 16:57:24

by Chuck Lever III

Subject: [PATCH v2 06/21] xprtrdma: Don't invalidate FRMRs if registration fails

If FRMR registration fails, it's likely to transition the QP to the
error state. Or, registration may have failed because the QP is
_already_ in ERROR.

Thus calling rpcrdma_deregister_external() in
rpcrdma_create_chunks() is useless in FRMR mode: the LOCAL_INVs just
get flushed.

It is safe to leave existing registrations: when FRMR registration
is tried again, rpcrdma_register_frmr_external() checks if each FRMR
is already/still VALID, and knocks it down first if it is.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 54422f7..6166c98 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -271,9 +271,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
return (unsigned char *)iptr - (unsigned char *)headerp;

out:
- for (pos = 0; nchunks--;)
- pos += rpcrdma_deregister_external(
- &req->rl_segments[pos], r_xprt);
+ if (r_xprt->rx_ia.ri_memreg_strategy != RPCRDMA_FRMR) {
+ for (pos = 0; nchunks--;)
+ pos += rpcrdma_deregister_external(
+ &req->rl_segments[pos], r_xprt);
+ }
return n;
}



2014-07-09 16:57:07

by Chuck Lever III

Subject: [PATCH v2 04/21] xprtrdma: Update rkeys after transport reconnect

Various reports of:

rpcrdma_qp_async_error_upcall: QP error 3 on device mlx4_0
ep ffff8800bfd3e848

Ensure that rkeys in already-marshalled RPC/RDMA headers are
refreshed after the QP has been replaced by a reconnect.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=249
Suggested-by: Selvin Xavier <[email protected]>
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 75 ++++++++++++++++++++-------------------
net/sunrpc/xprtrdma/transport.c | 11 +++---
net/sunrpc/xprtrdma/xprt_rdma.h | 10 +++++
3 files changed, 54 insertions(+), 42 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 693966d..54422f7 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -53,14 +53,6 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

-enum rpcrdma_chunktype {
- rpcrdma_noch = 0,
- rpcrdma_readch,
- rpcrdma_areadch,
- rpcrdma_writech,
- rpcrdma_replych
-};
-
#ifdef RPC_DEBUG
static const char transfertypes[][12] = {
"pure inline", /* no chunks */
@@ -286,6 +278,28 @@ out:
}

/*
+ * Marshal chunks. This routine returns the header length
+ * consumed by marshaling.
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
+ */
+
+ssize_t
+rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
+{
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+
+ if (req->rl_rtype != rpcrdma_noch)
+ result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
+ headerp, req->rl_rtype);
+ else if (req->rl_wtype != rpcrdma_noch)
+ result = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
+ headerp, req->rl_wtype);
+ return result;
+}
+
+/*
* Copy write data inline.
* This function is used for "small" requests. Data which is passed
* to RPC via iovecs (or page list) is copied directly into the
@@ -377,7 +391,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
char *base;
size_t rpclen, padlen;
ssize_t hdrlen;
- enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;

/*
@@ -415,13 +428,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* into pages; otherwise use reply chunks.
*/
if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
- wtype = rpcrdma_noch;
+ req->rl_wtype = rpcrdma_noch;
else if (rqst->rq_rcv_buf.page_len == 0)
- wtype = rpcrdma_replych;
+ req->rl_wtype = rpcrdma_replych;
else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
- wtype = rpcrdma_writech;
+ req->rl_wtype = rpcrdma_writech;
else
- wtype = rpcrdma_replych;
+ req->rl_wtype = rpcrdma_replych;

/*
* Chunks needed for arguments?
@@ -438,16 +451,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* TBD check NFSv4 setacl
*/
if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
- rtype = rpcrdma_noch;
+ req->rl_rtype = rpcrdma_noch;
else if (rqst->rq_snd_buf.page_len == 0)
- rtype = rpcrdma_areadch;
+ req->rl_rtype = rpcrdma_areadch;
else
- rtype = rpcrdma_readch;
+ req->rl_rtype = rpcrdma_readch;

/* The following simplification is not true forever */
- if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
- wtype = rpcrdma_noch;
- if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+ if (req->rl_rtype != rpcrdma_noch && req->rl_wtype == rpcrdma_replych)
+ req->rl_wtype = rpcrdma_noch;
+ if (req->rl_rtype != rpcrdma_noch && req->rl_wtype != rpcrdma_noch) {
dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
__func__);
return -EIO;
@@ -461,7 +474,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* When padding is in use and applies to the transfer, insert
* it and change the message type.
*/
- if (rtype == rpcrdma_noch) {
+ if (req->rl_rtype == rpcrdma_noch) {

padlen = rpcrdma_inline_pullup(rqst,
RPCRDMA_INLINE_PAD_VALUE(rqst));
@@ -476,7 +489,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
- if (wtype != rpcrdma_noch) {
+ if (req->rl_wtype != rpcrdma_noch) {
dprintk("RPC: %s: invalid chunk list\n",
__func__);
return -EIO;
@@ -497,30 +510,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* on receive. Therefore, we request a reply chunk
* for non-writes wherever feasible and efficient.
*/
- if (wtype == rpcrdma_noch)
- wtype = rpcrdma_replych;
+ if (req->rl_wtype == rpcrdma_noch)
+ req->rl_wtype = rpcrdma_replych;
}
}

- /*
- * Marshal chunks. This routine will return the header length
- * consumed by marshaling.
- */
- if (rtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst,
- &rqst->rq_snd_buf, headerp, rtype);
- wtype = rtype; /* simplify dprintk */
-
- } else if (wtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst,
- &rqst->rq_rcv_buf, headerp, wtype);
- }
+ hdrlen = rpcrdma_marshal_chunks(rqst, hdrlen);
if (hdrlen < 0)
return hdrlen;

dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
- __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+ __func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
headerp, base, req->rl_iov.lkey);

/*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 4185102..f6d280b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -597,13 +597,14 @@ xprt_rdma_send_request(struct rpc_task *task)
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
- int rc;
+ int rc = 0;

- if (req->rl_niovs == 0) {
+ if (req->rl_niovs == 0)
rc = rpcrdma_marshal_req(rqst);
- if (rc < 0)
- goto failed_marshal;
- }
+ else if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
+ rc = rpcrdma_marshal_chunks(rqst, 0);
+ if (rc < 0)
+ goto failed_marshal;

if (req->rl_reply == NULL) /* e.g. reconnection */
rpcrdma_recv_buffer_get(req);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f3d86b2..c270e59 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -99,6 +99,14 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)

+enum rpcrdma_chunktype {
+ rpcrdma_noch = 0,
+ rpcrdma_readch,
+ rpcrdma_areadch,
+ rpcrdma_writech,
+ rpcrdma_replych
+};
+
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asychronously. It needs several pieces of
@@ -192,6 +200,7 @@ struct rpcrdma_req {
unsigned int rl_niovs; /* 0, 2 or 4 */
unsigned int rl_nchunks; /* non-zero if chunks */
unsigned int rl_connect_cookie; /* retry detection */
+ enum rpcrdma_chunktype rl_rtype, rl_wtype;
struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
@@ -347,6 +356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
+ssize_t rpcrdma_marshal_chunks(struct rpc_rqst *, ssize_t);
int rpcrdma_marshal_req(struct rpc_rqst *);
size_t rpcrdma_max_payload(struct rpcrdma_xprt *);



2014-07-09 16:58:32

by Chuck Lever III

Subject: [PATCH v2 14/21] xprtrdma: Disable completions for FAST_REG_MR Work Requests

Instead of relying on a completion to change the state of an FRMR
to FRMR_IS_VALID, set that state in advance. If an error occurs, a
completion will fire anyway and mark the FRMR as FRMR_IS_STALE.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 78290bb..e83eda6 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -159,10 +159,7 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
return;
}

- if (wc->opcode == IB_WC_FAST_REG_MR)
- frmr->r.frmr.fr_state = FRMR_IS_VALID;
- else if (wc->opcode == IB_WC_LOCAL_INV)
- frmr->r.frmr.fr_state = FRMR_IS_INVALID;
+ frmr->r.frmr.fr_state = FRMR_IS_INVALID;
}

static int
@@ -1740,10 +1737,11 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
dprintk("RPC: %s: Using frmr %p to map %d segments\n",
__func__, mw, i);

+ frmr->fr_state = FRMR_IS_VALID;
+
memset(&frmr_wr, 0, sizeof frmr_wr);
frmr_wr.wr_id = (unsigned long)(void *)mw;
frmr_wr.opcode = IB_WR_FAST_REG_MR;
- frmr_wr.send_flags = IB_SEND_SIGNALED;
frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
frmr_wr.wr.fast_reg.page_list_len = page_no;
@@ -1775,6 +1773,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
*nsegs = i;
return 0;
out_err:
+ frmr->fr_state = FRMR_IS_INVALID;
while (i--)
rpcrdma_unmap_one(ia, --seg);
return rc;


2014-07-09 16:56:41

by Chuck Lever III

Subject: [PATCH v2 01/21] xprtrdma: Fix panic in rpcrdma_register_frmr_external()

seg1->mr_nsegs is not yet initialized when it is used to unmap
segments during an error exit. Use the same unmapping logic for
all error exits.

"if (frmr_wr.wr.fast_reg.length < len) {" used to be a BUG_ON check.
The broken code will never be executed under normal operation.

Fixes: c977dea (xprtrdma: Remove BUG_ON() call sites)
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 13dbd1c..78bd7c6 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1545,9 +1545,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
if (frmr_wr.wr.fast_reg.length < len) {
- while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(ia, seg++);
- return -EIO;
+ rc = -EIO;
+ goto out_err;
}

/* Bump the key */
@@ -1565,8 +1564,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
if (rc) {
dprintk("RPC: %s: failed ib_post_send for register,"
" status %i\n", __func__, rc);
- while (i--)
- rpcrdma_unmap_one(ia, --seg);
+ goto out_err;
} else {
seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
@@ -1574,6 +1572,10 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
seg1->mr_len = len;
}
*nsegs = i;
+ return 0;
+out_err:
+ while (i--)
+ rpcrdma_unmap_one(ia, --seg);
return rc;
}



2014-07-09 16:57:16

by Chuck Lever III

Subject: [PATCH v2 05/21] xprtrdma: On disconnect, don't ignore pending CQEs

xprtrdma is currently throwing away queued completions during
a reconnect. RPC replies posted just before connection loss, or
successful completions that change the state of an FRMR, can be
missed.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0d5187d..7fd457e 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -310,6 +310,13 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
rpcrdma_recvcq_poll(cq, ep);
}

+static void
+rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
+{
+ rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
+ rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
+}
+
#ifdef RPC_DEBUG
static const char * const conn[] = {
"address resolved",
@@ -872,9 +879,7 @@ retry:
if (rc && rc != -ENOTCONN)
dprintk("RPC: %s: rpcrdma_ep_disconnect"
" status %i\n", __func__, rc);
-
- rpcrdma_clean_cq(ep->rep_attr.recv_cq);
- rpcrdma_clean_cq(ep->rep_attr.send_cq);
+ rpcrdma_flush_cqs(ep);

xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
id = rpcrdma_create_id(xprt, ia,
@@ -985,8 +990,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
int rc;

- rpcrdma_clean_cq(ep->rep_attr.recv_cq);
- rpcrdma_clean_cq(ep->rep_attr.send_cq);
+ rpcrdma_flush_cqs(ep);
rc = rdma_disconnect(ia->ri_id);
if (!rc) {
/* returns without wait if not connected */


2014-07-09 16:57:58

by Chuck Lever III

Subject: [PATCH v2 10/21] xprtrdma: Properly handle exhaustion of the rb_mws list

If the rb_mws list is exhausted, clean up and return NULL so that
call_allocate() will delay and try again.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 103 ++++++++++++++++++++++++++++++-------------
1 file changed, 71 insertions(+), 32 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index aa14a36..5796e09 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1256,6 +1256,67 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
kfree(buf->rb_pool);
}

+/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
+ * some req segments uninitialized.
+ */
+static void
+rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+{
+ if (*mw) {
+ list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
+ *mw = NULL;
+ }
+}
+
+/* Cycle mw's back in reverse order, and "spin" them.
+ * This delays and scrambles reuse as much as possible.
+ */
+static void
+rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_segments;
+ struct rpcrdma_mr_seg *seg1 = seg;
+ int i;
+
+ for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
+ rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
+ rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
+}
+
+static void
+rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ buf->rb_send_bufs[--buf->rb_send_index] = req;
+ req->rl_niovs = 0;
+ if (req->rl_reply) {
+ buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
+ req->rl_reply->rr_func = NULL;
+ req->rl_reply = NULL;
+ }
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+ int i;
+
+ i = RPCRDMA_MAX_SEGS - 1;
+ while (!list_empty(&buf->rb_mws)) {
+ r = list_entry(buf->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ req->rl_segments[i].mr_chunk.rl_mw = r;
+ if (unlikely(i-- == 0))
+ return req; /* Success */
+ }
+
+ /* Not enough entries on rb_mws for this req */
+ rpcrdma_buffer_put_sendbuf(req, buf);
+ rpcrdma_buffer_put_mrs(req, buf);
+ return NULL;
+}
+
/*
* Get a set of request/reply buffers.
*
@@ -1268,10 +1329,9 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
+ struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
struct rpcrdma_req *req;
unsigned long flags;
- int i;
- struct rpcrdma_mw *r;

spin_lock_irqsave(&buffers->rb_lock, flags);
if (buffers->rb_send_index == buffers->rb_max_requests) {
@@ -1291,14 +1351,13 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
}
buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
- if (!list_empty(&buffers->rb_mws)) {
- i = RPCRDMA_MAX_SEGS - 1;
- do {
- r = list_entry(buffers->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_list);
- req->rl_segments[i].mr_chunk.rl_mw = r;
- } while (--i >= 0);
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_FRMR:
+ case RPCRDMA_MTHCAFMR:
+ req = rpcrdma_buffer_get_mrs(req, buffers);
+ break;
+ default:
+ break;
}
spin_unlock_irqrestore(&buffers->rb_lock, flags);
return req;
@@ -1313,34 +1372,14 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
- int i;
unsigned long flags;

spin_lock_irqsave(&buffers->rb_lock, flags);
- buffers->rb_send_bufs[--buffers->rb_send_index] = req;
- req->rl_niovs = 0;
- if (req->rl_reply) {
- buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
- req->rl_reply->rr_func = NULL;
- req->rl_reply = NULL;
- }
+ rpcrdma_buffer_put_sendbuf(req, buffers);
switch (ia->ri_memreg_strategy) {
case RPCRDMA_FRMR:
case RPCRDMA_MTHCAFMR:
- /*
- * Cycle mw's back in reverse order, and "spin" them.
- * This delays and scrambles reuse as much as possible.
- */
- i = 1;
- do {
- struct rpcrdma_mw **mw;
- mw = &req->rl_segments[i].mr_chunk.rl_mw;
- list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
- *mw = NULL;
- } while (++i < RPCRDMA_MAX_SEGS);
- list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
- &buffers->rb_mws);
- req->rl_segments[0].mr_chunk.rl_mw = NULL;
+ rpcrdma_buffer_put_mrs(req, buffers);
break;
default:
break;


2014-07-09 20:27:25

by Chuck Lever III

Subject: Re: [PATCH v2 08/21] xprtrdma: Back off rkey when FAST_REG_MR fails


On Jul 9, 2014, at 4:07 PM, Anna Schumaker <[email protected]> wrote:

> On 07/09/2014 12:57 PM, Chuck Lever wrote:
>> If posting a FAST_REG_MR Work Request fails, revert the rkey update
>> to avoid subsequent IB_WC_MW_BIND_ERR completions.
>>
>> Suggested-by: Steve Wise <[email protected]>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++-------
>> 1 file changed, 20 insertions(+), 7 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index ce847d2..14d24ec 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -1487,6 +1487,24 @@ rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
>> seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
>> }
>>
>> +static void
>> +rpcrdma_increment_frmr_rkey(struct rpcrdma_mw *mw)
>> +{
>> + struct ib_mr *mr = mw->r.frmr.fr_mr;
>> + u8 key = mr->rkey & 0x000000FF;
>
> It's not super obvious to me how this key is being calculated. Can you change 0x000000FF into a named constant to help?

Bumping the rkey is the same for any RDMA consumer, so it would be
even better to use a common helper.

include/rdma/ib_verbs.h has ib_inc_rkey() . . .

> It might also be useful to have an frmr_get_rkey() function rather than doing this calculation in two different places.

. . . but there isn't an ib_dec_rkey(). I could add one.
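
A minimal sketch of what such a helper could look like, mirroring
ib_inc_rkey() (hypothetical, not a tested patch):

	static inline u32 ib_dec_rkey(u32 rkey)
	{
		/* Only the low-order byte of the rkey is the
		 * consumer-owned key portion; step it back and
		 * leave the rest of the rkey untouched.
		 */
		const u32 mask = 0x000000ff;

		return ((rkey - 1) & mask) | (rkey & ~mask);
	}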

> Anna
>
>> +
>> + ib_update_fast_reg_key(mr, ++key);
>> +}
>> +
>> +static void
>> +rpcrdma_decrement_frmr_rkey(struct rpcrdma_mw *mw)
>> +{
>> + struct ib_mr *mr = mw->r.frmr.fr_mr;
>> + u8 key = mr->rkey & 0x000000FF;
>> +
>> + ib_update_fast_reg_key(mr, --key);
>> +}
>> +
>> static int
>> rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
>> int *nsegs, int writing, struct rpcrdma_ia *ia,
>> @@ -1496,8 +1514,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
>> struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
>> struct rpcrdma_frmr *frmr = &mw->r.frmr;
>> struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
>> -
>> - u8 key;
>> int len, pageoff;
>> int i, rc;
>> int seg_len;
>> @@ -1557,14 +1573,10 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
>> rc = -EIO;
>> goto out_err;
>> }
>> -
>> - /* Bump the key */
>> - key = (u8)(frmr->fr_mr->rkey & 0x000000FF);
>> - ib_update_fast_reg_key(frmr->fr_mr, ++key);
>> -
>> frmr_wr.wr.fast_reg.access_flags = (writing ?
>> IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
>> IB_ACCESS_REMOTE_READ);
>> + rpcrdma_increment_frmr_rkey(mw);
>> frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
>> DECR_CQCOUNT(&r_xprt->rx_ep);
>>
>> @@ -1573,6 +1585,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
>> if (rc) {
>> dprintk("RPC: %s: failed ib_post_send for register,"
>> " status %i\n", __func__, rc);
>> + rpcrdma_decrement_frmr_rkey(seg1->mr_chunk.rl_mw);
>> goto out_err;
>> } else {
>> seg1->mr_rkey = frmr->fr_mr->rkey;
>>
>

--
Chuck Lever
chuck[dot]lever[at]oracle[dot]com




2014-07-09 21:39:42

by Chuck Lever III

Subject: Re: [PATCH v2 05/21] xprtrdma: On disconnect, don't ignore pending CQEs

Hi Shirley-

On Jul 9, 2014, at 5:28 PM, Shirley Ma <[email protected]> wrote:

> Should all rdma_clean_cq be replaced by flush_cqs? The outstanding CQEs should be processed in any context.

The only other context is when the transport is being destroyed (ie,
at umount time).

Send CQEs don't matter at that point: the FRMRs are going to get
deregistered no matter what state they are in.

The RPC client waits for all remaining RPCs to complete before it
attempts to destroy the transport, so there shouldn't be any receive
CQEs at that point either.

> Shirley
>
> On 07/09/2014 09:57 AM, Chuck Lever wrote:
>> xprtrdma is currently throwing away queued completions during
>> a reconnect. RPC replies posted just before connection loss, or
>> successful completions that change the state of an FRMR, can be
>> missed.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/verbs.c | 14 +++++++++-----
>> 1 file changed, 9 insertions(+), 5 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index 0d5187d..7fd457e 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -310,6 +310,13 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
>> rpcrdma_recvcq_poll(cq, ep);
>> }
>>
>> +static void
>> +rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
>> +{
>> + rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
>> + rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
>> +}
>> +
>> #ifdef RPC_DEBUG
>> static const char * const conn[] = {
>> "address resolved",
>> @@ -872,9 +879,7 @@ retry:
>> if (rc && rc != -ENOTCONN)
>> dprintk("RPC: %s: rpcrdma_ep_disconnect"
>> " status %i\n", __func__, rc);
>> -
>> - rpcrdma_clean_cq(ep->rep_attr.recv_cq);
>> - rpcrdma_clean_cq(ep->rep_attr.send_cq);
>> + rpcrdma_flush_cqs(ep);
>>
>> xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
>> id = rpcrdma_create_id(xprt, ia,
>> @@ -985,8 +990,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
>> {
>> int rc;
>>
>> - rpcrdma_clean_cq(ep->rep_attr.recv_cq);
>> - rpcrdma_clean_cq(ep->rep_attr.send_cq);
>> + rpcrdma_flush_cqs(ep);
>> rc = rdma_disconnect(ia->ri_id);
>> if (!rc) {
>> /* returns without wait if not connected */
>>

--
Chuck Lever
chuck[dot]lever[at]oracle[dot]com




2014-07-09 16:58:48

by Chuck Lever III

Subject: [PATCH v2 16/21] xprtrdma: Rename frmr_wr

Clean up: Name frmr_wr after the opcode of the Work Request,
consistent with the send and local invalidation paths.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8eef343..d0a893d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1701,7 +1701,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
struct rpcrdma_frmr *frmr = &mw->r.frmr;
- struct ib_send_wr frmr_wr, *bad_wr;
+ struct ib_send_wr fastreg_wr, *bad_wr;
int len, pageoff;
int i, rc;
int seg_len;
@@ -1734,26 +1734,26 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,

frmr->fr_state = FRMR_IS_VALID;

- memset(&frmr_wr, 0, sizeof frmr_wr);
- frmr_wr.wr_id = (unsigned long)(void *)mw;
- frmr_wr.opcode = IB_WR_FAST_REG_MR;
- frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
- frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
- frmr_wr.wr.fast_reg.page_list_len = page_no;
- frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
- frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
- if (frmr_wr.wr.fast_reg.length < len) {
+ memset(&fastreg_wr, 0, sizeof(fastreg_wr));
+ fastreg_wr.wr_id = (unsigned long)(void *)mw;
+ fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+ fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
+ fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
+ fastreg_wr.wr.fast_reg.page_list_len = page_no;
+ fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+ fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
+ if (fastreg_wr.wr.fast_reg.length < len) {
rc = -EIO;
goto out_err;
}
- frmr_wr.wr.fast_reg.access_flags = (writing ?
+ fastreg_wr.wr.fast_reg.access_flags = (writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ);
rpcrdma_increment_frmr_rkey(mw);
- frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
+ fastreg_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

- rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
+ rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
if (rc) {
dprintk("RPC: %s: failed ib_post_send for register,"
" status %i\n", __func__, rc);


2014-07-10 19:19:24

by Steve Wise

Subject: Re: [PATCH v2 00/21] NFS/RDMA client patches for 3.17

Hey Chuck,

I tested this series and it passed for me: NFS v3 and v4 over cxgb4
and mlx4, using cthon, iozone, fio, and xfs. No regressions seen.

---

Tested-by: Steve Wise <[email protected]>


2014-07-09 16:59:35

by Chuck Lever III

Subject: [PATCH v2 21/21] xprtrdma: Handle additional connection events

Commit 38ca83a5 added RDMA_CM_EVENT_TIMEWAIT_EXIT. But that status
is relevant only for consumers that re-use their QPs on new
connections. xprtrdma creates a fresh QP on reconnection, so that
event should be explicitly ignored.

Squelch the alarming "unexpected CM event" message.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 25 ++++++++++++++-----------
1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index bc0909a..9de5965 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -326,8 +326,16 @@ static const char * const conn[] = {
"rejected",
"established",
"disconnected",
- "device removal"
+ "device removal",
+ "multicast join",
+ "multicast error",
+ "address change",
+ "timewait exit",
};
+
+#define CONNECTION_MSG(status) \
+ ((status) < ARRAY_SIZE(conn) ? \
+ conn[(status)] : "unrecognized connection error")
#endif

static int
@@ -385,23 +393,18 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DEVICE_REMOVAL:
connstate = -ENODEV;
connected:
- dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
- __func__,
- (event->event <= 11) ? conn[event->event] :
- "unknown connection error",
- &addr->sin_addr.s_addr,
- ntohs(addr->sin_port),
- ep, event->event);
atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
ep->rep_connected = connstate;
ep->rep_func(ep);
wake_up_all(&ep->rep_connect_wait);
- break;
+ /*FALLTHROUGH*/
default:
- dprintk("RPC: %s: unexpected CM event %d\n",
- __func__, event->event);
+ dprintk("RPC: %s: %pI4:%u (ep 0x%p): %s\n",
+ __func__, &addr->sin_addr.s_addr,
+ ntohs(addr->sin_port), ep,
+ CONNECTION_MSG(event->event));
break;
}



2014-07-09 16:56:59

by Chuck Lever III

Subject: [PATCH v2 03/21] xprtrdma: Limit data payload size for ALLPHYSICAL

When the client uses physical memory registration, each page in the
payload gets its own array entry in the RPC/RDMA header's chunk list.

Therefore, don't advertise a maximum payload size that would require
more array entries than can fit in the RPC buffer where RPC/RDMA
headers are built.
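
For example, assuming the default 1024-byte inline size, the 28-byte
minimal RPC/RDMA header, 16-byte chunk list entries, and 4KB pages,
that works out to (1024 - 28) / 16 = 62 entries, or a maximum payload
of about 62 * 4096 = 253,952 bytes.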

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=248
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 4 +++-
net/sunrpc/xprtrdma/verbs.c | 41 +++++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 66f91f0..4185102 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -296,7 +296,6 @@ xprt_setup_rdma(struct xprt_create *args)

xprt->resvport = 0; /* privileged port not needed */
xprt->tsh_size = 0; /* RPC-RDMA handles framing */
- xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
xprt->ops = &xprt_rdma_procs;

/*
@@ -382,6 +381,9 @@ xprt_setup_rdma(struct xprt_create *args)
new_ep->rep_xprt = xprt;

xprt_rdma_format_addresses(xprt);
+ xprt->max_payload = rpcrdma_max_payload(new_xprt);
+ dprintk("RPC: %s: transport data payload maximum: %zu bytes\n",
+ __func__, xprt->max_payload);

if (!try_module_get(THIS_MODULE))
goto out4;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cab742d..0d5187d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1822,3 +1822,44 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
rc);
return rc;
}
+
+/* Physical mapping means one Read/Write list entry per-page.
+ * All list entries must fit within an inline buffer
+ *
+ * NB: The server must return a Write list for NFS READ,
+ * which has the same constraint. Factor in the inline
+ * rsize as well.
+ */
+static size_t
+rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ unsigned int inline_size, pages;
+
+ inline_size = min_t(unsigned int,
+ cdata->inline_wsize, cdata->inline_rsize);
+ inline_size -= RPCRDMA_HDRLEN_MIN;
+ pages = inline_size / sizeof(struct rpcrdma_segment);
+ return pages << PAGE_SHIFT;
+}
+
+static size_t
+rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+ return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
+}
+
+size_t
+rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+ size_t result;
+
+ switch (r_xprt->rx_ia.ri_memreg_strategy) {
+ case RPCRDMA_ALLPHYSICAL:
+ result = rpcrdma_physical_max_payload(r_xprt);
+ break;
+ default:
+ result = rpcrdma_mr_max_payload(r_xprt);
+ }
+ return result;
+}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 97ca516..f3d86b2 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -348,6 +348,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
+size_t rpcrdma_max_payload(struct rpcrdma_xprt *);

/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;


2014-07-09 16:59:25

by Chuck Lever III

Subject: [PATCH v2 20/21] xprtrdma: Remove RPCRDMA_PERSISTENT_REGISTRATION macro

Clean up.

RPCRDMA_PERSISTENT_REGISTRATION was a compile-time switch between
RPCRDMA_REGISTER mode and RPCRDMA_ALLPHYSICAL mode. Since
RPCRDMA_REGISTER has been removed, there's no need for the extra
conditional compilation.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprtrdma.h | 2 --
net/sunrpc/xprtrdma/verbs.c | 13 -------------
2 files changed, 15 deletions(-)

diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index c2f04e1..64a0a0a 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -62,8 +62,6 @@
#define RPCRDMA_INLINE_PAD_THRESH (512)/* payload threshold to pad (bytes) */

/* memory registration strategies */
-#define RPCRDMA_PERSISTENT_REGISTRATION (1)
-
enum rpcrdma_memreg {
RPCRDMA_BOUNCEBUFFERS = 0,
RPCRDMA_REGISTER,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index da4bbf3..bc0909a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -561,12 +561,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
if (!ia->ri_id->device->alloc_fmr) {
dprintk("RPC: %s: MTHCAFMR registration "
"not supported by HCA\n", __func__);
-#if RPCRDMA_PERSISTENT_REGISTRATION
memreg = RPCRDMA_ALLPHYSICAL;
-#else
- rc = -ENOMEM;
- goto out2;
-#endif
}
}

@@ -581,20 +576,16 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
switch (memreg) {
case RPCRDMA_FRMR:
break;
-#if RPCRDMA_PERSISTENT_REGISTRATION
case RPCRDMA_ALLPHYSICAL:
mem_priv = IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_WRITE |
IB_ACCESS_REMOTE_READ;
goto register_setup;
-#endif
case RPCRDMA_MTHCAFMR:
if (ia->ri_have_dma_lkey)
break;
mem_priv = IB_ACCESS_LOCAL_WRITE;
-#if RPCRDMA_PERSISTENT_REGISTRATION
register_setup:
-#endif
ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
if (IS_ERR(ia->ri_bind_mem)) {
printk(KERN_ALERT "%s: ib_get_dma_mr for "
@@ -1914,7 +1905,6 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,

switch (ia->ri_memreg_strategy) {

-#if RPCRDMA_PERSISTENT_REGISTRATION
case RPCRDMA_ALLPHYSICAL:
rpcrdma_map_one(ia, seg, writing);
seg->mr_rkey = ia->ri_bind_mem->rkey;
@@ -1922,7 +1912,6 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
seg->mr_nsegs = 1;
nsegs = 1;
break;
-#endif

/* Registration using frmr registration */
case RPCRDMA_FRMR:
@@ -1952,13 +1941,11 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,

switch (ia->ri_memreg_strategy) {

-#if RPCRDMA_PERSISTENT_REGISTRATION
case RPCRDMA_ALLPHYSICAL:
read_lock(&ia->ri_qplock);
rpcrdma_unmap_one(ia, seg);
read_unlock(&ia->ri_qplock);
break;
-#endif

case RPCRDMA_FRMR:
rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);


2014-07-09 16:57:32

by Chuck Lever III

Subject: [PATCH v2 07/21] xprtrdma: Unclutter struct rpcrdma_mr_seg

Clean ups:
- make it obvious that the rl_mw field is a pointer -- allocated
separately, not as part of struct rpcrdma_mr_seg
- promote "struct {} frmr;" to a named type
- promote the state enum to a named type
- name the MW state field the same way other fields in
rpcrdma_mw are named

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 33 ++++++++++++++---------------
net/sunrpc/xprtrdma/xprt_rdma.h | 44 +++++++++++++++++++++++++++++----------
2 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7fd457e..ce847d2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -156,9 +156,9 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
return;

if (wc->opcode == IB_WC_FAST_REG_MR)
- frmr->r.frmr.state = FRMR_IS_VALID;
+ frmr->r.frmr.fr_state = FRMR_IS_VALID;
else if (wc->opcode == IB_WC_LOCAL_INV)
- frmr->r.frmr.state = FRMR_IS_INVALID;
+ frmr->r.frmr.fr_state = FRMR_IS_INVALID;
}

static int
@@ -1493,6 +1493,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_mr_seg *seg1 = seg;
+ struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
+ struct rpcrdma_frmr *frmr = &mw->r.frmr;
struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;

u8 key;
@@ -1512,8 +1514,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
rpcrdma_map_one(ia, seg, writing);
pa = seg->mr_dma;
for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
- seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
- page_list[page_no++] = pa;
+ frmr->fr_pgl->page_list[page_no++] = pa;
pa += PAGE_SIZE;
}
len += seg->mr_len;
@@ -1525,20 +1526,18 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
break;
}
dprintk("RPC: %s: Using frmr %p to map %d segments\n",
- __func__, seg1->mr_chunk.rl_mw, i);
+ __func__, mw, i);

- if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
+ if (unlikely(frmr->fr_state == FRMR_IS_VALID)) {
dprintk("RPC: %s: frmr %x left valid, posting invalidate.\n",
- __func__,
- seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
+ __func__, frmr->fr_mr->rkey);
/* Invalidate before using. */
memset(&invalidate_wr, 0, sizeof invalidate_wr);
- invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+ invalidate_wr.wr_id = (unsigned long)(void *)mw;
invalidate_wr.next = &frmr_wr;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
invalidate_wr.send_flags = IB_SEND_SIGNALED;
- invalidate_wr.ex.invalidate_rkey =
- seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+ invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
post_wr = &invalidate_wr;
} else
@@ -1546,11 +1545,11 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,

/* Prepare FRMR WR */
memset(&frmr_wr, 0, sizeof frmr_wr);
- frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+ frmr_wr.wr_id = (unsigned long)(void *)mw;
frmr_wr.opcode = IB_WR_FAST_REG_MR;
frmr_wr.send_flags = IB_SEND_SIGNALED;
frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
- frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
+ frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
frmr_wr.wr.fast_reg.page_list_len = page_no;
frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
@@ -1560,13 +1559,13 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
}

/* Bump the key */
- key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
- ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
+ key = (u8)(frmr->fr_mr->rkey & 0x000000FF);
+ ib_update_fast_reg_key(frmr->fr_mr, ++key);

frmr_wr.wr.fast_reg.access_flags = (writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ);
- frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+ frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
@@ -1576,7 +1575,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
" status %i\n", __func__, rc);
goto out_err;
} else {
- seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+ seg1->mr_rkey = frmr->fr_mr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c270e59..84c3455 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -146,6 +146,38 @@ struct rpcrdma_rep {
};

/*
+ * struct rpcrdma_mw - external memory region metadata
+ *
+ * An external memory region is any buffer or page that is registered
+ * on the fly (ie, not pre-registered).
+ *
+ * Each rpcrdma_buffer has a list of these anchored in rb_mws. During
+ * call_allocate, rpcrdma_buffer_get() assigns one to each segment in
+ * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
+ * track of registration metadata while each RPC is pending.
+ * rpcrdma_deregister_external() uses this metadata to unmap and
+ * release these resources when an RPC is complete.
+ */
+enum rpcrdma_frmr_state {
+ FRMR_IS_INVALID, /* ready to be used */
+ FRMR_IS_VALID, /* in use */
+};
+
+struct rpcrdma_frmr {
+ struct ib_fast_reg_page_list *fr_pgl;
+ struct ib_mr *fr_mr;
+ enum rpcrdma_frmr_state fr_state;
+};
+
+struct rpcrdma_mw {
+ union {
+ struct ib_fmr *fmr;
+ struct rpcrdma_frmr frmr;
+ } r;
+ struct list_head mw_list;
+};
+
+/*
* struct rpcrdma_req -- structure central to the request/reply sequence.
*
* N of these are associated with a transport instance, and stored in
@@ -172,17 +204,7 @@ struct rpcrdma_rep {
struct rpcrdma_mr_seg { /* chunk descriptors */
union { /* chunk memory handles */
struct ib_mr *rl_mr; /* if registered directly */
- struct rpcrdma_mw { /* if registered from region */
- union {
- struct ib_fmr *fmr;
- struct {
- struct ib_fast_reg_page_list *fr_pgl;
- struct ib_mr *fr_mr;
- enum { FRMR_IS_INVALID, FRMR_IS_VALID } state;
- } frmr;
- } r;
- struct list_head mw_list;
- } *rl_mw;
+ struct rpcrdma_mw *rl_mw; /* if registered from region */
} mr_chunk;
u64 mr_base; /* registration result */
u32 mr_rkey; /* registration result */


2014-07-09 16:58:23

by Chuck Lever III

Subject: [PATCH v2 13/21] xprtrdma: Don't post a LOCAL_INV in rpcrdma_register_frmr_external()

Any FRMR arriving in rpcrdma_register_frmr_external() is now
guaranteed to be either invalid, or to be targeted by a queued
LOCAL_INV that will invalidate it before the adapter processes
the FAST_REG_MR being built here.

The problem with the current arrangement of chaining a LOCAL_INV to
the FAST_REG_MR is that if the transport is not connected, both the
LOCAL_INV and the FAST_REG_MR are flushed. This leaves the FRMR
valid with the old rkey, even though
rpcrdma_register_frmr_external() has already bumped the in-memory
rkey.

Next time through rpcrdma_register_frmr_external(), a LOCAL_INV and
FAST_REG_MR is attempted again because the FRMR is still valid. But
the rkey no longer matches the hardware's rkey, and a memory
management operation error occurs.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 21 ++-------------------
1 file changed, 2 insertions(+), 19 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 877c928..78290bb 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1709,7 +1709,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
struct rpcrdma_frmr *frmr = &mw->r.frmr;
- struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
+ struct ib_send_wr frmr_wr, *bad_wr;
int len, pageoff;
int i, rc;
int seg_len;
@@ -1740,22 +1740,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
dprintk("RPC: %s: Using frmr %p to map %d segments\n",
__func__, mw, i);

- if (unlikely(frmr->fr_state != FRMR_IS_INVALID)) {
- dprintk("RPC: %s: frmr %x left valid, posting invalidate.\n",
- __func__, frmr->fr_mr->rkey);
- /* Invalidate before using. */
- memset(&invalidate_wr, 0, sizeof invalidate_wr);
- invalidate_wr.wr_id = (unsigned long)(void *)mw;
- invalidate_wr.next = &frmr_wr;
- invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.send_flags = IB_SEND_SIGNALED;
- invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey;
- DECR_CQCOUNT(&r_xprt->rx_ep);
- post_wr = &invalidate_wr;
- } else
- post_wr = &frmr_wr;
-
- /* Prepare FRMR WR */
memset(&frmr_wr, 0, sizeof frmr_wr);
frmr_wr.wr_id = (unsigned long)(void *)mw;
frmr_wr.opcode = IB_WR_FAST_REG_MR;
@@ -1776,8 +1760,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

- rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
-
+ rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
if (rc) {
dprintk("RPC: %s: failed ib_post_send for register,"
" status %i\n", __func__, rc);


2014-07-09 16:58:40

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 15/21] xprtrdma: Disable completions for LOCAL_INV Work Requests

Instead of relying on a completion to change the state of an FRMR
to FRMR_IS_INVALID, set that state in advance. If an error occurs,
a completion fires anyway and marks the FRMR as FRMR_IS_STALE.
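
In outline (a condensed sketch of the hunks below, not a separate
implementation):

	/* Mark the FRMR invalid up front, then post an unsignaled
	 * LOCAL_INV. Success produces no completion; a flush or
	 * error completion still fires, and the send CQ handler
	 * marks the FRMR FRMR_IS_STALE instead. */
	mw->r.frmr.fr_state = FRMR_IS_INVALID;

	memset(&invalidate_wr, 0, sizeof invalidate_wr);
	invalidate_wr.wr_id = (unsigned long)(void *)mw;
	invalidate_wr.opcode = IB_WR_LOCAL_INV;
	invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;

	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	if (rc)
		/* force rpcrdma_buffer_get() to retry this FRMR */
		mw->r.frmr.fr_state = FRMR_IS_STALE;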

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e83eda6..8eef343 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -154,12 +154,8 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)

if (wc->wr_id == 0ULL)
return;
- if (wc->status != IB_WC_SUCCESS) {
+ if (wc->status != IB_WC_SUCCESS)
frmr->r.frmr.fr_state = FRMR_IS_STALE;
- return;
- }
-
- frmr->r.frmr.fr_state = FRMR_IS_INVALID;
}

static int
@@ -1369,12 +1365,11 @@ rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);

/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
- r->r.frmr.fr_state = FRMR_IS_VALID;
+ r->r.frmr.fr_state = FRMR_IS_INVALID;

memset(&invalidate_wr, 0, sizeof(invalidate_wr));
invalidate_wr.wr_id = (unsigned long)(void *)r;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.send_flags = IB_SEND_SIGNALED;
invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

@@ -1787,10 +1782,11 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
struct ib_send_wr invalidate_wr, *bad_wr;
int rc;

+ seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+
memset(&invalidate_wr, 0, sizeof invalidate_wr);
invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.send_flags = IB_SEND_SIGNALED;
invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

@@ -1799,9 +1795,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
rpcrdma_unmap_one(ia, seg++);
rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
read_unlock(&ia->ri_qplock);
- if (rc)
+ if (rc) {
+ /* Force rpcrdma_buffer_get() to retry */
+ seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
dprintk("RPC: %s: failed ib_post_send for invalidate,"
" status %i\n", __func__, rc);
+ }
return rc;
}



2014-07-09 16:58:14

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 12/21] xprtrdma: Reset FRMRs after a flushed LOCAL_INV Work Request

When a LOCAL_INV Work Request is flushed, it leaves an FRMR in the
VALID state. Such an FRMR can be returned by rpcrdma_buffer_get(),
and currently must be knocked down in
rpcrdma_register_frmr_external() before it can be re-used.

Instead, capture these FRMRs in rpcrdma_buffer_get() and reset them
there.
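
The flow, roughly (a sketch of the code added below):

	/* Under rb_lock, skim stale FRMRs off rb_mws onto a local
	 * list while handing good ones to the rpcrdma_req ... */
	if (r->r.frmr.fr_state == FRMR_IS_STALE) {
		list_add(&r->mw_list, &stale);
		continue;
	}

	/* ... then, outside the lock, post a fresh LOCAL_INV for
	 * each stale FRMR and splice them back onto rb_mws (taking
	 * rb_lock again for the splice). */
	list_for_each(pos, &stale) {
		r = list_entry(pos, struct rpcrdma_mw, mw_list);
		rpcrdma_retry_local_inv(r, ia);
	}
	list_splice_tail(&stale, &buf->rb_mws);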

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 94 ++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 92 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 56b7979..877c928 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1358,8 +1358,91 @@ rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
}
}

+/* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
+ * Redo only the ib_post_send().
+ */
+static void
+rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
+{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ia, struct rpcrdma_xprt, rx_ia);
+ struct ib_send_wr invalidate_wr, *bad_wr;
+ int rc;
+
+ dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
+
+ /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
+ r->r.frmr.fr_state = FRMR_IS_VALID;
+
+ memset(&invalidate_wr, 0, sizeof(invalidate_wr));
+ invalidate_wr.wr_id = (unsigned long)(void *)r;
+ invalidate_wr.opcode = IB_WR_LOCAL_INV;
+ invalidate_wr.send_flags = IB_SEND_SIGNALED;
+ invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+
+ dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
+ __func__, r, r->r.frmr.fr_mr->rkey);
+
+ read_lock(&ia->ri_qplock);
+ rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+ read_unlock(&ia->ri_qplock);
+ if (rc) {
+ /* Force rpcrdma_buffer_get() to retry */
+ r->r.frmr.fr_state = FRMR_IS_STALE;
+ dprintk("RPC: %s: ib_post_send failed, %i\n",
+ __func__, rc);
+ }
+}
+
+static void
+rpcrdma_retry_flushed_linv(struct list_head *stale,
+ struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ struct list_head *pos;
+ struct rpcrdma_mw *r;
+ unsigned long flags;
+
+ list_for_each(pos, stale) {
+ r = list_entry(pos, struct rpcrdma_mw, mw_list);
+ rpcrdma_retry_local_inv(r, ia);
+ }
+
+ spin_lock_irqsave(&buf->rb_lock, flags);
+ list_splice_tail(stale, &buf->rb_mws);
+ spin_unlock_irqrestore(&buf->rb_lock, flags);
+}
+
static struct rpcrdma_req *
-rpcrdma_buffer_get_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
+ struct list_head *stale)
+{
+ struct rpcrdma_mw *r;
+ int i;
+
+ i = RPCRDMA_MAX_SEGS - 1;
+ while (!list_empty(&buf->rb_mws)) {
+ r = list_entry(buf->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ if (r->r.frmr.fr_state == FRMR_IS_STALE) {
+ list_add(&r->mw_list, stale);
+ continue;
+ }
+ req->rl_segments[i].mr_chunk.rl_mw = r;
+ if (unlikely(i-- == 0))
+ return req; /* Success */
+ }
+
+ /* Not enough entries on rb_mws for this req */
+ rpcrdma_buffer_put_sendbuf(req, buf);
+ rpcrdma_buffer_put_mrs(req, buf);
+ return NULL;
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
int i;
@@ -1393,6 +1476,7 @@ struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+ struct list_head stale;
struct rpcrdma_req *req;
unsigned long flags;

@@ -1414,15 +1498,21 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
}
buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+
+ INIT_LIST_HEAD(&stale);
switch (ia->ri_memreg_strategy) {
case RPCRDMA_FRMR:
+ req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
+ break;
case RPCRDMA_MTHCAFMR:
- req = rpcrdma_buffer_get_mrs(req, buffers);
+ req = rpcrdma_buffer_get_fmrs(req, buffers);
break;
default:
break;
}
spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ if (!list_empty(&stale))
+ rpcrdma_retry_flushed_linv(&stale, buffers);
return req;
}



2014-07-09 16:59:16

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 19/21] xprtrdma: Make rpcrdma_ep_disconnect() return void

Clean up: The return code is used only for dprintk's that are
already redundant.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtrdma/verbs.c | 14 ++++----------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index f6d280b..2faac49 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -414,7 +414,7 @@ xprt_rdma_close(struct rpc_xprt *xprt)
if (r_xprt->rx_ep.rep_connected > 0)
xprt->reestablish_timeout = 0;
xprt_disconnect_done(xprt);
- (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}

static void
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index a8a86a0..da4bbf3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -830,10 +830,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
cancel_delayed_work_sync(&ep->rep_connect_worker);

if (ia->ri_id->qp) {
- rc = rpcrdma_ep_disconnect(ep, ia);
- if (rc)
- dprintk("RPC: %s: rpcrdma_ep_disconnect"
- " returned %i\n", __func__, rc);
+ rpcrdma_ep_disconnect(ep, ia);
rdma_destroy_qp(ia->ri_id);
ia->ri_id->qp = NULL;
}
@@ -871,10 +868,8 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
struct rpcrdma_xprt *xprt;
retry:
dprintk("RPC: %s: reconnecting...\n", __func__);
- rc = rpcrdma_ep_disconnect(ep, ia);
- if (rc && rc != -ENOTCONN)
- dprintk("RPC: %s: rpcrdma_ep_disconnect"
- " status %i\n", __func__, rc);
+
+ rpcrdma_ep_disconnect(ep, ia);
rpcrdma_flush_cqs(ep);

if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
@@ -984,7 +979,7 @@ out:
* This call is not reentrant, and must not be made in parallel
* on the same endpoint.
*/
-int
+void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
int rc;
@@ -1001,7 +996,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
ep->rep_connected = rc;
}
- return rc;
}

static int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 1ee6db3..c419498 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -341,7 +341,7 @@ int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
struct rpcrdma_create_data_internal *);
void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
-int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);

int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);


2014-07-09 21:28:17

by Shirley Ma

[permalink] [raw]
Subject: Re: [PATCH v2 05/21] xprtrdma: On disconnect, don't ignore pending CQEs

Should all rpcrdma_clean_cq() calls be replaced by rpcrdma_flush_cqs()? The outstanding CQEs should be processed in any context.

Shirley

On 07/09/2014 09:57 AM, Chuck Lever wrote:
> xprtrdma is currently throwing away queued completions during
> a reconnect. RPC replies posted just before connection loss, or
> successful completions that change the state of an FRMR, can be
> missed.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 14 +++++++++-----
> 1 file changed, 9 insertions(+), 5 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 0d5187d..7fd457e 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -310,6 +310,13 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> rpcrdma_recvcq_poll(cq, ep);
> }
>
> +static void
> +rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
> +{
> + rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
> + rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
> +}
> +
> #ifdef RPC_DEBUG
> static const char * const conn[] = {
> "address resolved",
> @@ -872,9 +879,7 @@ retry:
> if (rc && rc != -ENOTCONN)
> dprintk("RPC: %s: rpcrdma_ep_disconnect"
> " status %i\n", __func__, rc);
> -
> - rpcrdma_clean_cq(ep->rep_attr.recv_cq);
> - rpcrdma_clean_cq(ep->rep_attr.send_cq);
> + rpcrdma_flush_cqs(ep);
>
> xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
> id = rpcrdma_create_id(xprt, ia,
> @@ -985,8 +990,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
> {
> int rc;
>
> - rpcrdma_clean_cq(ep->rep_attr.recv_cq);
> - rpcrdma_clean_cq(ep->rep_attr.send_cq);
> + rpcrdma_flush_cqs(ep);
> rc = rdma_disconnect(ia->ri_id);
> if (!rc) {
> /* returns without wait if not connected */
>

2014-07-09 16:57:41

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 08/21] xprtrdma: Back off rkey when FAST_REG_MR fails

If posting a FAST_REG_MR Work Request fails, revert the rkey update
to avoid subsequent IB_WC_MW_BIND_ERR completions.

Suggested-by: Steve Wise <[email protected]>
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++-------
1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ce847d2..14d24ec 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1487,6 +1487,24 @@ rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
}

+static void
+rpcrdma_increment_frmr_rkey(struct rpcrdma_mw *mw)
+{
+ struct ib_mr *mr = mw->r.frmr.fr_mr;
+ u8 key = mr->rkey & 0x000000FF;
+
+ ib_update_fast_reg_key(mr, ++key);
+}
+
+static void
+rpcrdma_decrement_frmr_rkey(struct rpcrdma_mw *mw)
+{
+ struct ib_mr *mr = mw->r.frmr.fr_mr;
+ u8 key = mr->rkey & 0x000000FF;
+
+ ib_update_fast_reg_key(mr, --key);
+}
+
static int
rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
int *nsegs, int writing, struct rpcrdma_ia *ia,
@@ -1496,8 +1514,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
struct rpcrdma_frmr *frmr = &mw->r.frmr;
struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
-
- u8 key;
int len, pageoff;
int i, rc;
int seg_len;
@@ -1557,14 +1573,10 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
rc = -EIO;
goto out_err;
}
-
- /* Bump the key */
- key = (u8)(frmr->fr_mr->rkey & 0x000000FF);
- ib_update_fast_reg_key(frmr->fr_mr, ++key);
-
frmr_wr.wr.fast_reg.access_flags = (writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ);
+ rpcrdma_increment_frmr_rkey(mw);
frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

@@ -1573,6 +1585,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
if (rc) {
dprintk("RPC: %s: failed ib_post_send for register,"
" status %i\n", __func__, rc);
+ rpcrdma_decrement_frmr_rkey(seg1->mr_chunk.rl_mw);
goto out_err;
} else {
seg1->mr_rkey = frmr->fr_mr->rkey;


2014-07-09 16:57:49

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 09/21] xprtrdma: Chain together all MWs in same buffer pool

During connection loss recovery, the transport needs to visit every
MW in a buffer pool. Any MW that is in use by an RPC will not be on
the rb_mws list, so keep all MWs on a second, all-inclusive list as
well.
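
This adds a second list_head to each MW (see below): mw_list threads
an MW onto the rb_mws free list, while mw_all keeps it reachable from
rb_all even while an RPC owns it. A recovery pass can then walk every
MW with something like:

	struct list_head *pos;
	struct rpcrdma_mw *r;

	list_for_each(pos, &buf->rb_all) {
		r = list_entry(pos, struct rpcrdma_mw, mw_all);
		/* r may or may not also be on rb_mws at this point */
	}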

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 4 ++++
net/sunrpc/xprtrdma/xprt_rdma.h | 4 +++-
2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 14d24ec..aa14a36 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1074,6 +1074,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
p += cdata->padding;

INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
r = (struct rpcrdma_mw *)p;
switch (ia->ri_memreg_strategy) {
case RPCRDMA_FRMR:
@@ -1098,6 +1099,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
ib_dereg_mr(r->r.frmr.fr_mr);
goto out;
}
+ list_add(&r->mw_all, &buf->rb_all);
list_add(&r->mw_list, &buf->rb_mws);
++r;
}
@@ -1116,6 +1118,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
" failed %i\n", __func__, rc);
goto out;
}
+ list_add(&r->mw_all, &buf->rb_all);
list_add(&r->mw_list, &buf->rb_mws);
++r;
}
@@ -1225,6 +1228,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_mws)) {
r = list_entry(buf->rb_mws.next,
struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_all);
list_del(&r->mw_list);
switch (ia->ri_memreg_strategy) {
case RPCRDMA_FRMR:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 84c3455..c1d8652 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -151,7 +151,7 @@ struct rpcrdma_rep {
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
*
- * Each rpcrdma_buffer has a list of these anchored in rb_mws. During
+ * Each rpcrdma_buffer has a list of free MWs anchored in rb_mws. During
* call_allocate, rpcrdma_buffer_get() assigns one to each segment in
* an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
* track of registration metadata while each RPC is pending.
@@ -175,6 +175,7 @@ struct rpcrdma_mw {
struct rpcrdma_frmr frmr;
} r;
struct list_head mw_list;
+ struct list_head mw_all;
};

/*
@@ -246,6 +247,7 @@ struct rpcrdma_buffer {
atomic_t rb_credits; /* most recent server credits */
int rb_max_requests;/* client max requests */
struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */
+ struct list_head rb_all;
int rb_send_index;
struct rpcrdma_req **rb_send_bufs;
int rb_recv_index;


2014-07-10 15:48:28

by Shirley Ma

[permalink] [raw]
Subject: Re: [PATCH v2 15/21] xprtrdma: Disable completions for LOCAL_INV Work Requests

That would significantly reduce the interrupt rate. I will run some performance tests.

Could these unsignaled SENDs stall?

Shirley

On 07/09/2014 09:58 AM, Chuck Lever wrote:
> Instead of relying on a completion to change the state of an FRMR
> to FRMR_IS_INVALID, set it in advance. If an error occurs, a completion
> will fire anyway and mark the FRMR FRMR_IS_STALE.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 17 ++++++++---------
> 1 file changed, 8 insertions(+), 9 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index e83eda6..8eef343 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -154,12 +154,8 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
>
> if (wc->wr_id == 0ULL)
> return;
> - if (wc->status != IB_WC_SUCCESS) {
> + if (wc->status != IB_WC_SUCCESS)
> frmr->r.frmr.fr_state = FRMR_IS_STALE;
> - return;
> - }
> -
> - frmr->r.frmr.fr_state = FRMR_IS_INVALID;
> }
>
> static int
> @@ -1369,12 +1365,11 @@ rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
> dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
>
> /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
> - r->r.frmr.fr_state = FRMR_IS_VALID;
> + r->r.frmr.fr_state = FRMR_IS_INVALID;
>
> memset(&invalidate_wr, 0, sizeof(invalidate_wr));
> invalidate_wr.wr_id = (unsigned long)(void *)r;
> invalidate_wr.opcode = IB_WR_LOCAL_INV;
> - invalidate_wr.send_flags = IB_SEND_SIGNALED;
> invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
> DECR_CQCOUNT(&r_xprt->rx_ep);
>
> @@ -1787,10 +1782,11 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
> struct ib_send_wr invalidate_wr, *bad_wr;
> int rc;
>
> + seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
> +
> memset(&invalidate_wr, 0, sizeof invalidate_wr);
> invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
> invalidate_wr.opcode = IB_WR_LOCAL_INV;
> - invalidate_wr.send_flags = IB_SEND_SIGNALED;
> invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
> DECR_CQCOUNT(&r_xprt->rx_ep);
>
> @@ -1799,9 +1795,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
> rpcrdma_unmap_one(ia, seg++);
> rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
> read_unlock(&ia->ri_qplock);
> - if (rc)
> + if (rc) {
> + /* Force rpcrdma_buffer_get() to retry */
> + seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
> dprintk("RPC: %s: failed ib_post_send for invalidate,"
> " status %i\n", __func__, rc);
> + }
> return rc;
> }
>
>

2014-07-09 20:17:09

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 08/21] xprtrdma: Back off rkey when FAST_REG_MR fails

On 07/09/2014 12:57 PM, Chuck Lever wrote:
> If posting a FAST_REG_MR Work Reqeust fails, revert the rkey update
> to avoid subsequent IB_WC_MW_BIND_ERR completions.
>
> Suggested-by: Steve Wise <[email protected]>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++-------
> 1 file changed, 20 insertions(+), 7 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index ce847d2..14d24ec 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -1487,6 +1487,24 @@ rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
> seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
> }
>
> +static void
> +rpcrdma_increment_frmr_rkey(struct rpcrdma_mw *mw)
> +{
> + struct ib_mr *mr = mw->r.frmr.fr_mr;
> + u8 key = mr->rkey & 0x000000FF;

It's not super obvious to me how this key is being calculated. Can you change 0x000000FF into a named constant to help?

It might also be useful to have an frmr_get_rkey() function rather than doing this calculation in two different places.
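
For example, something along these lines (hypothetical constant and
helper names, just to sketch the suggestion):

	/* The consumer-owned portion of an rkey is its low-order
	 * byte; ib_update_fast_reg_key() replaces only that byte. */
	#define RPCRDMA_RKEY_KEY_MASK	0x000000FF

	static u8
	frmr_get_rkey(struct rpcrdma_mw *mw)
	{
		return mw->r.frmr.fr_mr->rkey & RPCRDMA_RKEY_KEY_MASK;
	}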

Anna

> +
> + ib_update_fast_reg_key(mr, ++key);
> +}
> +
> +static void
> +rpcrdma_decrement_frmr_rkey(struct rpcrdma_mw *mw)
> +{
> + struct ib_mr *mr = mw->r.frmr.fr_mr;
> + u8 key = mr->rkey & 0x000000FF;
> +
> + ib_update_fast_reg_key(mr, --key);
> +}
> +
> static int
> rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
> int *nsegs, int writing, struct rpcrdma_ia *ia,
> @@ -1496,8 +1514,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
> struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
> struct rpcrdma_frmr *frmr = &mw->r.frmr;
> struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
> -
> - u8 key;
> int len, pageoff;
> int i, rc;
> int seg_len;
> @@ -1557,14 +1573,10 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
> rc = -EIO;
> goto out_err;
> }
> -
> - /* Bump the key */
> - key = (u8)(frmr->fr_mr->rkey & 0x000000FF);
> - ib_update_fast_reg_key(frmr->fr_mr, ++key);
> -
> frmr_wr.wr.fast_reg.access_flags = (writing ?
> IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
> IB_ACCESS_REMOTE_READ);
> + rpcrdma_increment_frmr_rkey(mw);
> frmr_wr.wr.fast_reg.rkey = frmr->fr_mr->rkey;
> DECR_CQCOUNT(&r_xprt->rx_ep);
>
> @@ -1573,6 +1585,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
> if (rc) {
> dprintk("RPC: %s: failed ib_post_send for register,"
> " status %i\n", __func__, rc);
> + rpcrdma_decrement_frmr_rkey(seg1->mr_chunk.rl_mw);
> goto out_err;
> } else {
> seg1->mr_rkey = frmr->fr_mr->rkey;
>


2014-07-09 16:58:06

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 11/21] xprtrdma: Reset FRMRs when FAST_REG_MR is flushed by a disconnect

FAST_REG_MR Work Requests update a Memory Region's rkey. Rkeys are
used to block unwanted access to the memory controlled by an MR. The
rkey is passed to the receiver (the NFS server, in our case), and is
also used by xprtrdma to invalidate the MR when the RPC is complete.

When a FAST_REG_MR Work Request is flushed after a transport
disconnect, xprtrdma cannot tell whether the WR actually reached the
adapter. So it is indeterminate at that point whether the existing
rkey is still valid.

After the transport connection is re-established, the next
FAST_REG_MR or LOCAL_INV Work Request against that MR can sometimes
fail because the rkey value does not match what xprtrdma expects.

The only reliable way to recover in this case is to deregister and
register the MR before it is used again. These operations can be
done only in a process context, so handle it in the transport
connect worker.
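
The recovery loop in the connect worker therefore looks roughly like
this (a condensed sketch of rpcrdma_reset_frmrs() below; error
handling and the fast_reg page list are omitted):

	list_for_each(pos, &buf->rb_all) {
		r = list_entry(pos, struct rpcrdma_mw, mw_all);
		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
			continue;

		/* dereg / re-alloc gives the MR a brand-new rkey */
		ib_dereg_mr(r->r.frmr.fr_mr);
		r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
						ia->ri_max_frmr_depth);
		r->r.frmr.fr_state = FRMR_IS_INVALID;
	}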

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 67 ++++++++++++++++++++++++++++++++++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5796e09..56b7979 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -61,6 +61,8 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

+static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
+
/*
* internal functions
*/
@@ -152,8 +154,10 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)

if (wc->wr_id == 0ULL)
return;
- if (wc->status != IB_WC_SUCCESS)
+ if (wc->status != IB_WC_SUCCESS) {
+ frmr->r.frmr.fr_state = FRMR_IS_STALE;
return;
+ }

if (wc->opcode == IB_WC_FAST_REG_MR)
frmr->r.frmr.fr_state = FRMR_IS_VALID;
@@ -881,6 +885,9 @@ retry:
" status %i\n", __func__, rc);
rpcrdma_flush_cqs(ep);

+ if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
+ rpcrdma_reset_frmrs(ia);
+
xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
id = rpcrdma_create_id(xprt, ia,
(struct sockaddr *)&xprt->rx_data.addr);
@@ -1256,6 +1263,62 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
kfree(buf->rb_pool);
}

+/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
+ * an unusable state. Find FRMRs in this state and dereg / reg
+ * each. FRMRs that are VALID and attached to an rpcrdma_req are
+ * also torn down.
+ *
+ * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
+ *
+ * This is invoked only in the transport connect worker in order
+ * to serialize with rpcrdma_register_frmr_external().
+ */
+static void
+rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
+{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ia, struct rpcrdma_xprt, rx_ia);
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct list_head *pos;
+ struct rpcrdma_mw *r;
+ int rc;
+
+ list_for_each(pos, &buf->rb_all) {
+ r = list_entry(pos, struct rpcrdma_mw, mw_all);
+
+ if (r->r.frmr.fr_state == FRMR_IS_INVALID)
+ continue;
+
+ rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dereg_mr failed %i\n",
+ __func__, rc);
+ ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+
+ r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
+ ia->ri_max_frmr_depth);
+ if (IS_ERR(r->r.frmr.fr_mr)) {
+ rc = PTR_ERR(r->r.frmr.fr_mr);
+ dprintk("RPC: %s: ib_alloc_fast_reg_mr"
+ " failed %i\n", __func__, rc);
+ continue;
+ }
+ r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
+ ia->ri_id->device,
+ ia->ri_max_frmr_depth);
+ if (IS_ERR(r->r.frmr.fr_pgl)) {
+ rc = PTR_ERR(r->r.frmr.fr_pgl);
+ dprintk("RPC: %s: "
+ "ib_alloc_fast_reg_page_list "
+ "failed %i\n", __func__, rc);
+
+ ib_dereg_mr(r->r.frmr.fr_mr);
+ continue;
+ }
+ r->r.frmr.fr_state = FRMR_IS_INVALID;
+ }
+}
+
/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
* some req segments uninitialized.
*/
@@ -1587,7 +1650,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
dprintk("RPC: %s: Using frmr %p to map %d segments\n",
__func__, mw, i);

- if (unlikely(frmr->fr_state == FRMR_IS_VALID)) {
+ if (unlikely(frmr->fr_state != FRMR_IS_INVALID)) {
dprintk("RPC: %s: frmr %x left valid, posting invalidate.\n",
__func__, frmr->fr_mr->rkey);
/* Invalidate before using. */
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c1d8652..1ee6db3 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -161,6 +161,7 @@ struct rpcrdma_rep {
enum rpcrdma_frmr_state {
FRMR_IS_INVALID, /* ready to be used */
FRMR_IS_VALID, /* in use */
+ FRMR_IS_STALE, /* failed completion */
};

struct rpcrdma_frmr {


2014-07-09 16:56:50

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 02/21] xprtrdma: Protect ia->ri_id when unmapping/invalidating MRs

Ensure ia->ri_id remains valid while invoking dma_unmap_page() or
posting LOCAL_INV during a transport reconnect. Otherwise,
ia->ri_id->device or ia->ri_id->qp can be NULL, which triggers a
panic.
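
In brief (a sketch of the locking added below): the connect worker
swaps in the new cm_id under a write lock, while the unmap and
invalidate paths take the lock for reading so ia->ri_id cannot change
underneath them:

	/* connect worker */
	write_lock(&ia->ri_qplock);
	old = ia->ri_id;
	ia->ri_id = id;
	write_unlock(&ia->ri_qplock);
	rdma_destroy_qp(old);
	rdma_destroy_id(old);

	/* deregistration path */
	read_lock(&ia->ri_qplock);
	while (seg1->mr_nsegs--)
		rpcrdma_unmap_one(ia, seg++);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);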

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=259
Fixes: ec62f40 'xprtrdma: Ensure ia->ri_id->qp is not NULL when reconnecting'
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 23 +++++++++++++++++------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 78bd7c6..cab742d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -613,6 +613,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
/* Else will do memory reg/dereg for each chunk */
ia->ri_memreg_strategy = memreg;

+ rwlock_init(&ia->ri_qplock);
return 0;
out2:
rdma_destroy_id(ia->ri_id);
@@ -859,7 +860,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
- struct rdma_cm_id *id;
+ struct rdma_cm_id *id, *old;
int rc = 0;
int retry_count = 0;

@@ -905,9 +906,14 @@ retry:
rc = -ENETUNREACH;
goto out;
}
- rdma_destroy_qp(ia->ri_id);
- rdma_destroy_id(ia->ri_id);
+
+ write_lock(&ia->ri_qplock);
+ old = ia->ri_id;
ia->ri_id = id;
+ write_unlock(&ia->ri_qplock);
+
+ rdma_destroy_qp(old);
+ rdma_destroy_id(old);
} else {
dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
@@ -1587,9 +1593,6 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
struct ib_send_wr invalidate_wr, *bad_wr;
int rc;

- while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(ia, seg++);
-
memset(&invalidate_wr, 0, sizeof invalidate_wr);
invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
@@ -1597,7 +1600,11 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

+ read_lock(&ia->ri_qplock);
+ while (seg1->mr_nsegs--)
+ rpcrdma_unmap_one(ia, seg++);
rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+ read_unlock(&ia->ri_qplock);
if (rc)
dprintk("RPC: %s: failed ib_post_send for invalidate,"
" status %i\n", __func__, rc);
@@ -1658,8 +1665,10 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,

list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
rc = ib_unmap_fmr(&l);
+ read_lock(&ia->ri_qplock);
while (seg1->mr_nsegs--)
rpcrdma_unmap_one(ia, seg++);
+ read_unlock(&ia->ri_qplock);
if (rc)
dprintk("RPC: %s: failed ib_unmap_fmr,"
" status %i\n", __func__, rc);
@@ -1715,7 +1724,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,

#if RPCRDMA_PERSISTENT_REGISTRATION
case RPCRDMA_ALLPHYSICAL:
+ read_lock(&ia->ri_qplock);
rpcrdma_unmap_one(ia, seg);
+ read_unlock(&ia->ri_qplock);
break;
#endif

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 89e7cd4..97ca516 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -59,6 +59,7 @@
* Interface Adapter -- one per transport instance
*/
struct rpcrdma_ia {
+ rwlock_t ri_qplock;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
struct ib_mr *ri_bind_mem;