2018-09-10 20:03:29

by Chuck Lever III

Subject: [PATCH v1 00/22] NFS/RDMA client patches for v4.20

A mixed bag of fixes and clean-ups, the important ones being:

- Ensure credit grant is reduced to 1 after a reconnect
- Delete MRs instead of recovering them
- Support sending RPCs with both Write and Reply chunks
- Modernize the connect logic

Details can be found in the patch descriptions.

---

Chuck Lever (22):
xprtrdma: Reset credit grant properly after a disconnect
xprtrdma: Create more MRs at a time
xprtrdma: Explicitly resetting MRs is no longer necessary
xprtrdma: Name MR trace events consistently
xprtrdma: Refactor chunk encoding
xprtrdma: Refactor chunktype handling
xprtrdma: Support Write+Reply Replies
sunrpc: Fix connect metrics
sunrpc: Report connect_time in seconds
xprtrdma: Rename rpcrdma_conn_upcall
xprtrdma: Conventional variable names in rpcrdma_conn_upcall
xprtrdma: Eliminate "connstate" variable from rpcrdma_conn_upcall()
xprtrdma: Re-organize the switch() in rpcrdma_conn_upcall
xprtrdma: Simplify RPC wake-ups on connect
xprtrdma: Rename rpcrdma_qp_async_error_upcall
xprtrdma: Remove memory address of "ep" from an error message
svcrdma: Don't disable BH's in backchannel
xprtrdma: Move rb_flags initialization
xprtrdma: Report when there were zero posted Receives
xprtrdma: Add documenting comments
xprtrdma: Clean up xprt_rdma_disconnect_inject
xprtrdma: Squelch a sparse warning


include/trace/events/rpcrdma.h | 40 ++++----
net/sunrpc/xprt.c | 10 +-
net/sunrpc/xprtrdma/backchannel.c | 16 ++-
net/sunrpc/xprtrdma/fmr_ops.c | 131 ++++++++++++---------------
net/sunrpc/xprtrdma/frwr_ops.c | 137 +++++++++++-----------------
net/sunrpc/xprtrdma/rpc_rdma.c | 168 +++++++++++++++++++----------------
net/sunrpc/xprtrdma/transport.c | 103 +++++++++++----------
net/sunrpc/xprtrdma/verbs.c | 178 +++++++++++++++++++------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 23 ++---
net/sunrpc/xprtsock.c | 14 ++-
10 files changed, 400 insertions(+), 420 deletions(-)

--
Chuck Lever


2018-09-10 20:03:34

by Chuck Lever III

Subject: [PATCH v1 01/22] xprtrdma: Reset credit grant properly after a disconnect

On a fresh connection, an RPC/RDMA client is supposed to send only
one RPC Call until it gets a credit grant in the first RPC Reply
from the server [RFC 8166, Section 3.3.3].

There is a bug in the Linux client's credit accounting mechanism
introduced by commit e7ce710a8802 ("xprtrdma: Avoid deadlock when
credit window is reset"). On connect, it simply dumps all pending
RPC Calls onto the new connection.

Servers have been tolerant of this bad behavior. Currently no server
implementation ever changes its credit grant over reconnects, and
servers always repost enough Receives before connections are fully
established.

To correct this issue, ensure that the client resets both the credit
grant _and_ the congestion window when handling a reconnect.
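
For context, here is a sketch of how the client normally applies a
server's credit grant when a Reply arrives (modeled loosely on
rpcrdma_update_granted_credits(); the header decoding and clamping
are simplified here for illustration):

    static void
    rpcrdma_apply_credit_grant(struct rpcrdma_xprt *r_xprt, u32 credits)
    {
            struct rpc_xprt *xprt = &r_xprt->rx_xprt;

            if (credits == 0)
                    credits = 1;    /* avoid deadlocking the transport */
            else if (credits > r_xprt->rx_buf.rb_max_requests)
                    credits = r_xprt->rx_buf.rb_max_requests;

            r_xprt->rx_buf.rb_credits = credits;
            xprt->cwnd = credits << RPC_CWNDSHIFT;
    }

Without the fix below, rb_credits and xprt->cwnd retain their
pre-disconnect values, so the client can send several Calls before
the server's first Reply has re-established the credit grant.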

Fixes: e7ce710a8802 ("xprtrdma: Avoid deadlock when credit window is reset")
Signed-off-by: Chuck Lever <[email protected]>
Cc: [email protected]
---
net/sunrpc/xprtrdma/transport.c | 6 ++++++
1 file changed, 6 insertions(+)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 143ce25..98cbc7b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -468,6 +468,12 @@
xprt->reestablish_timeout = 0;
xprt_disconnect_done(xprt);
rpcrdma_ep_disconnect(ep, ia);
+
+ /* Prepare @xprt for the next connection by reinitializing
+ * its credit grant to one (see RFC 8166, Section 3.3.3).
+ */
+ r_xprt->rx_buf.rb_credits = 1;
+ xprt->cwnd = RPC_CWNDSHIFT;
}

/**

2018-09-10 20:03:45

by Chuck Lever III

Subject: [PATCH v1 03/22] xprtrdma: Explicitly resetting MRs is no longer necessary

When a memory operation fails, the MR's driver state might not match
its hardware state. The only reliable recourse is to dereg the MR.
This is done in ->ro_recover_mr, which then attempts to allocate a
fresh MR to replace the released MR.

Since commit e2ac236c0b651 ("xprtrdma: Allocate MRs on demand"),
xprtrdma dynamically allocates MRs. It can add more MRs whenever
they are needed.

That makes it possible to simply release an MR when a memory
operation fails, instead of "recovering" it. It will automatically
be replaced by the on-demand MR allocator.

This commit is a little larger than I wanted, but it replaces
->ro_recover_mr, rb_recovery_lock, rb_recovery_worker, and the
rb_stale_mrs list with a generic work queue.

Since MRs are no longer orphaned, the mrs_orphaned metric is no
longer used.
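
The caller-visible result is simple (a sketch, not verbatim code):
any path that gives up on an MR now just invokes

    rpcrdma_mr_recycle(mr);    /* schedules mr->mr_recycle */

and the per-MR work item, running in process context, DMA-unmaps the
MR where needed, removes it from rb_all, bumps the new mrs_recycled
counter, and releases it. The on-demand allocator replaces the MR
later.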

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 2 -
net/sunrpc/xprtrdma/fmr_ops.c | 124 +++++++++++++++++--------------------
net/sunrpc/xprtrdma/frwr_ops.c | 130 ++++++++++++++-------------------------
net/sunrpc/xprtrdma/rpc_rdma.c | 2 -
net/sunrpc/xprtrdma/transport.c | 2 -
net/sunrpc/xprtrdma/verbs.c | 38 -----------
net/sunrpc/xprtrdma/xprt_rdma.h | 14 ++--
7 files changed, 114 insertions(+), 198 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 53df203..9906f4d 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -655,7 +655,7 @@
DEFINE_MR_EVENT(xprtrdma_dma_map);
DEFINE_MR_EVENT(xprtrdma_dma_unmap);
DEFINE_MR_EVENT(xprtrdma_remoteinv);
-DEFINE_MR_EVENT(xprtrdma_recover_mr);
+DEFINE_MR_EVENT(xprtrdma_mr_recycle);

/**
** Reply events
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index db589a2..f1cf3a3 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -49,46 +49,7 @@ enum {
return true;
}

-static int
-fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- static struct ib_fmr_attr fmr_attr = {
- .max_pages = RPCRDMA_MAX_FMR_SGES,
- .max_maps = 1,
- .page_shift = PAGE_SHIFT
- };
-
- mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(u64), GFP_KERNEL);
- if (!mr->fmr.fm_physaddrs)
- goto out_free;
-
- mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
- sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
- goto out_free;
-
- sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES);
-
- mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
- &fmr_attr);
- if (IS_ERR(mr->fmr.fm_mr))
- goto out_fmr_err;
-
- INIT_LIST_HEAD(&mr->mr_list);
- return 0;
-
-out_fmr_err:
- dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__,
- PTR_ERR(mr->fmr.fm_mr));
-
-out_free:
- kfree(mr->mr_sg);
- kfree(mr->fmr.fm_physaddrs);
- return -ENOMEM;
-}
-
-static int
+static void
__fmr_unmap(struct rpcrdma_mr *mr)
{
LIST_HEAD(l);
@@ -97,13 +58,16 @@ enum {
list_add(&mr->fmr.fm_mr->list, &l);
rc = ib_unmap_fmr(&l);
list_del(&mr->fmr.fm_mr->list);
- return rc;
+ if (rc)
+ pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
+ mr, rc);
}

+/* Release an MR.
+ */
static void
fmr_op_release_mr(struct rpcrdma_mr *mr)
{
- LIST_HEAD(unmap_list);
int rc;

kfree(mr->fmr.fm_physaddrs);
@@ -112,10 +76,7 @@ enum {
/* In case this one was left mapped, try to unmap it
* to prevent dealloc_fmr from failing with EBUSY
*/
- rc = __fmr_unmap(mr);
- if (rc)
- pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
- mr, rc);
+ __fmr_unmap(mr);

rc = ib_dealloc_fmr(mr->fmr.fm_mr);
if (rc)
@@ -125,28 +86,16 @@ enum {
kfree(mr);
}

-/* Reset of a single FMR.
+/* MRs are dynamically allocated, so simply clean up and release the MR.
+ * A replacement MR will subsequently be allocated on demand.
*/
static void
-fmr_op_recover_mr(struct rpcrdma_mr *mr)
+fmr_mr_recycle_worker(struct work_struct *work)
{
+ struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- int rc;

- /* ORDER: invalidate first */
- rc = __fmr_unmap(mr);
- if (rc)
- goto out_release;
-
- /* ORDER: then DMA unmap */
- rpcrdma_mr_unmap_and_put(mr);
-
- r_xprt->rx_stats.mrs_recovered++;
- return;
-
-out_release:
- pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mr);
- r_xprt->rx_stats.mrs_orphaned++;
+ trace_xprtrdma_mr_recycle(mr);

trace_xprtrdma_dma_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
@@ -154,11 +103,51 @@ enum {

spin_lock(&r_xprt->rx_buf.rb_mrlock);
list_del(&mr->mr_all);
+ r_xprt->rx_stats.mrs_recycled++;
spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
fmr_op_release_mr(mr);
}

+static int
+fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+{
+ static struct ib_fmr_attr fmr_attr = {
+ .max_pages = RPCRDMA_MAX_FMR_SGES,
+ .max_maps = 1,
+ .page_shift = PAGE_SHIFT
+ };
+
+ mr->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
+ sizeof(u64), GFP_KERNEL);
+ if (!mr->fmr.fm_physaddrs)
+ goto out_free;
+
+ mr->mr_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
+ sizeof(*mr->mr_sg), GFP_KERNEL);
+ if (!mr->mr_sg)
+ goto out_free;
+
+ sg_init_table(mr->mr_sg, RPCRDMA_MAX_FMR_SGES);
+
+ mr->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
+ &fmr_attr);
+ if (IS_ERR(mr->fmr.fm_mr))
+ goto out_fmr_err;
+
+ INIT_LIST_HEAD(&mr->mr_list);
+ INIT_WORK(&mr->mr_recycle, fmr_mr_recycle_worker);
+ return 0;
+
+out_fmr_err:
+ dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__,
+ PTR_ERR(mr->fmr.fm_mr));
+
+out_free:
+ kfree(mr->mr_sg);
+ kfree(mr->fmr.fm_physaddrs);
+ return -ENOMEM;
+}
+
/* On success, sets:
* ep->rep_attr.cap.max_send_wr
* ep->rep_attr.cap.max_recv_wr
@@ -312,7 +301,7 @@ enum {
r_xprt->rx_stats.local_inv_needed++;
rc = ib_unmap_fmr(&unmap_list);
if (rc)
- goto out_reset;
+ goto out_release;

/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
@@ -325,13 +314,13 @@ enum {

return;

-out_reset:
+out_release:
pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);

while (!list_empty(mrs)) {
mr = rpcrdma_mr_pop(mrs);
list_del(&mr->fmr.fm_mr->list);
- fmr_op_recover_mr(mr);
+ rpcrdma_mr_recycle(mr);
}
}

@@ -339,7 +328,6 @@ enum {
.ro_map = fmr_op_map,
.ro_send = fmr_op_send,
.ro_unmap_sync = fmr_op_unmap_sync,
- .ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init_mr = fmr_op_init_mr,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 1cc4db5..6594627 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -97,6 +97,44 @@
return false;
}

+static void
+frwr_op_release_mr(struct rpcrdma_mr *mr)
+{
+ int rc;
+
+ rc = ib_dereg_mr(mr->frwr.fr_mr);
+ if (rc)
+ pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
+ mr, rc);
+ kfree(mr->mr_sg);
+ kfree(mr);
+}
+
+/* MRs are dynamically allocated, so simply clean up and release the MR.
+ * A replacement MR will subsequently be allocated on demand.
+ */
+static void
+frwr_mr_recycle_worker(struct work_struct *work)
+{
+ struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
+ enum rpcrdma_frwr_state state = mr->frwr.fr_state;
+ struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+
+ trace_xprtrdma_mr_recycle(mr);
+
+ if (state != FRWR_FLUSHED_LI) {
+ trace_xprtrdma_dma_unmap(mr);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ }
+
+ spin_lock(&r_xprt->rx_buf.rb_mrlock);
+ list_del(&mr->mr_all);
+ r_xprt->rx_stats.mrs_recycled++;
+ spin_unlock(&r_xprt->rx_buf.rb_mrlock);
+ frwr_op_release_mr(mr);
+}
+
static int
frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
@@ -113,6 +151,7 @@
goto out_list_err;

INIT_LIST_HEAD(&mr->mr_list);
+ INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
sg_init_table(mr->mr_sg, depth);
init_completion(&frwr->fr_linv_done);
return 0;
@@ -131,79 +170,6 @@
return rc;
}

-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
-{
- int rc;
-
- rc = ib_dereg_mr(mr->frwr.fr_mr);
- if (rc)
- pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
- mr, rc);
- kfree(mr->mr_sg);
- kfree(mr);
-}
-
-static int
-__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- struct rpcrdma_frwr *frwr = &mr->frwr;
- int rc;
-
- rc = ib_dereg_mr(frwr->fr_mr);
- if (rc) {
- pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
- rc, mr);
- return rc;
- }
-
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
- ia->ri_max_frwr_depth);
- if (IS_ERR(frwr->fr_mr)) {
- pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
- PTR_ERR(frwr->fr_mr), mr);
- return PTR_ERR(frwr->fr_mr);
- }
-
- dprintk("RPC: %s: recovered FRWR %p\n", __func__, frwr);
- frwr->fr_state = FRWR_IS_INVALID;
- return 0;
-}
-
-/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR.
- */
-static void
-frwr_op_recover_mr(struct rpcrdma_mr *mr)
-{
- enum rpcrdma_frwr_state state = mr->frwr.fr_state;
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- int rc;
-
- rc = __frwr_mr_reset(ia, mr);
- if (state != FRWR_FLUSHED_LI) {
- trace_xprtrdma_dma_unmap(mr);
- ib_dma_unmap_sg(ia->ri_device,
- mr->mr_sg, mr->mr_nents, mr->mr_dir);
- }
- if (rc)
- goto out_release;
-
- rpcrdma_mr_put(mr);
- r_xprt->rx_stats.mrs_recovered++;
- return;
-
-out_release:
- pr_err("rpcrdma: FRWR reset failed %d, %p released\n", rc, mr);
- r_xprt->rx_stats.mrs_orphaned++;
-
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
- list_del(&mr->mr_all);
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-
- frwr_op_release_mr(mr);
-}
-
/* On success, sets:
* ep->rep_attr.cap.max_send_wr
* ep->rep_attr.cap.max_recv_wr
@@ -385,7 +351,7 @@
mr = NULL;
do {
if (mr)
- rpcrdma_mr_defer_recovery(mr);
+ rpcrdma_mr_recycle(mr);
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
return ERR_PTR(-EAGAIN);
@@ -452,7 +418,7 @@
out_mapmr_err:
pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
frwr->fr_mr, n, mr->mr_nents);
- rpcrdma_mr_defer_recovery(mr);
+ rpcrdma_mr_recycle(mr);
return ERR_PTR(-EIO);
}

@@ -571,7 +537,7 @@
if (bad_wr != first)
wait_for_completion(&frwr->fr_linv_done);
if (rc)
- goto reset_mrs;
+ goto out_release;

/* ORDER: Now DMA unmap all of the MRs, and return
* them to the free MR list.
@@ -583,22 +549,21 @@
}
return;

-reset_mrs:
+out_release:
pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);

- /* Find and reset the MRs in the LOCAL_INV WRs that did not
+ /* Unmap and release the MRs in the LOCAL_INV WRs that did not
* get posted.
*/
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr,
fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
-
- __frwr_mr_reset(ia, mr);
-
bad_wr = bad_wr->next;
+
+ list_del(&mr->mr_list);
+ frwr_op_release_mr(mr);
}
- goto unmap;
}

const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
@@ -606,7 +571,6 @@
.ro_send = frwr_op_send,
.ro_reminv = frwr_op_reminv,
.ro_unmap_sync = frwr_op_unmap_sync,
- .ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init_mr = frwr_op_init_mr,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 33da912..ec4b1f9 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -803,7 +803,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr *mr;

mr = rpcrdma_mr_pop(&req->rl_registered);
- rpcrdma_mr_defer_recovery(mr);
+ rpcrdma_mr_recycle(mr);
}

/* This implementation supports the following combinations
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 98cbc7b..3ae73e6 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -792,7 +792,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
- r_xprt->rx_stats.mrs_recovered,
+ r_xprt->rx_stats.mrs_recycled,
r_xprt->rx_stats.mrs_orphaned,
r_xprt->rx_stats.mrs_allocated,
r_xprt->rx_stats.local_inv_needed,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5625a50..88fe75e 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -978,39 +978,6 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
}

static void
-rpcrdma_mr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_recovery_worker.work);
- struct rpcrdma_mr *mr;
-
- spin_lock(&buf->rb_recovery_lock);
- while (!list_empty(&buf->rb_stale_mrs)) {
- mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
- spin_unlock(&buf->rb_recovery_lock);
-
- trace_xprtrdma_recover_mr(mr);
- mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
-
- spin_lock(&buf->rb_recovery_lock);
- }
- spin_unlock(&buf->rb_recovery_lock);
-}
-
-void
-rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
-{
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
- spin_lock(&buf->rb_recovery_lock);
- rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
- spin_unlock(&buf->rb_recovery_lock);
-
- schedule_delayed_work(&buf->rb_recovery_worker, 0);
-}
-
-static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
@@ -1142,14 +1109,10 @@ struct rpcrdma_req *
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
- spin_lock_init(&buf->rb_recovery_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
INIT_LIST_HEAD(&buf->rb_all);
- INIT_LIST_HEAD(&buf->rb_stale_mrs);
INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker);
- INIT_DELAYED_WORK(&buf->rb_recovery_worker,
- rpcrdma_mr_recovery_worker);

rpcrdma_mrs_create(r_xprt);

@@ -1233,7 +1196,6 @@ struct rpcrdma_req *
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_delayed_work_sync(&buf->rb_recovery_worker);
cancel_delayed_work_sync(&buf->rb_refresh_worker);

rpcrdma_sendctxs_destroy(buf);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ca14f7..eae2166 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -280,6 +280,7 @@ struct rpcrdma_mr {
u32 mr_handle;
u32 mr_length;
u64 mr_offset;
+ struct work_struct mr_recycle;
struct list_head mr_all;
};

@@ -411,9 +412,6 @@ struct rpcrdma_buffer {

u32 rb_bc_max_requests;

- spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */
- struct list_head rb_stale_mrs;
- struct delayed_work rb_recovery_worker;
struct delayed_work rb_refresh_worker;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -452,7 +450,7 @@ struct rpcrdma_stats {
unsigned long hardway_register_count;
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
- unsigned long mrs_recovered;
+ unsigned long mrs_recycled;
unsigned long mrs_orphaned;
unsigned long mrs_allocated;
unsigned long empty_sendctx_q;
@@ -481,7 +479,6 @@ struct rpcrdma_memreg_ops {
struct list_head *mrs);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,
struct list_head *);
- void (*ro_recover_mr)(struct rpcrdma_mr *mr);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
struct rpcrdma_create_data_internal *);
@@ -578,7 +575,12 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr);
+
+static inline void
+rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
+{
+ schedule_work(&mr->mr_recycle);
+}

struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *);

2018-09-10 20:03:40

by Chuck Lever III

Subject: [PATCH v1 02/22] xprtrdma: Create more MRs at a time

Some devices require more than 3 MRs to build a single 1MB I/O.
Ensure that rpcrdma_mrs_create() will add enough MRs to build that
I/O. Otherwise it's possible for ->send_request to loop: it acquires
some MRs but not enough, gets called again, recycles the previous
MRs, and again comes up short; lather, rinse, repeat.

I'm "reusing" ia->ri_max_segs. All of its accessors seem to want the
maximum number of data segments plus two, so I'm going to bake that
into the initial calculation.
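
To make the arithmetic concrete (illustrative numbers only): with
RPCRDMA_MAX_DATA_SEGS at 256 page-sized segments (a 1MB payload) and
a hypothetical device that registers only 64 pages per MR, one I/O
needs 256 / 64 = 4 MRs, plus the 2 extra segments, for a total of 6,
twice the 3 MRs that rpcrdma_mrs_create() used to hard-code.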

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 1 +
net/sunrpc/xprtrdma/frwr_ops.c | 1 +
net/sunrpc/xprtrdma/rpc_rdma.c | 2 --
net/sunrpc/xprtrdma/verbs.c | 2 +-
4 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 0f7c465..db589a2 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -187,6 +187,7 @@ enum {

ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
RPCRDMA_MAX_FMR_SGES);
+ ia->ri_max_segs += 2; /* segments for head and tail buffers */
return 0;
}

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 1bb00dd..1cc4db5 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -276,6 +276,7 @@

ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
ia->ri_max_frwr_depth);
+ ia->ri_max_segs += 2; /* segments for head and tail buffers */
return 0;
}

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c8ae983..33da912 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,7 +71,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
size = RPCRDMA_HDRLEN_MIN;

/* Maximum Read list size */
- maxsegs += 2; /* segment for head and tail buffers */
size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

/* Minimal Read chunk size */
@@ -97,7 +96,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
size = RPCRDMA_HDRLEN_MIN;

/* Maximum Write list size */
- maxsegs += 2; /* segment for head and tail buffers */
size = sizeof(__be32); /* segment count */
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 956a5ea..5625a50 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1019,7 +1019,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
LIST_HEAD(free);
LIST_HEAD(all);

- for (count = 0; count < 3; count++) {
+ for (count = 0; count < ia->ri_max_segs; count++) {
struct rpcrdma_mr *mr;
int rc;


2018-09-10 20:03:50

by Chuck Lever III

Subject: [PATCH v1 04/22] xprtrdma: Name MR trace events consistently

Clean up the names of trace events related to MRs so that it's
easy to enable these with a glob.
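
After this rename, for example, all MR events can be enabled at once
by writing the glob rpcrdma:xprtrdma_mr_* to
/sys/kernel/debug/tracing/set_event (assuming the usual ftrace
event-glob syntax; the TRACE_SYSTEM for these events is rpcrdma).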

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 12 ++++++------
net/sunrpc/xprtrdma/fmr_ops.c | 6 +++---
net/sunrpc/xprtrdma/frwr_ops.c | 8 ++++----
net/sunrpc/xprtrdma/verbs.c | 2 +-
4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 9906f4d..89e017b 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -263,7 +263,7 @@
);

#define DEFINE_MR_EVENT(name) \
- DEFINE_EVENT(xprtrdma_mr, name, \
+ DEFINE_EVENT(xprtrdma_mr, xprtrdma_mr_##name, \
TP_PROTO( \
const struct rpcrdma_mr *mr \
), \
@@ -651,11 +651,11 @@
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake);

-DEFINE_MR_EVENT(xprtrdma_localinv);
-DEFINE_MR_EVENT(xprtrdma_dma_map);
-DEFINE_MR_EVENT(xprtrdma_dma_unmap);
-DEFINE_MR_EVENT(xprtrdma_remoteinv);
-DEFINE_MR_EVENT(xprtrdma_mr_recycle);
+DEFINE_MR_EVENT(localinv);
+DEFINE_MR_EVENT(map);
+DEFINE_MR_EVENT(unmap);
+DEFINE_MR_EVENT(remoteinv);
+DEFINE_MR_EVENT(recycle);

/**
** Reply events
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index f1cf3a3..7f5632c 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -97,7 +97,7 @@ enum {

trace_xprtrdma_mr_recycle(mr);

- trace_xprtrdma_dma_unmap(mr);
+ trace_xprtrdma_mr_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);

@@ -234,7 +234,7 @@ enum {
mr->mr_sg, i, mr->mr_dir);
if (!mr->mr_nents)
goto out_dmamap_err;
- trace_xprtrdma_dma_map(mr);
+ trace_xprtrdma_mr_map(mr);

for (i = 0, dma_pages = mr->fmr.fm_physaddrs; i < mr->mr_nents; i++)
dma_pages[i] = sg_dma_address(&mr->mr_sg[i]);
@@ -295,7 +295,7 @@ enum {
list_for_each_entry(mr, mrs, mr_list) {
dprintk("RPC: %s: unmapping fmr %p\n",
__func__, &mr->fmr);
- trace_xprtrdma_localinv(mr);
+ trace_xprtrdma_mr_localinv(mr);
list_add_tail(&mr->fmr.fm_mr->list, &unmap_list);
}
r_xprt->rx_stats.local_inv_needed++;
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 6594627..fc6378cc 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -123,7 +123,7 @@
trace_xprtrdma_mr_recycle(mr);

if (state != FRWR_FLUSHED_LI) {
- trace_xprtrdma_dma_unmap(mr);
+ trace_xprtrdma_mr_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
}
@@ -384,7 +384,7 @@
mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
if (!mr->mr_nents)
goto out_dmamap_err;
- trace_xprtrdma_dma_map(mr);
+ trace_xprtrdma_mr_map(mr);

ibmr = frwr->fr_mr;
n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
@@ -466,7 +466,7 @@
list_for_each_entry(mr, mrs, mr_list)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
- trace_xprtrdma_remoteinv(mr);
+ trace_xprtrdma_mr_remoteinv(mr);
mr->frwr.fr_state = FRWR_IS_INVALID;
rpcrdma_mr_unmap_and_put(mr);
break; /* only one invalidated MR per RPC */
@@ -503,7 +503,7 @@
mr->frwr.fr_state = FRWR_IS_INVALID;

frwr = &mr->frwr;
- trace_xprtrdma_localinv(mr);
+ trace_xprtrdma_mr_localinv(mr);

frwr->fr_cqe.done = frwr_wc_localinv;
last = &frwr->fr_invwr;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 88fe75e..3a9a62d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1288,7 +1288,7 @@ struct rpcrdma_mr *
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

- trace_xprtrdma_dma_unmap(mr);
+ trace_xprtrdma_mr_unmap(mr);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mr->mr_sg, mr->mr_nents, mr->mr_dir);
__rpcrdma_mr_put(&r_xprt->rx_buf, mr);

2018-09-10 20:03:56

by Chuck Lever III

Subject: [PATCH v1 05/22] xprtrdma: Refactor chunk encoding

Move the "not present" case into the individual chunk encoders. This
improves code organization and readability, and prepares the marsh-
aling code path for handling cases like "Write + Reply".

The reason for the original organization was to optimize this code
path for the case where there are no chunks. The optimization
was inconsequential.
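
For reference, the "not present" case that each encoder now falls
through to is just the XDR optional-data discriminator. Roughly, per
the existing encode_item_not_present() helper:

    static int
    encode_item_not_present(struct xdr_stream *xdr)
    {
            __be32 *p;

            p = xdr_reserve_space(xdr, sizeof(*p));
            if (unlikely(!p))
                    return -EMSGSIZE;

            *p = xdr_zero;    /* no items in this chunk list */
            return 0;
    }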

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 37 ++++++++++++++++---------------------
1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ec4b1f9..39996df 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -346,6 +346,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
unsigned int pos;
int nsegs;

+ if (rtype == rpcrdma_noch)
+ goto done;
+
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
@@ -370,7 +373,8 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
nsegs -= mr->mr_nents;
} while (nsegs);

- return 0;
+done:
+ return encode_item_not_present(xdr);
}

/* Register and XDR encode the Write list. Supports encoding a list
@@ -398,6 +402,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;

+ if (wtype != rpcrdma_writech)
+ goto done;
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
@@ -433,7 +440,8 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);

- return 0;
+done:
+ return encode_item_not_present(xdr);
}

/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -458,6 +466,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;

+ if (wtype != rpcrdma_replych)
+ return encode_item_not_present(xdr);
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
@@ -828,31 +839,15 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- if (rtype != rpcrdma_noch) {
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
if (ret)
goto out_err;
-
- if (wtype == rpcrdma_writech) {
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
-
- if (wtype != rpcrdma_replych)
- ret = encode_item_not_present(xdr);
- else
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
-
trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);

ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),

2018-09-10 20:04:07

by Chuck Lever III

Subject: [PATCH v1 07/22] xprtrdma: Support Write+Reply Replies

Currently the client handles a large NFS READ request by providing
the server with a Write chunk, and expecting that the non-payload
part of the RPC Reply will always fit inline.

When the inline threshold is small (for instance, when talking to a
server that uses a 1024-byte threshold), the non-payload part of the
Reply might not fit inline in certain rare cases. The server then
has to drop the Reply or return ERR_CHUNK, and the RPC transaction
fails.

Let's add a little logic to recognize when the non-payload part of
an NFS READ might be large, and marshal both a Write chunk and a
Reply chunk to enable the server to send the payload in the Write
chunk and the large non-payload part in the Reply chunk.
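
As an illustrative example: against a server advertising a 1024-byte
inline threshold, an NFS READ whose reply header (the head and tail
of rq_rcv_buf) exceeds ri_max_inline_read can now be sent with both
chunk types; the payload lands in the Write chunk and the oversized
header in the Reply chunk.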

I've never seen this failure in the wild.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 4 ++
net/sunrpc/xprtrdma/rpc_rdma.c | 63 +++++++++++++++++++++++++--------------
net/sunrpc/xprtrdma/xprt_rdma.h | 3 +-
3 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index b9e6802..cd3e5e7 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -446,6 +446,7 @@
TRACE_DEFINE_ENUM(rpcrdma_areadch);
TRACE_DEFINE_ENUM(rpcrdma_writech);
TRACE_DEFINE_ENUM(rpcrdma_replych);
+TRACE_DEFINE_ENUM(rpcrdma_writereply);

#define xprtrdma_show_chunktype(x) \
__print_symbolic(x, \
@@ -453,7 +454,8 @@
{ rpcrdma_readch, "read list" }, \
{ rpcrdma_areadch, "*read list" }, \
{ rpcrdma_writech, "write list" }, \
- { rpcrdma_replych, "reply chunk" })
+ { rpcrdma_replych, "reply chunk" }, \
+ { rpcrdma_writereply, "write+reply" })

TRACE_EVENT(xprtrdma_marshal,
TP_PROTO(
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 26640e6..3594562 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -202,21 +202,20 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
- unsigned int pos, struct rpcrdma_mr_seg *seg,
- bool omit_xdr_pad)
+ unsigned int pos, unsigned int page_len,
+ struct rpcrdma_mr_seg *seg, bool omit_xdr_pad)
{
unsigned long page_base;
- unsigned int len, n;
struct page **ppages;
+ unsigned int n;

n = 0;
if (pos == 0)
seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

- len = xdrbuf->page_len;
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdrbuf->page_base);
- while (len) {
+ while (page_len) {
if (unlikely(!*ppages)) {
/* XXX: Certain upper layer operations do
* not provide receive buffer pages.
@@ -227,8 +226,8 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
}
seg->mr_page = *ppages;
seg->mr_offset = (char *)page_base;
- seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
- len -= seg->mr_len;
+ seg->mr_len = min_t(u32, PAGE_SIZE - page_base, page_len);
+ page_len -= seg->mr_len;
++ppages;
++seg;
++n;
@@ -352,8 +351,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
}

seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, seg,
- omit_xdr_pad);
+ nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
+ rqst->rq_snd_buf.page_len,
+ seg, omit_xdr_pad);
if (nsegs < 0)
return nsegs;

@@ -401,8 +401,13 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;

- if (restype != rpcrdma_writech)
+ switch (restype) {
+ case rpcrdma_writech:
+ case rpcrdma_writereply:
+ break;
+ default:
goto done;
+ }

/* When encoding a Write chunk, some servers need to see an
* extra segment for non-XDR-aligned Write chunks. The upper
@@ -411,8 +416,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
- rqst->rq_rcv_buf.head[0].iov_len, seg,
- r_xprt->rx_ia.ri_implicit_roundup);
+ rqst->rq_rcv_buf.head[0].iov_len,
+ rqst->rq_rcv_buf.page_len,
+ seg, r_xprt->rx_ia.ri_implicit_roundup);
if (nsegs < 0)
return nsegs;

@@ -468,14 +474,24 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
struct rpcrdma_mr *mr;
+ unsigned int page_len;
int nsegs, nchunks;
__be32 *segcount;

- if (restype != rpcrdma_replych)
+ switch (restype) {
+ case rpcrdma_replych:
+ page_len = rqst->rq_rcv_buf.page_len;
+ break;
+ case rpcrdma_writereply:
+ page_len = 0;
+ break;
+ default:
return encode_item_not_present(xdr);
+ }

seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, seg, false);
+ nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0,
+ page_len, seg, false);
if (nsegs < 0)
return nsegs;

@@ -775,16 +791,21 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*
* o If the expected result is under the inline threshold, all ops
* return as inline.
- * o Large read ops return data as write chunk(s), header as
- * inline.
+ * o Large read ops return data as a write chunk and
+ * small header as inline, large header as a reply chunk.
* o Large non-read ops return as a single reply chunk.
*/
if (rpcrdma_results_inline(r_xprt, rqst))
restype = rpcrdma_noch;
- else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) {
restype = rpcrdma_writech;
- else
+ if ((rqst->rq_rcv_buf.head[0].iov_len +
+ rqst->rq_rcv_buf.tail[0].iov_len) >
+ r_xprt->rx_ia.ri_max_inline_read)
+ restype = rpcrdma_writereply;
+ } else {
restype = rpcrdma_replych;
+ }

/*
* Chunks needed for arguments?
@@ -1163,14 +1184,12 @@ static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
return -EIO;

/* RDMA_NOMSG sanity checks */
- if (unlikely(writelist))
- return -EIO;
if (unlikely(!replychunk))
return -EIO;

/* Reply chunk buffer already is the reply vector */
- r_xprt->rx_stats.total_rdma_reply += replychunk;
- return replychunk;
+ r_xprt->rx_stats.total_rdma_reply += writelist + replychunk;
+ return writelist + replychunk;
}

static noinline int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d29bf38..5e19bb59 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -627,7 +627,8 @@ enum rpcrdma_chunktype {
rpcrdma_readch,
rpcrdma_areadch,
rpcrdma_writech,
- rpcrdma_replych
+ rpcrdma_replych,
+ rpcrdma_writereply,
};

int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,

2018-09-10 20:04:02

by Chuck Lever III

Subject: [PATCH v1 06/22] xprtrdma: Refactor chunktype handling

This is a prerequisite for adding support for sending both a Write
chunk and a Reply chunk in the same RPC. It is a refactoring/clean-up
patch, so no functional change is expected.

rpcrdma_convert_iovs is a helper function that is invoked by each
chunk encoder. It's passed an enum rpcrdma_chunktype variable that
indicates either the set of chunks needed for the Call arguments or
the Reply results.

While marshaling the same RPC, rpcrdma_convert_iovs is invoked once
for the Write chunk and once for the Reply chunk, but it's passed
the same @type both times. Thus it can't distinguish whether it's
encoding the Write chunk or the Reply chunk.

Let's refactor rpcrdma_convert_iovs so it needs just a detail about
how the chunk should be handled. That detail is different for Write
chunks and Reply chunks.

Looking at this change I realized that "wtype" is unhelpfully named
throughout the marshaling code path, since it covers both the
Write chunk list and the Reply chunk. As part of refactoring
rpcrdma_convert_iovs, rename wtype to better fit its purpose. Update
"rtype" as well to match this convention.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 18 ++++---
net/sunrpc/xprtrdma/rpc_rdma.c | 100 +++++++++++++++++++++------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 -
3 files changed, 63 insertions(+), 57 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 89e017b..b9e6802 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -459,11 +459,11 @@
TP_PROTO(
const struct rpc_rqst *rqst,
unsigned int hdrlen,
- unsigned int rtype,
- unsigned int wtype
+ unsigned int argtype,
+ unsigned int restype
),

- TP_ARGS(rqst, hdrlen, rtype, wtype),
+ TP_ARGS(rqst, hdrlen, argtype, restype),

TP_STRUCT__entry(
__field(unsigned int, task_id)
@@ -473,8 +473,8 @@
__field(unsigned int, headlen)
__field(unsigned int, pagelen)
__field(unsigned int, taillen)
- __field(unsigned int, rtype)
- __field(unsigned int, wtype)
+ __field(unsigned int, argtype)
+ __field(unsigned int, restype)
),

TP_fast_assign(
@@ -485,16 +485,16 @@
__entry->headlen = rqst->rq_snd_buf.head[0].iov_len;
__entry->pagelen = rqst->rq_snd_buf.page_len;
__entry->taillen = rqst->rq_snd_buf.tail[0].iov_len;
- __entry->rtype = rtype;
- __entry->wtype = wtype;
+ __entry->argtype = argtype;
+ __entry->restype = restype;
),

TP_printk("task:%u@%u xid=0x%08x: hdr=%u xdr=%u/%u/%u %s/%s",
__entry->task_id, __entry->client_id, __entry->xid,
__entry->hdrlen,
__entry->headlen, __entry->pagelen, __entry->taillen,
- xprtrdma_show_chunktype(__entry->rtype),
- xprtrdma_show_chunktype(__entry->wtype)
+ xprtrdma_show_chunktype(__entry->argtype),
+ xprtrdma_show_chunktype(__entry->restype)
)
);

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 39996df..26640e6 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -200,11 +200,10 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*
* Returns positive number of SGEs consumed, or a negative errno.
*/
-
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
- unsigned int pos, enum rpcrdma_chunktype type,
- struct rpcrdma_mr_seg *seg)
+ unsigned int pos, struct rpcrdma_mr_seg *seg,
+ bool omit_xdr_pad)
{
unsigned long page_base;
unsigned int len, n;
@@ -236,18 +235,10 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
page_base = 0;
}

- /* When encoding a Read chunk, the tail iovec contains an
- * XDR pad and may be omitted.
- */
- if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
- goto out;
-
- /* When encoding a Write chunk, some servers need to see an
- * extra segment for non-XDR-aligned Write chunks. The upper
- * layer provides space in the tail iovec that may be used
- * for this purpose.
+ /* A normal Read or Write chunk may omit the XDR pad.
+ * The upper layer provides the pad in @xdrbuf's tail.
*/
- if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
+ if (omit_xdr_pad)
goto out;

if (xdrbuf->tail[0].iov_len)
@@ -338,23 +329,31 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
static noinline int
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+ struct rpc_rqst *rqst, enum rpcrdma_chunktype argtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
struct rpcrdma_mr *mr;
+ bool omit_xdr_pad;
unsigned int pos;
int nsegs;

- if (rtype == rpcrdma_noch)
+ switch (argtype) {
+ case rpcrdma_readch:
+ omit_xdr_pad = r_xprt->rx_ia.ri_implicit_roundup;
+ pos = rqst->rq_snd_buf.head[0].iov_len;
+ break;
+ case rpcrdma_areadch:
+ omit_xdr_pad = false;
+ pos = 0;
+ break;
+ default:
goto done;
+ }

- pos = rqst->rq_snd_buf.head[0].iov_len;
- if (rtype == rpcrdma_areadch)
- pos = 0;
seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
- rtype, seg);
+ nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, seg,
+ omit_xdr_pad);
if (nsegs < 0)
return nsegs;

@@ -394,7 +393,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
static noinline int
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+ struct rpc_rqst *rqst, enum rpcrdma_chunktype restype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -402,13 +401,18 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;

- if (wtype != rpcrdma_writech)
+ if (restype != rpcrdma_writech)
goto done;

+ /* When encoding a Write chunk, some servers need to see an
+ * extra segment for non-XDR-aligned Write chunks. The upper
+ * layer provides space in the tail iovec that may be used
+ * for this purpose.
+ */
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
- rqst->rq_rcv_buf.head[0].iov_len,
- wtype, seg);
+ rqst->rq_rcv_buf.head[0].iov_len, seg,
+ r_xprt->rx_ia.ri_implicit_roundup);
if (nsegs < 0)
return nsegs;

@@ -457,8 +461,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* @xdr is advanced to the next position in the stream.
*/
static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype restype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -466,11 +471,11 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
int nsegs, nchunks;
__be32 *segcount;

- if (wtype != rpcrdma_replych)
+ if (restype != rpcrdma_replych)
return encode_item_not_present(xdr);

seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
+ nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, seg, false);
if (nsegs < 0)
return nsegs;

@@ -563,7 +568,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+ struct xdr_buf *xdr, enum rpcrdma_chunktype argtype)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
@@ -591,7 +596,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* well as additional content, and may not reside in the
* same page as the head iovec.
*/
- if (rtype == rpcrdma_readch) {
+ if (argtype == rpcrdma_readch) {
len = xdr->tail[0].iov_len;

/* Do not include the tail if it is only an XDR pad */
@@ -688,14 +693,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* @req: context of RPC Call being marshalled
* @hdrlen: size of transport header, in bytes
* @xdr: xdr_buf containing RPC Call
- * @rtype: chunk type being encoded
+ * @argtype: chunk type being encoded
*
* Returns 0 on success; otherwise a negative errno is returned.
*/
int
rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+ struct xdr_buf *xdr, enum rpcrdma_chunktype argtype)
{
req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
if (!req->rl_sendctx)
@@ -708,8 +713,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
return -EIO;

- if (rtype != rpcrdma_areadch)
- if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
+ if (argtype != rpcrdma_areadch)
+ if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req,
+ xdr, argtype))
return -EIO;

return 0;
@@ -739,7 +745,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
{
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct xdr_stream *xdr = &req->rl_stream;
- enum rpcrdma_chunktype rtype, wtype;
+ enum rpcrdma_chunktype argtype, restype;
bool ddp_allowed;
__be32 *p;
int ret;
@@ -774,11 +780,11 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* o Large non-read ops return as a single reply chunk.
*/
if (rpcrdma_results_inline(r_xprt, rqst))
- wtype = rpcrdma_noch;
+ restype = rpcrdma_noch;
else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
- wtype = rpcrdma_writech;
+ restype = rpcrdma_writech;
else
- wtype = rpcrdma_replych;
+ restype = rpcrdma_replych;

/*
* Chunks needed for arguments?
@@ -796,14 +802,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
*p++ = rdma_msg;
- rtype = rpcrdma_noch;
+ argtype = rpcrdma_noch;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
*p++ = rdma_msg;
- rtype = rpcrdma_readch;
+ argtype = rpcrdma_readch;
} else {
r_xprt->rx_stats.nomsg_call_count++;
*p++ = rdma_nomsg;
- rtype = rpcrdma_areadch;
+ argtype = rpcrdma_areadch;
}

/* If this is a retransmit, discard previously registered
@@ -839,19 +845,19 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, argtype);
if (ret)
goto out_err;
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, restype);
if (ret)
goto out_err;
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, restype);
if (ret)
goto out_err;
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
+ trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), argtype, restype);

ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
- &rqst->rq_snd_buf, rtype);
+ &rqst->rq_snd_buf, argtype);
if (ret)
goto out_err;
return 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index eae2166..d29bf38 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -633,7 +633,7 @@ enum rpcrdma_chunktype {
int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr,
- enum rpcrdma_chunktype rtype);
+ enum rpcrdma_chunktype argtype);
void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);

2018-09-10 20:04:17

by Chuck Lever III

Subject: [PATCH v1 09/22] sunrpc: Report connect_time in seconds

The way connection-oriented transports report connect_time is wrong:
it's supposed to be in seconds, not in jiffies.
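
For example, with HZ=250, a transport that has been connected for
ten minutes reports a connect_time of 150000 instead of 600;
userspace consumers of the xprt stats expect the latter.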

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtsock.c | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 087acfc..289d13c 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -776,7 +776,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
0, /* need a local port? */
xprt->stat.bind_count,
xprt->stat.connect_count,
- xprt->stat.connect_time,
+ xprt->stat.connect_time / HZ,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e146caa..9bbc395 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2563,7 +2563,7 @@ static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
"%llu %llu %lu %llu %llu\n",
xprt->stat.bind_count,
xprt->stat.connect_count,
- xprt->stat.connect_time,
+ xprt->stat.connect_time / HZ,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
@@ -2618,7 +2618,7 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
transport->srcport,
xprt->stat.bind_count,
xprt->stat.connect_count,
- xprt->stat.connect_time,
+ xprt->stat.connect_time / HZ,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,

2018-09-10 20:04:12

by Chuck Lever III

Subject: [PATCH v1 08/22] sunrpc: Fix connect metrics

For TCP, the logic in xprt_connect_status is currently never invoked
to record a successful connection. Commit 2a4919919a97 ("SUNRPC:
Return EAGAIN instead of ENOTCONN when waking up xprt->pending")
changed the way TCP xprts are awoken after a connect succeeds.

Instead, change connection-oriented transports to bump connect_count
and compute connect_time the moment that XPRT_CONNECTED is set.
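
Note that this also sharpens what connect_count means: it now counts
established connections rather than connection attempts, since the
increment no longer happens just before kernel_connect().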

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprt.c | 10 +++-------
net/sunrpc/xprtrdma/transport.c | 6 +++++-
net/sunrpc/xprtsock.c | 10 ++++++----
3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index a8db2e3f..f03ffa2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -791,15 +791,11 @@ static void xprt_connect_status(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

- if (task->tk_status == 0) {
- xprt->stat.connect_count++;
- xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
+ switch (task->tk_status) {
+ case 0:
dprintk("RPC: %5u xprt_connect_status: connection established\n",
task->tk_pid);
- return;
- }
-
- switch (task->tk_status) {
+ break;
case -ECONNREFUSED:
case -ECONNRESET:
case -ECONNABORTED:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 3ae73e6..087acfc 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -242,8 +242,12 @@

spin_lock_bh(&xprt->transport_lock);
if (ep->rep_connected > 0) {
- if (!xprt_test_and_set_connected(xprt))
+ if (!xprt_test_and_set_connected(xprt)) {
+ xprt->stat.connect_count++;
+ xprt->stat.connect_time += (long)jiffies -
+ xprt->stat.connect_start;
xprt_wake_pending_tasks(xprt, 0);
+ }
} else {
if (xprt_test_and_clear_connected(xprt))
xprt_wake_pending_tasks(xprt, -ENOTCONN);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 6b7539c..e146caa 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1611,6 +1611,9 @@ static void xs_tcp_state_change(struct sock *sk)
clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
xprt_clear_connecting(xprt);

+ xprt->stat.connect_count++;
+ xprt->stat.connect_time += (long)jiffies -
+ xprt->stat.connect_start;
xprt_wake_pending_tasks(xprt, -EAGAIN);
}
spin_unlock(&xprt->transport_lock);
@@ -2029,8 +2032,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
}

/* Tell the socket layer to start connecting... */
- xprt->stat.connect_count++;
- xprt->stat.connect_start = jiffies;
return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}

@@ -2062,6 +2063,9 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
case 0:
dprintk("RPC: xprt %p connected to %s\n",
xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
+ xprt->stat.connect_count++;
+ xprt->stat.connect_time += (long)jiffies -
+ xprt->stat.connect_start;
xprt_set_connected(xprt);
case -ENOBUFS:
break;
@@ -2387,8 +2391,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
xs_set_memalloc(xprt);

/* Tell the socket layer to start connecting... */
- xprt->stat.connect_count++;
- xprt->stat.connect_start = jiffies;
set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
switch (ret) {

2018-09-10 20:04:32

by Chuck Lever III

Subject: [PATCH v1 11/22] xprtrdma: Conventional variable names in rpcrdma_conn_upcall

Clean up: The convention throughout other parts of xprtrdma is to
name variables of type struct rpcrdma_xprt "r_xprt", not "xprt".
This convention enables the use of the name "xprt" for a "struct
rpc_xprt" type variable, as in other parts of the RPC client.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 23 ++++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7bf0e65..f2c8c3c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -230,14 +230,15 @@
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
- struct rpcrdma_xprt *xprt = id->context;
- struct rpcrdma_ia *ia = &xprt->rx_ia;
- struct rpcrdma_ep *ep = &xprt->rx_ep;
+ struct rpcrdma_xprt *r_xprt = id->context;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
int connstate = 0;

might_sleep();

- trace_xprtrdma_cm_event(xprt, event);
+ trace_xprtrdma_cm_event(r_xprt, event);
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -256,11 +257,11 @@
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
pr_info("rpcrdma: removing device %s for %s:%s\n",
ia->ri_device->name,
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
ep->rep_connected = -ENODEV;
- xprt_force_disconnect(&xprt->rx_xprt);
+ xprt_force_disconnect(xprt);
wait_for_completion(&ia->ri_remove_done);

ia->ri_id = NULL;
@@ -268,9 +269,9 @@
/* Return 1 to ensure the core destroys the id. */
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
- ++xprt->rx_xprt.connect_cookie;
+ ++xprt->connect_cookie;
connstate = 1;
- rpcrdma_update_connect_private(xprt, &event->param.conn);
+ rpcrdma_update_connect_private(r_xprt, &event->param.conn);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
connstate = -ENOTCONN;
@@ -280,14 +281,14 @@
goto connected;
case RDMA_CM_EVENT_REJECTED:
dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
rdma_reject_msg(id, event->status));
connstate = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
connstate = -EAGAIN;
goto connected;
case RDMA_CM_EVENT_DISCONNECTED:
- ++xprt->rx_xprt.connect_cookie;
+ ++xprt->connect_cookie;
connstate = -ECONNABORTED;
connected:
ep->rep_connected = connstate;
@@ -297,7 +298,7 @@
default:
dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n",
__func__,
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
ia->ri_device->name, ia->ri_ops->ro_displayname,
ep, rdma_event_msg(event->event));
break;

2018-09-10 20:04:44

by Chuck Lever III

Subject: [PATCH v1 14/22] xprtrdma: Simplify RPC wake-ups on connect

Currently, when a connection is established, rpcrdma_conn_upcall
invokes rpcrdma_conn_func and then
wake_up_all(&ep->rep_connect_wait). The former wakes waiting RPCs,
but the connect worker is not done yet, and that leads to races,
double wakes, and difficulty understanding how this logic is
supposed to work.

Instead, collect all the "connection established" logic in the
connect worker (xprt_rdma_connect_worker). A disconnect worker is
retained to handle provider upcalls safely.
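
One detail to note in the diff below: on a successful connect,
pending RPCs are now woken with -EAGAIN rather than 0, so each task
retries its operation from the start on the fresh connection instead
of assuming its previous transport state is still valid.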

Fixes: 254f91e2fa1f ("xprtrdma: RPC/RDMA must invoke ... ")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 54 +++++++++++++--------------------------
net/sunrpc/xprtrdma/verbs.c | 42 ++++++++++++++++++++++++------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 +--
3 files changed, 52 insertions(+), 48 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 289d13c..d7c4255 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -225,51 +225,35 @@
}
}

-void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
-{
- schedule_delayed_work(&ep->rep_connect_worker, 0);
-}
-
-void
-rpcrdma_connect_worker(struct work_struct *work)
+/**
+ * xprt_rdma_connect_worker - establish connection in the background
+ * @work: worker thread context
+ *
+ * Requester holds the xprt's send lock to prevent activity on this
+ * transport while a fresh connection is being established. RPC tasks
+ * sleep on the xprt's pending queue waiting for connect to complete.
+ */
+static void
+xprt_rdma_connect_worker(struct work_struct *work)
{
- struct rpcrdma_ep *ep =
- container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
- struct rpcrdma_xprt *r_xprt =
- container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+ rx_connect_worker.work);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ int rc;

- spin_lock_bh(&xprt->transport_lock);
- if (ep->rep_connected > 0) {
+ rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ xprt_clear_connecting(xprt);
+ if (r_xprt->rx_ep.rep_connected > 0) {
if (!xprt_test_and_set_connected(xprt)) {
xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start;
- xprt_wake_pending_tasks(xprt, 0);
+ xprt_wake_pending_tasks(xprt, -EAGAIN);
}
} else {
if (xprt_test_and_clear_connected(xprt))
- xprt_wake_pending_tasks(xprt, -ENOTCONN);
+ xprt_wake_pending_tasks(xprt, rc);
}
- spin_unlock_bh(&xprt->transport_lock);
-}
-
-static void
-xprt_rdma_connect_worker(struct work_struct *work)
-{
- struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
- rx_connect_worker.work);
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- int rc = 0;
-
- xprt_clear_connected(xprt);
-
- rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
- if (rc)
- xprt_wake_pending_tasks(xprt, rc);
-
- xprt_clear_connecting(xprt);
}

static void
@@ -302,8 +286,6 @@

cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

- xprt_clear_connected(xprt);
-
rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
rpcrdma_buffer_destroy(&r_xprt->rx_buf);
rpcrdma_ia_close(&r_xprt->rx_ia);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c60172f..abbd3cd 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -108,6 +108,25 @@
}
}

+/**
+ * rpcrdma_disconnect_worker - Force a disconnect
+ * @work: endpoint to be disconnected
+ *
+ * Provider callbacks can possibly run in an IRQ context. This function
+ * is invoked in a worker thread to guarantee that disconnect wake-up
+ * calls are always done in process context.
+ */
+static void
+rpcrdma_disconnect_worker(struct work_struct *work)
+{
+ struct rpcrdma_ep *ep = container_of(work, struct rpcrdma_ep,
+ rep_disconnect_worker.work);
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ep, struct rpcrdma_xprt, rx_ep);
+
+ xprt_force_disconnect(&r_xprt->rx_xprt);
+}
+
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
@@ -121,7 +140,7 @@

if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
+ schedule_delayed_work(&ep->rep_disconnect_worker, 0);
wake_up_all(&ep->rep_connect_wait);
}
}
@@ -271,13 +290,14 @@
++xprt->connect_cookie;
ep->rep_connected = 1;
rpcrdma_update_connect_private(r_xprt, &event->param.conn);
- goto connected;
+ wake_up_all(&ep->rep_connect_wait);
+ break;
case RDMA_CM_EVENT_CONNECT_ERROR:
ep->rep_connected = -ENOTCONN;
- goto connected;
+ goto disconnected;
case RDMA_CM_EVENT_UNREACHABLE:
ep->rep_connected = -ENETUNREACH;
- goto connected;
+ goto disconnected;
case RDMA_CM_EVENT_REJECTED:
dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
@@ -285,12 +305,12 @@
ep->rep_connected = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
ep->rep_connected = -EAGAIN;
- goto connected;
+ goto disconnected;
case RDMA_CM_EVENT_DISCONNECTED:
++xprt->connect_cookie;
ep->rep_connected = -ECONNABORTED;
-connected:
- rpcrdma_conn_func(ep);
+disconnected:
+ xprt_force_disconnect(xprt);
wake_up_all(&ep->rep_connect_wait);
break;
default:
@@ -550,7 +570,8 @@
cdata->max_requests >> 2);
ep->rep_send_count = ep->rep_send_batch;
init_waitqueue_head(&ep->rep_connect_wait);
- INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
+ INIT_DELAYED_WORK(&ep->rep_disconnect_worker,
+ rpcrdma_disconnect_worker);

sendcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_send_wr + 1,
@@ -623,7 +644,7 @@
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
- cancel_delayed_work_sync(&ep->rep_connect_worker);
+ cancel_delayed_work_sync(&ep->rep_disconnect_worker);

if (ia->ri_id && ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
@@ -736,6 +757,7 @@
{
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
int rc;

retry:
@@ -762,6 +784,8 @@
}

ep->rep_connected = 0;
+ xprt_clear_connected(xprt);
+
rpcrdma_post_recvs(r_xprt, true);

rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5e19bb59..9413a20 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -101,7 +101,7 @@ struct rpcrdma_ep {
wait_queue_head_t rep_connect_wait;
struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
- struct delayed_work rep_connect_worker;
+ struct delayed_work rep_disconnect_worker;
};

/* Pre-allocate extra Work Requests for handling backward receives
@@ -556,7 +556,6 @@ int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
struct rpcrdma_create_data_internal *);
void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
-void rpcrdma_conn_func(struct rpcrdma_ep *ep);
void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);

int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
@@ -655,7 +654,6 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
extern unsigned int xprt_rdma_max_inline_read;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
-void rpcrdma_connect_worker(struct work_struct *work);
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);

2018-09-10 20:04:54

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 16/22] xprtrdma: Remove memory address of "ep" from an error message

Clean up: Replace the hashed memory address of the target rpcrdma_ep
with the server's IP address and port. The server address is more
useful in an administrative error message.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 72bf811..6ac6183 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -143,8 +143,9 @@
rx_ep);

trace_xprtrdma_qp_event(r_xprt, event);
- pr_err("rpcrdma: %s on device %s ep %p\n",
- ib_event_msg(event->event), event->device->name, context);
+ pr_err("rpcrdma: %s on device %s connected to %s:%s\n",
+ ib_event_msg(event->event), event->device->name,
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));

if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;

2018-09-10 20:04:22

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 10/22] xprtrdma: Rename rpcrdma_conn_upcall

Clean up: Use a function name that is consistent with the RDMA core
API and with other consumers. Because this function is invoked from
outside the rpcrdma.ko module, add an appropriate documenting comment.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 2 +-
net/sunrpc/xprtrdma/verbs.c | 16 +++++++++++++---
2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index cd3e5e7..d3d05fd 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -306,7 +306,7 @@
** Connection events
**/

-TRACE_EVENT(xprtrdma_conn_upcall,
+TRACE_EVENT(xprtrdma_cm_event,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
struct rdma_cm_event *event
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3a9a62d..7bf0e65 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -219,15 +219,25 @@
rpcrdma_set_max_header_sizes(r_xprt);
}

+/**
+ * rpcrdma_cm_event_handler - Handle RDMA CM events
+ * @id: rdma_cm_id on which an event has occurred
+ * @event: details of the event
+ *
+ * Called with @id's mutex held. Returns 1 if caller should
+ * destroy @id, otherwise 0.
+ */
static int
-rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
struct rpcrdma_xprt *xprt = id->context;
struct rpcrdma_ia *ia = &xprt->rx_ia;
struct rpcrdma_ep *ep = &xprt->rx_ep;
int connstate = 0;

- trace_xprtrdma_conn_upcall(xprt, event);
+ might_sleep();
+
+ trace_xprtrdma_cm_event(xprt, event);
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -308,7 +318,7 @@
init_completion(&ia->ri_done);
init_completion(&ia->ri_remove_done);

- id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
+ id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
xprt, RDMA_PS_TCP, IB_QPT_RC);
if (IS_ERR(id)) {
rc = PTR_ERR(id);

2018-09-10 20:04:38

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 13/22] xprtrdma: Re-organize the switch() in rpcrdma_conn_upcall

Clean up: Eliminate the FALLTHROUGH into the default arm to make the
switch easier to read.

Also, as long as I'm here, do not display the memory address of the
target rpcrdma_ep. A hashed memory address is of marginal use here.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 422d3db..c60172f 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -243,15 +243,15 @@
case RDMA_CM_EVENT_ROUTE_RESOLVED:
ia->ri_async_rc = 0;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_ADDR_ERROR:
ia->ri_async_rc = -EPROTO;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_ROUTE_ERROR:
ia->ri_async_rc = -ENETUNREACH;
complete(&ia->ri_done);
- break;
+ return 0;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
pr_info("rpcrdma: removing device %s for %s:%s\n",
@@ -292,16 +292,15 @@
connected:
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
- /*FALLTHROUGH*/
+ break;
default:
- dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n",
- __func__,
- rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
- ia->ri_device->name, ia->ri_ops->ro_displayname,
- ep, rdma_event_msg(event->event));
break;
}

+ dprintk("RPC: %s: %s:%s on %s/%s: %s\n", __func__,
+ rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
+ ia->ri_device->name, ia->ri_ops->ro_displayname,
+ rdma_event_msg(event->event));
return 0;
}


2018-09-10 20:05:10

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 19/22] xprtrdma: Report when there were zero posted Receives

To show that a caller did attempt to allocate and post more Receive
buffers, the trace point in rpcrdma_post_recvs() should fire even when
no new Receive buffers were posted.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 74f6de3..0c56fe5 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1521,9 +1521,11 @@ struct rpcrdma_regbuf *
struct ib_recv_wr *wr, *bad_wr;
int needed, count, rc;

+ rc = 0;
+ count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
if (buf->rb_posted_receives > needed)
- return;
+ goto out;
needed -= buf->rb_posted_receives;

count = 0;
@@ -1559,7 +1561,7 @@ struct rpcrdma_regbuf *
--needed;
}
if (!count)
- return;
+ goto out;

rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
@@ -1573,5 +1575,6 @@ struct rpcrdma_regbuf *
}
}
buf->rb_posted_receives += count;
+out:
trace_xprtrdma_post_recvs(r_xprt, count, rc);
}

2018-09-10 20:05:05

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 18/22] xprtrdma: Move rb_flags initialization

Clean up: rb_flags can be used for other things besides
RPCRDMA_BUF_F_EMPTY_SCQ, so initialize it in a generic spot
instead of in a send-completion-queue-related helper.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 6ac6183..74f6de3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -918,7 +918,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
- buf->rb_flags = 0;

return 0;

@@ -1146,6 +1145,7 @@ struct rpcrdma_req *
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;

+ buf->rb_flags = 0;
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);

2018-09-10 20:04:49

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 15/22] xprtrdma: Rename rpcrdma_qp_async_error_upcall

Clean up: Use a function name that is consistent with the RDMA core
API and with other consumers. Because this function is invoked from
outside the rpcrdma.ko module, add an appropriate documenting comment.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 2 +-
net/sunrpc/xprtrdma/verbs.c | 14 +++++++++++---
2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index d3d05fd..d8ddcac 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -377,7 +377,7 @@
DEFINE_RXPRT_EVENT(xprtrdma_reconnect);
DEFINE_RXPRT_EVENT(xprtrdma_inject_dsc);

-TRACE_EVENT(xprtrdma_qp_error,
+TRACE_EVENT(xprtrdma_qp_event,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
const struct ib_event *event
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index abbd3cd..72bf811 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -127,14 +127,22 @@
xprt_force_disconnect(&r_xprt->rx_xprt);
}

+/**
+ * rpcrdma_qp_event_handler - Handle one QP event (error notification)
+ * @event: details of the event
+ * @context: ep that owns QP where event occurred
+ *
+ * Called from RDMA provider (device driver) possibly in a non-blocking
+ * context.
+ */
static void
-rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
struct rpcrdma_ep *ep = context;
struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
rx_ep);

- trace_xprtrdma_qp_error(r_xprt, event);
+ trace_xprtrdma_qp_event(r_xprt, event);
pr_err("rpcrdma: %s on device %s ep %p\n",
ib_event_msg(event->event), event->device->name, context);

@@ -547,7 +555,7 @@
if (rc)
return rc;

- ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+ ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_sge = max_sge;

2018-09-10 20:05:20

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 21/22] xprtrdma: Clean up xprt_rdma_disconnect_inject

Clean up: Use the appropriate C macro instead of open-coding
container_of().
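
For reference, the macro in xprt_rdma.h is essentially this same
container_of() expression (modulo formatting):

	#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)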

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 67eb900..afc8fad 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -266,8 +266,7 @@
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
- rx_xprt);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

trace_xprtrdma_inject_dsc(r_xprt);
rdma_disconnect(r_xprt->rx_ia.ri_id);

2018-09-10 20:04:34

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 12/22] xprtrdma: Eliminate "connstate" variable from rpcrdma_conn_upcall()

Clean up.

Since commit 173b8f49b3af ("xprtrdma: Demote "connect" log messages")
there has been no need to initialize connstate to zero. In fact, in
this code path there's now no reason not to set rep_connected
directly.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f2c8c3c..422d3db 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -234,7 +234,6 @@
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- int connstate = 0;

might_sleep();

@@ -270,28 +269,27 @@
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
++xprt->connect_cookie;
- connstate = 1;
+ ep->rep_connected = 1;
rpcrdma_update_connect_private(r_xprt, &event->param.conn);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
- connstate = -ENOTCONN;
+ ep->rep_connected = -ENOTCONN;
goto connected;
case RDMA_CM_EVENT_UNREACHABLE:
- connstate = -ENETUNREACH;
+ ep->rep_connected = -ENETUNREACH;
goto connected;
case RDMA_CM_EVENT_REJECTED:
dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
rdma_reject_msg(id, event->status));
- connstate = -ECONNREFUSED;
+ ep->rep_connected = -ECONNREFUSED;
if (event->status == IB_CM_REJ_STALE_CONN)
- connstate = -EAGAIN;
+ ep->rep_connected = -EAGAIN;
goto connected;
case RDMA_CM_EVENT_DISCONNECTED:
++xprt->connect_cookie;
- connstate = -ECONNABORTED;
+ ep->rep_connected = -ECONNABORTED;
connected:
- ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
/*FALLTHROUGH*/

2018-09-10 20:05:00

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 17/22] svcrdma: Don't disable BH's in backchannel

Clean up: This code was copied from xprtsock.c and
backchannel_rqst.c. For rpcrdma, the backchannel runs exclusively in
process context, so disabling bottom-halves is unnecessary.
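
The _bh variants matter only when the same lock can also be taken from
softirq context, where a plain spin_lock() holder can deadlock against
the interrupting softirq (generic illustration, not rpcrdma code):

	spin_lock(&lock);		/* process context takes the lock */
		/* softirq fires on this CPU and takes the same lock: */
		spin_lock(&lock);	/* spins forever; the owner cannot run */

No rpcrdma backchannel path takes bc_pa_lock outside process context,
so the plain variants are safe here.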

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 90adeff..675b530 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -54,9 +54,9 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
INIT_LIST_HEAD(&rqst->rq_list);
INIT_LIST_HEAD(&rqst->rq_bc_list);
__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);

size = r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
@@ -228,16 +228,16 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpc_rqst *rqst, *tmp;

- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
list_del(&rqst->rq_bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);

rpcrdma_bc_free_rqst(r_xprt, rqst);

- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
}
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);
}

/**
@@ -255,9 +255,9 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
rpcrdma_recv_buffer_put(req->rl_reply);
req->rl_reply = NULL;

- spin_lock_bh(&xprt->bc_pa_lock);
+ spin_lock(&xprt->bc_pa_lock);
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock_bh(&xprt->bc_pa_lock);
+ spin_unlock(&xprt->bc_pa_lock);
}

/**

2018-09-10 20:05:15

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 20/22] xprtrdma: Add documenting comments

Clean up: Fill in or update documenting comments for transport
switch entry points.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 36 +++++++++++++++++++++++-------------
1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index d7c4255..67eb900 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -256,6 +256,13 @@
}
}

+/**
+ * xprt_rdma_inject_disconnect - inject a connection fault
+ * @xprt: transport context
+ *
+ * If @xprt is connected, disconnect it to simulate spurious connection
+ * loss.
+ */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
@@ -266,16 +273,12 @@
rdma_disconnect(r_xprt->rx_ia.ri_id);
}

-/*
- * xprt_rdma_destroy
+/**
+ * xprt_rdma_destroy - Full tear down of transport
+ * @xprt: doomed transport context
*
- * Destroy the xprt.
- * Free all memory associated with the object, including its own.
- * NOTE: none of the *destroy methods free memory for their top-level
- * objects, even though they may have allocated it (they do free
- * private memory). It's up to the caller to handle it. In this
- * case (RDMA transport), all structure memory is inlined with the
- * struct rpcrdma_xprt.
+ * Caller guarantees there will be no more calls to us with
+ * this @xprt.
*/
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
@@ -428,11 +431,12 @@
}

/**
- * xprt_rdma_close - Close down RDMA connection
- * @xprt: generic transport to be closed
+ * xprt_rdma_close - close a transport connection
+ * @xprt: transport context
*
- * Called during transport shutdown reconnect, or device
- * removal. Caller holds the transport's write lock.
+ * Called during transport shutdown, reconnect, or device removal.
+ * Caller holds @xprt's send lock to prevent activity on this
+ * transport while the connection is torn down.
*/
static void
xprt_rdma_close(struct rpc_xprt *xprt)
@@ -511,6 +515,12 @@
xprt_force_disconnect(xprt);
}

+/**
+ * xprt_rdma_connect - try to establish a transport connection
+ * @xprt: transport state
+ * @task: RPC scheduler context
+ *
+ */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{

2018-09-10 20:05:26

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 22/22] xprtrdma: Squelch a sparse warning

linux/include/trace/events/rpcrdma.h:501:1: warning: expression using sizeof bool
linux/include/trace/events/rpcrdma.h:501:1: warning: odd constant _Bool cast (ffffffffffffffff becomes 1)

The generated TRACE_EVENT code applies sizeof() and casts to each
declared field type, and sparse complains when that type is _Bool.
Record the field as an int instead.

Fixes: ab03eff58eb5 ("xprtrdma: Add trace points in RPC Call ... ")
Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index d8ddcac..3f1fc5f 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -511,7 +511,7 @@
TP_STRUCT__entry(
__field(const void *, req)
__field(int, num_sge)
- __field(bool, signaled)
+ __field(int, signaled)
__field(int, status)
),


2018-09-12 23:47:16

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v1 08/22] sunrpc: Fix connect metrics

Hi Chuck,

On Mon, 2018-09-10 at 11:09 -0400, Chuck Lever wrote:
> For TCP, the logic in xprt_connect_status is currently never invoked
> to record a successful connection. Commit 2a4919919a97 ("SUNRPC:
> Return EAGAIN instead of ENOTCONN when waking up xprt->pending")
> changed the way TCP xprt's are awoken after a connect succeeds.
>
> Instead, change connection-oriented transports to bump connect_count
> and compute connect_time the moment that XPRT_CONNECTED is set.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprt.c | 10 +++-------
> net/sunrpc/xprtrdma/transport.c | 6 +++++-
> net/sunrpc/xprtsock.c | 10 ++++++----
> 3 files changed, 14 insertions(+), 12 deletions(-)
>
> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> index a8db2e3f..f03ffa2 100644
> --- a/net/sunrpc/xprt.c
> +++ b/net/sunrpc/xprt.c
> @@ -791,15 +791,11 @@ static void xprt_connect_status(struct rpc_task *task)
> {
> struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

Looks like the only remaining user of the xprt is in a dprintk() towards the
bottom of this function to get the servername. This is giving me an unused
variable warning when CONFIG_SUNRPC_DEBUG=n, so I'm wondering if you can take
out the variable and just access the server name the long way
(task->tk_rqstp->rq_xprt->servername)?
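
Something like this (untested):

	dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
			"server %s\n", task->tk_pid, -task->tk_status,
			task->tk_rqstp->rq_xprt->servername);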

Thanks,
Anna

>
> - if (task->tk_status == 0) {
> - xprt->stat.connect_count++;
> - xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
> + switch (task->tk_status) {
> + case 0:
> dprintk("RPC: %5u xprt_connect_status: connection
> established\n",
> task->tk_pid);
> - return;
> - }
> -
> - switch (task->tk_status) {
> + break;
> case -ECONNREFUSED:
> case -ECONNRESET:
> case -ECONNABORTED:
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 3ae73e6..087acfc 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -242,8 +242,12 @@
>
> spin_lock_bh(&xprt->transport_lock);
> if (ep->rep_connected > 0) {
> - if (!xprt_test_and_set_connected(xprt))
> + if (!xprt_test_and_set_connected(xprt)) {
> + xprt->stat.connect_count++;
> + xprt->stat.connect_time += (long)jiffies -
> + xprt->stat.connect_start;
> xprt_wake_pending_tasks(xprt, 0);
> + }
> } else {
> if (xprt_test_and_clear_connected(xprt))
> xprt_wake_pending_tasks(xprt, -ENOTCONN);
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 6b7539c..e146caa 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -1611,6 +1611,9 @@ static void xs_tcp_state_change(struct sock *sk)
> clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
> xprt_clear_connecting(xprt);
>
> + xprt->stat.connect_count++;
> + xprt->stat.connect_time += (long)jiffies -
> + xprt->stat.connect_start;
> xprt_wake_pending_tasks(xprt, -EAGAIN);
> }
> spin_unlock(&xprt->transport_lock);
> @@ -2029,8 +2032,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
> }
>
> /* Tell the socket layer to start connecting... */
> - xprt->stat.connect_count++;
> - xprt->stat.connect_start = jiffies;
> return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
> }
>
> @@ -2062,6 +2063,9 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
> case 0:
> dprintk("RPC: xprt %p connected to %s\n",
> xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
> + xprt->stat.connect_count++;
> + xprt->stat.connect_time += (long)jiffies -
> + xprt->stat.connect_start;
> xprt_set_connected(xprt);
> case -ENOBUFS:
> break;
> @@ -2387,8 +2391,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
> xs_set_memalloc(xprt);
>
> /* Tell the socket layer to start connecting... */
> - xprt->stat.connect_count++;
> - xprt->stat.connect_start = jiffies;
> set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
> ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
> switch (ret) {
>