2019-08-19 22:36:39

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 00/21] NFS/RDMA client-side for-5.4

Hi Anna-

These feel like they are ready for you to merge.

Changes since v1:
- Rebased on v5.3-rc5
- Improved "Boost maximum transport header size"
- Minor clarifications

---

Chuck Lever (21):
SUNRPC: Remove rpc_wake_up_queued_task_on_wq()
SUNRPC: Inline xdr_commit_encode
xprtrdma: Refresh the documenting comment in frwr_ops.c
xprtrdma: Update obsolete comment
xprtrdma: Fix calculation of ri_max_segs again
xprtrdma: Boost maximum transport header size
xprtrdma: Boost client's max slot table size to match Linux server
xprtrdma: Rename CQE field in Receive trace points
xprtrdma: Rename rpcrdma_buffer::rb_all
xprtrdma: Toggle XPRT_CONGESTED in xprtrdma's slot methods
xprtrdma: Simplify rpcrdma_mr_pop
xprtrdma: Combine rpcrdma_mr_put and rpcrdma_mr_unmap_and_put
xprtrdma: Move rpcrdma_mr_get out of frwr_map
xprtrdma: Ensure creating an MR does not trigger FS writeback
xprtrdma: Cache free MRs in each rpcrdma_req
xprtrdma: Remove rpcrdma_buffer::rb_mrlock
xprtrdma: Use an llist to manage free rpcrdma_reps
xprtrdma: Clean up xprt_rdma_set_connect_timeout()
xprtrdma: Fix bc_max_slots return value
xprtrdma: Inline XDR chunk encoder functions
xprtrdma: Optimize rpcrdma_post_recvs()


include/linux/sunrpc/sched.h | 3
include/linux/sunrpc/xprtrdma.h | 4 -
include/trace/events/rpcrdma.h | 88 ++++++++++++--
net/sunrpc/sched.c | 27 +---
net/sunrpc/xdr.c | 2
net/sunrpc/xprtrdma/backchannel.c | 4 -
net/sunrpc/xprtrdma/frwr_ops.c | 131 +++++++--------------
net/sunrpc/xprtrdma/rpc_rdma.c | 63 +++++++---
net/sunrpc/xprtrdma/transport.c | 12 +-
net/sunrpc/xprtrdma/verbs.c | 235 ++++++++++++++++---------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 58 ++++-----
11 files changed, 305 insertions(+), 322 deletions(-)

--
Chuck Lever


2019-08-19 22:37:59

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 02/21] SUNRPC: Inline xdr_commit_encode

Micro-optimization: For xdr_commit_encode call sites in
net/sunrpc/xdr.c, eliminate the extra calling sequence. On my
client, this change saves about a microsecond for every 30 calls
to xdr_reserve_space().

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xdr.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 48c93b9e525e..7ba0ede6b417 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -560,7 +560,7 @@ EXPORT_SYMBOL_GPL(xdr_init_encode);
* required at the end of encoding, or any other time when the xdr_buf
* data might be read.
*/
-void xdr_commit_encode(struct xdr_stream *xdr)
+inline void xdr_commit_encode(struct xdr_stream *xdr)
{
int shift = xdr->scratch.iov_len;
void *page;

2019-08-19 22:38:51

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 01/21] SUNRPC: Remove rpc_wake_up_queued_task_on_wq()

Clean up: commit c544577daddb ("SUNRPC: Clean up transport write
space handling") appears to have removed the last caller of
rpc_wake_up_queued_task_on_wq().

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 3 ---
net/sunrpc/sched.c | 27 ++++-----------------------
2 files changed, 4 insertions(+), 26 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index baa3ecdb882f..d1283bddd218 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -243,9 +243,6 @@ void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *queue,
void rpc_sleep_on_priority(struct rpc_wait_queue *,
struct rpc_task *,
int priority);
-void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
- struct rpc_wait_queue *queue,
- struct rpc_task *task);
void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *);
void rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 1f275aba786f..f25c4b9ba185 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -541,33 +541,14 @@ rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
return NULL;
}

-static void
-rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
- struct rpc_wait_queue *queue, struct rpc_task *task)
-{
- rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
-}
-
/*
* Wake up a queued task while the queue lock is being held
*/
-static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
+ struct rpc_task *task)
{
- rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
-}
-
-/*
- * Wake up a task on a specific queue
- */
-void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
- struct rpc_wait_queue *queue,
- struct rpc_task *task)
-{
- if (!RPC_IS_QUEUED(task))
- return;
- spin_lock(&queue->lock);
- rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
- spin_unlock(&queue->lock);
+ rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
+ task, NULL, NULL);
}

/*

2019-08-19 22:39:18

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 04/21] xprtrdma: Update obsolete comment

Comment was made obsolete by commit 8cec3dba76a4 ("xprtrdma:
rpcrdma_regbuf alignment").

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/xprt_rdma.h | 3 ---
1 file changed, 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 92ce09fcea74..3b2f2041e889 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -117,9 +117,6 @@ struct rpcrdma_ep {
#endif

/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
- *
- * The below structure appears at the front of a large region of kmalloc'd
- * memory, which always starts on a good alignment boundary.
*/

struct rpcrdma_regbuf {

2019-08-19 22:40:05

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 03/21] xprtrdma: Refresh the documenting comment in frwr_ops.c

Things have changed since this comment was written. In particular,
the reworking of connection closing, on-demand creation of MRs, and
the removal of fr_state all mean that deferring MR recovery to
frwr_map is no longer needed. The description is obsolete.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 66 +++++++++++-----------------------------
1 file changed, 18 insertions(+), 48 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 0b6dad7580a1..a30f2ae49578 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -7,67 +7,37 @@
/* Lightweight memory registration using Fast Registration Work
* Requests (FRWR).
*
- * FRWR features ordered asynchronous registration and deregistration
- * of arbitrarily sized memory regions. This is the fastest and safest
+ * FRWR features ordered asynchronous registration and invalidation
+ * of arbitrarily-sized memory regions. This is the fastest and safest
* but most complex memory registration mode.
*/

/* Normal operation
*
- * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
+ * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
* Work Request (frwr_map). When the RDMA operation is finished, this
* Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_unmap_sync).
+ * (frwr_unmap_async and frwr_unmap_sync).
*
- * Typically these Work Requests are not signaled, and neither are RDMA
- * SEND Work Requests (with the exception of signaling occasionally to
- * prevent provider work queue overflows). This greatly reduces HCA
+ * Typically FAST_REG Work Requests are not signaled, and neither are
+ * RDMA Send Work Requests (with the exception of signaling occasionally
+ * to prevent provider work queue overflows). This greatly reduces HCA
* interrupt workload.
- *
- * As an optimization, frwr_unmap marks MRs INVALID before the
- * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
- * rb_mrs immediately so that no work (like managing a linked list
- * under a spinlock) is needed in the completion upcall.
- *
- * But this means that frwr_map() can occasionally encounter an MR
- * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
- * ordering prevents a subsequent FAST_REG WR from executing against
- * that MR while it is still being invalidated.
*/

/* Transport recovery
*
- * ->op_map and the transport connect worker cannot run at the same
- * time, but ->op_unmap can fire while the transport connect worker
- * is running. Thus MR recovery is handled in ->op_map, to guarantee
- * that recovered MRs are owned by a sending RPC, and not one where
- * ->op_unmap could fire at the same time transport reconnect is
- * being done.
- *
- * When the underlying transport disconnects, MRs are left in one of
- * four states:
- *
- * INVALID: The MR was not in use before the QP entered ERROR state.
- *
- * VALID: The MR was registered before the QP entered ERROR state.
- *
- * FLUSHED_FR: The MR was being registered when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * When frwr_map encounters FLUSHED and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Because MR recovery
- * allocates fresh resources, it is deferred to a workqueue, and the
- * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_map allocates another MR for the current RPC while
- * the broken MR is reset.
- *
- * To ensure that frwr_map doesn't encounter an MR that is marked
- * INVALID but that is about to be flushed due to a previous transport
- * disconnect, the transport connect worker attempts to drain all
- * pending send queue WRs before the transport is reconnected.
+ * frwr_map and frwr_unmap_* cannot run at the same time the transport
+ * connect worker is running. The connect worker holds the transport
+ * send lock, just as ->send_request does. This prevents frwr_map and
+ * the connect worker from running concurrently. When a connection is
+ * closed, the Receive completion queue is drained before the allowing
+ * the connect worker to get control. This prevents frwr_unmap and the
+ * connect worker from running concurrently.
+ *
+ * When the underlying transport disconnects, MRs that are in flight
+ * are flushed and are likely unusable. Thus all flushed MRs are
+ * destroyed. New MRs are created on demand.
*/

#include <linux/sunrpc/rpc_rdma.h>

2019-08-19 22:40:11

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 05/21] xprtrdma: Fix calculation of ri_max_segs again

Commit 302d3deb206 ("xprtrdma: Prevent inline overflow") added this
calculation back in 2016, but got it wrong. I tested only the lower
bound, which is why there is a max_t there. The upper bound should be
rounded up too.

Now, when using DIV_ROUND_UP, that takes care of the lower bound as
well.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a30f2ae49578..3a10bfff2125 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -260,8 +260,8 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frwr_depth);
+ ia->ri_max_segs =
+ DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
/* Reply chunks require segments for head and tail buffers */
ia->ri_max_segs += 2;
if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)

2019-08-19 22:40:54

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 06/21] xprtrdma: Boost maximum transport header size

Although I haven't seen any performance results that justify it,
I've received several complaints that NFS/RDMA no longer supports
a maximum rsize and wsize of 1MB. These days it is somewhat smaller.

To simplify the logic that determines whether a chunk list is
necessary, the implementation uses a fixed maximum size of the
transport header. Currently that maximum size is 256 bytes, one
quarter of the default inline threshold size for RPC/RDMA v1.

Since commit a78868497c2e ("xprtrdma: Reduce max_frwr_depth"), the
size of chunks is also smaller to take advantage of inline page
lists in device internal MR data structures.

The combination of these two design choices has reduced the maximum
NFS rsize and wsize that can be used for most RNIC/HCAs. Increasing
the maximum transport header size and the maximum number of RDMA
segments it can contain increases the negotiated maximum rsize/wsize
on common RNIC/HCAs.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 9 ++++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 23 ++++++++++-------------
2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 805b1f35e1ca..e639ea0faf19 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>
@@ -1000,12 +1001,18 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
+ size_t maxhdrsize;

req = kzalloc(sizeof(*req), flags);
if (req == NULL)
goto out1;

- rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
+ /* Compute maximum header buffer size in bytes */
+ maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+ r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+ maxhdrsize *= sizeof(__be32);
+ rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+ DMA_TO_DEVICE, flags);
if (!rb)
goto out2;
req->rl_rdmabuf = rb;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3b2f2041e889..eaf6b907a76e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -155,25 +155,22 @@ static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)

/* To ensure a transport can always make forward progress,
* the number of RDMA segments allowed in header chunk lists
- * is capped at 8. This prevents less-capable devices and
- * memory registrations from overrunning the Send buffer
- * while building chunk lists.
+ * is capped at 16. This prevents less-capable devices from
+ * overrunning the Send buffer while building chunk lists.
*
* Elements of the Read list take up more room than the
- * Write list or Reply chunk. 8 read segments means the Read
- * list (or Write list or Reply chunk) cannot consume more
- * than
+ * Write list or Reply chunk. 16 read segments means the
+ * chunk lists cannot consume more than
*
- * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ * ((16 + 2) * read segment size) + 1 XDR words,
*
- * And the fixed part of the header is another 24 bytes.
- *
- * The smallest inline threshold is 1024 bytes, ensuring that
- * at least 750 bytes are available for RPC messages.
+ * or about 400 bytes. The fixed part of the header is
+ * another 24 bytes. Thus when the inline threshold is
+ * 1024 bytes, at least 600 bytes are available for RPC
+ * message bodies.
*/
enum {
- RPCRDMA_MAX_HDR_SEGS = 8,
- RPCRDMA_HDRBUF_SIZE = 256,
+ RPCRDMA_MAX_HDR_SEGS = 16,
};

/*

2019-08-19 22:41:35

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 07/21] xprtrdma: Boost client's max slot table size to match Linux server

I've heard rumors of an NFS/RDMA server implementation that has a
default credit limit of 1024. The client's default setting remains
at 128.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprtrdma.h | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index 86fc38ff0355..16c239e0d6dd 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -49,9 +49,9 @@
* fully-chunked NFS message (read chunks are the largest). Note only
* a single chunk type per message is supported currently.
*/
-#define RPCRDMA_MIN_SLOT_TABLE (2U)
+#define RPCRDMA_MIN_SLOT_TABLE (4U)
#define RPCRDMA_DEF_SLOT_TABLE (128U)
-#define RPCRDMA_MAX_SLOT_TABLE (256U)
+#define RPCRDMA_MAX_SLOT_TABLE (16384U)

#define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */
#define RPCRDMA_DEF_INLINE (4096) /* default inline thresh */

2019-08-19 22:42:55

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 08/21] xprtrdma: Rename CQE field in Receive trace points

Make the field name the same for all trace points that handle
pointers to struct rpcrdma_rep. That makes it easy to grep for
matching rep points in trace output.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 21 +++++++++++----------
net/sunrpc/xprtrdma/verbs.c | 2 +-
2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index f6a4eaa85a3e..6e6055eb67e7 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -623,21 +623,21 @@ TRACE_EVENT(xprtrdma_post_send,

TRACE_EVENT(xprtrdma_post_recv,
TP_PROTO(
- const struct ib_cqe *cqe
+ const struct rpcrdma_rep *rep
),

- TP_ARGS(cqe),
+ TP_ARGS(rep),

TP_STRUCT__entry(
- __field(const void *, cqe)
+ __field(const void *, rep)
),

TP_fast_assign(
- __entry->cqe = cqe;
+ __entry->rep = rep;
),

- TP_printk("cqe=%p",
- __entry->cqe
+ TP_printk("rep=%p",
+ __entry->rep
)
);

@@ -715,14 +715,15 @@ TRACE_EVENT(xprtrdma_wc_receive,
TP_ARGS(wc),

TP_STRUCT__entry(
- __field(const void *, cqe)
+ __field(const void *, rep)
__field(u32, byte_len)
__field(unsigned int, status)
__field(u32, vendor_err)
),

TP_fast_assign(
- __entry->cqe = wc->wr_cqe;
+ __entry->rep = container_of(wc->wr_cqe, struct rpcrdma_rep,
+ rr_cqe);
__entry->status = wc->status;
if (wc->status) {
__entry->byte_len = 0;
@@ -733,8 +734,8 @@ TRACE_EVENT(xprtrdma_wc_receive,
}
),

- TP_printk("cqe=%p %u bytes: %s (%u/0x%x)",
- __entry->cqe, __entry->byte_len,
+ TP_printk("rep=%p %u bytes: %s (%u/0x%x)",
+ __entry->rep, __entry->byte_len,
rdma_show_wc_status(__entry->status),
__entry->status, __entry->vendor_err
)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e639ea0faf19..3c275a7a4e4c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1531,7 +1531,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
goto release_wrs;

- trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+ trace_xprtrdma_post_recv(rep);
++count;
}


2019-08-19 22:43:39

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 09/21] xprtrdma: Rename rpcrdma_buffer::rb_all

Clean up: There are other "all" list heads. For code clarity
distinguish this one as for use only for MRs by renaming it.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 26 ++++++++------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
2 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3c275a7a4e4c..e004873cc4f0 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -944,8 +944,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
unsigned int count;
- LIST_HEAD(free);
- LIST_HEAD(all);

for (count = 0; count < ia->ri_max_segs; count++) {
struct rpcrdma_mr *mr;
@@ -963,15 +961,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)

mr->mr_xprt = r_xprt;

- list_add(&mr->mr_list, &free);
- list_add(&mr->mr_all, &all);
+ spin_lock(&buf->rb_mrlock);
+ list_add(&mr->mr_list, &buf->rb_mrs);
+ list_add(&mr->mr_all, &buf->rb_all_mrs);
+ spin_unlock(&buf->rb_mrlock);
}

- spin_lock(&buf->rb_mrlock);
- list_splice(&free, &buf->rb_mrs);
- list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
- spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
}

@@ -1089,7 +1085,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
- INIT_LIST_HEAD(&buf->rb_all);
+ INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker);

@@ -1156,24 +1152,18 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)

count = 0;
spin_lock(&buf->rb_mrlock);
- while (!list_empty(&buf->rb_all)) {
- mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+ while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+ struct rpcrdma_mr,
+ mr_all)) != NULL) {
list_del(&mr->mr_all);
-
spin_unlock(&buf->rb_mrlock);

- /* Ensure MW is not on any rl_registered list */
- if (!list_empty(&mr->mr_list))
- list_del(&mr->mr_list);
-
frwr_release_mr(mr);
count++;
spin_lock(&buf->rb_mrlock);
}
spin_unlock(&buf->rb_mrlock);
r_xprt->rx_stats.mrs_allocated = 0;
-
- dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

/**
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index eaf6b907a76e..5aaa53b8ae12 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -360,7 +360,7 @@ rpcrdma_mr_pop(struct list_head *list)
struct rpcrdma_buffer {
spinlock_t rb_mrlock; /* protect rb_mrs list */
struct list_head rb_mrs;
- struct list_head rb_all;
+ struct list_head rb_all_mrs;

unsigned long rb_sc_head;
unsigned long rb_sc_tail;

2019-08-19 22:44:42

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 11/21] xprtrdma: Simplify rpcrdma_mr_pop

Clean up: rpcrdma_mr_pop call sites check if the list is empty
first. Let's replace the list_empty with less costly logic.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 12 ++++--------
net/sunrpc/xprtrdma/rpc_rdma.c | 7 +------
net/sunrpc/xprtrdma/verbs.c | 6 ++----
net/sunrpc/xprtrdma/xprt_rdma.h | 7 ++++---
4 files changed, 11 insertions(+), 21 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 3a10bfff2125..d7e763fafa04 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -126,12 +126,10 @@ frwr_mr_recycle_worker(struct work_struct *work)
*/
void frwr_reset(struct rpcrdma_req *req)
{
- while (!list_empty(&req->rl_registered)) {
- struct rpcrdma_mr *mr;
+ struct rpcrdma_mr *mr;

- mr = rpcrdma_mr_pop(&req->rl_registered);
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
rpcrdma_mr_unmap_and_put(mr);
- }
}

/**
@@ -532,8 +530,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
- while (!list_empty(&req->rl_registered)) {
- mr = rpcrdma_mr_pop(&req->rl_registered);
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
@@ -632,8 +629,7 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
- while (!list_empty(&req->rl_registered)) {
- mr = rpcrdma_mr_pop(&req->rl_registered);
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 4345e6912392..0ac096a6348a 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -841,12 +841,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* chunks. Very likely the connection has been replaced,
* so these registrations are invalid and unusable.
*/
- while (unlikely(!list_empty(&req->rl_registered))) {
- struct rpcrdma_mr *mr;
-
- mr = rpcrdma_mr_pop(&req->rl_registered);
- rpcrdma_mr_recycle(mr);
- }
+ frwr_reset(req);

/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e004873cc4f0..ee6fcf10425b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1213,13 +1213,11 @@ struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mr *mr = NULL;
+ struct rpcrdma_mr *mr;

spin_lock(&buf->rb_mrlock);
- if (!list_empty(&buf->rb_mrs))
- mr = rpcrdma_mr_pop(&buf->rb_mrs);
+ mr = rpcrdma_mr_pop(&buf->rb_mrs);
spin_unlock(&buf->rb_mrlock);
-
if (!mr)
goto out_nomrs;
return mr;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5aaa53b8ae12..9663b8ddd733 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -338,7 +338,7 @@ rpcr_to_rdmar(const struct rpc_rqst *rqst)
static inline void
rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list)
{
- list_add_tail(&mr->mr_list, list);
+ list_add(&mr->mr_list, list);
}

static inline struct rpcrdma_mr *
@@ -346,8 +346,9 @@ rpcrdma_mr_pop(struct list_head *list)
{
struct rpcrdma_mr *mr;

- mr = list_first_entry(list, struct rpcrdma_mr, mr_list);
- list_del_init(&mr->mr_list);
+ mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
+ if (mr)
+ list_del_init(&mr->mr_list);
return mr;
}


2019-08-19 22:45:17

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 10/21] xprtrdma: Toggle XPRT_CONGESTED in xprtrdma's slot methods

Commit 48be539dd44a ("xprtrdma: Introduce ->alloc_slot call-out for
xprtrdma") added a separate alloc_slot and free_slot to the RPC/RDMA
transport. Later, commit 75891f502f5f ("SUNRPC: Support for
congestion control when queuing is enabled") modified the generic
alloc/free_slot methods, but neglected the methods in xprtrdma.

Found via code review.

Fixes: 75891f502f5f ("SUNRPC: Support for congestion control ... ")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 2ec349ed4770..f4763e8a6761 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -571,6 +571,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return;

out_sleep:
+ set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
}
@@ -589,7 +590,8 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)

memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
- rpc_wake_up_next(&xprt->backlog);
+ if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
+ clear_bit(XPRT_CONGESTED, &xprt->state);
}

static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,

2019-08-19 22:45:30

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 12/21] xprtrdma: Combine rpcrdma_mr_put and rpcrdma_mr_unmap_and_put

Clean up. There is only one remaining rpcrdma_mr_put call site, and
it can be directly replaced with unmap_and_put because mr->mr_dir is
set to DMA_NONE just before the call.

Now all the call sites do a DMA unmap, and we can just rename
mr_unmap_and_put to mr_put, which nicely matches mr_get.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 6 +++---
net/sunrpc/xprtrdma/verbs.c | 32 ++++++++------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
3 files changed, 11 insertions(+), 28 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index d7e763fafa04..97e1804139b8 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -129,7 +129,7 @@ void frwr_reset(struct rpcrdma_req *req)
struct rpcrdma_mr *mr;

while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
- rpcrdma_mr_unmap_and_put(mr);
+ rpcrdma_mr_put(mr);
}

/**
@@ -453,7 +453,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
trace_xprtrdma_mr_remoteinv(mr);
- rpcrdma_mr_unmap_and_put(mr);
+ rpcrdma_mr_put(mr);
break; /* only one invalidated MR per RPC */
}
}
@@ -463,7 +463,7 @@ static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
if (wc->status != IB_WC_SUCCESS)
rpcrdma_mr_recycle(mr);
else
- rpcrdma_mr_unmap_and_put(mr);
+ rpcrdma_mr_put(mr);
}

/**
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ee6fcf10425b..5e0b774ed522 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1233,34 +1233,15 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
return NULL;
}

-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
- spin_lock(&buf->rb_mrlock);
- rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
-}
-
-/**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
- __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
-}
-
/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
*
*/
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
@@ -1268,7 +1249,10 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
- __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+
+ spin_lock(&buf->rb_mrlock);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
+ spin_unlock(&buf->rb_mrlock);
}

/**
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 9663b8ddd733..3e0839c2cda2 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -485,7 +485,6 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);

struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);

static inline void
rpcrdma_mr_recycle(struct rpcrdma_mr *mr)

2019-08-19 22:46:45

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 13/21] xprtrdma: Move rpcrdma_mr_get out of frwr_map

Refactor: Retrieve an MR and handle error recovery entirely in
rpc_rdma.c, as this is not a device-specific function.

Note that since commit 89f90fe1ad8b ("SUNRPC: Allow calls to
xprt_transmit() to drain the entire transmit queue"), the
xprt_transmit function handles the cond_resched. The transport no
longer has to do this itself.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 29 ++++++++++++++++++++++++++++-
net/sunrpc/xprtrdma/frwr_ops.c | 23 +++++------------------
net/sunrpc/xprtrdma/rpc_rdma.c | 30 ++++++++++++++++++++++++------
net/sunrpc/xprtrdma/verbs.c | 21 ++++-----------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++--
5 files changed, 63 insertions(+), 44 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 6e6055eb67e7..83c4dfd7feea 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -464,7 +464,34 @@ TRACE_EVENT(xprtrdma_createmrs,
)
);

-DEFINE_RXPRT_EVENT(xprtrdma_nomrs);
+TRACE_EVENT(xprtrdma_nomrs,
+ TP_PROTO(
+ const struct rpcrdma_req *req
+ ),
+
+ TP_ARGS(req),
+
+ TP_STRUCT__entry(
+ __field(const void *, req)
+ __field(unsigned int, task_id)
+ __field(unsigned int, client_id)
+ __field(u32, xid)
+ ),
+
+ TP_fast_assign(
+ const struct rpc_rqst *rqst = &req->rl_slot;
+
+ __entry->req = req;
+ __entry->task_id = rqst->rq_task->tk_pid;
+ __entry->client_id = rqst->rq_task->tk_client->cl_clid;
+ __entry->xid = be32_to_cpu(rqst->rq_xid);
+ ),
+
+ TP_printk("task:%[email protected]%u xid=0x%08x req=%p",
+ __entry->task_id, __entry->client_id, __entry->xid,
+ __entry->req
+ )
+);

DEFINE_RDCH_EVENT(read);
DEFINE_WRCH_EVENT(write);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 97e1804139b8..362056f4f48d 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -291,31 +291,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
* @nsegs: number of segments remaining
* @writing: true when RDMA Write will be used
* @xid: XID of RPC using the registered memory
- * @out: initialized MR
+ * @mr: MR to fill in
*
* Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*
* Returns the next segment or a negative errno pointer.
- * On success, the prepared MR is planted in @out.
+ * On success, @mr is filled in.
*/
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr **out)
+ struct rpcrdma_mr *mr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_mr *mr;
- struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
+ struct ib_mr *ibmr;
int i, n;
u8 key;

- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- goto out_getmr_err;
-
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
for (i = 0; i < nsegs;) {
@@ -330,7 +324,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,

++seg;
++i;
- if (holes_ok)
+ if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
continue;
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
@@ -365,22 +359,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
mr->mr_offset = ibmr->iova;
trace_xprtrdma_mr_map(mr);

- *out = mr;
return seg;

-out_getmr_err:
- xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
- return ERR_PTR(-EAGAIN);
-
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
- rpcrdma_mr_put(mr);
return ERR_PTR(-EIO);

out_mapmr_err:
trace_xprtrdma_frwr_maperr(mr, n);
- rpcrdma_mr_recycle(mr);
return ERR_PTR(-EIO);
}

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 0ac096a6348a..34772cb19286 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -342,6 +342,27 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return 0;
}

+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing,
+ struct rpcrdma_mr **mr)
+{
+ *mr = rpcrdma_mr_get(r_xprt);
+ if (!*mr)
+ goto out_getmr_err;
+
+ rpcrdma_mr_push(*mr, &req->rl_registered);
+ return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+ trace_xprtrdma_nomrs(req);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+ if (r_xprt->rx_ep.rep_connected != -ENODEV)
+ schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
+ return ERR_PTR(-EAGAIN);
+}
+
/* Register and XDR encode the Read list. Supports encoding a list of read
* segments that belong to a single read chunk.
*
@@ -379,10 +400,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return nsegs;

do {
- seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);

if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
@@ -440,10 +460,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,

nchunks = 0;
do {
- seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);

if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
@@ -501,10 +520,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,

nchunks = 0;
do {
- seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);

if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5e0b774ed522..c9fa0f27b10a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -408,7 +408,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_req *req;
struct rpcrdma_rep *rep;

- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);

/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
@@ -975,7 +975,7 @@ static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_refresh_worker.work);
+ rb_refresh_worker);
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);

@@ -1086,8 +1086,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
INIT_LIST_HEAD(&buf->rb_all_mrs);
- INIT_DELAYED_WORK(&buf->rb_refresh_worker,
- rpcrdma_mr_refresh_worker);
+ INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

rpcrdma_mrs_create(r_xprt);

@@ -1177,7 +1176,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
+ cancel_work_sync(&buf->rb_refresh_worker);

rpcrdma_sendctxs_destroy(buf);

@@ -1218,19 +1217,7 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
spin_lock(&buf->rb_mrlock);
mr = rpcrdma_mr_pop(&buf->rb_mrs);
spin_unlock(&buf->rb_mrlock);
- if (!mr)
- goto out_nomrs;
return mr;
-
-out_nomrs:
- trace_xprtrdma_nomrs(r_xprt);
- if (r_xprt->rx_ep.rep_connected != -ENODEV)
- schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
- /* Allow the reply handler and refresh worker to run */
- cond_resched();
-
- return NULL;
}

/**
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3e0839c2cda2..9573587ca602 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -379,7 +379,7 @@ struct rpcrdma_buffer {
u32 rb_bc_srv_max_requests;
u32 rb_bc_max_requests;

- struct delayed_work rb_refresh_worker;
+ struct work_struct rb_refresh_worker;
};

/*
@@ -548,7 +548,7 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr **mr);
+ struct rpcrdma_mr *mr);
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);

2019-08-19 22:47:11

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 14/21] xprtrdma: Ensure creating an MR does not trigger FS writeback

Probably would be good to also pass GFP flags to ib_alloc_mr.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 7 ++++---
net/sunrpc/xprtrdma/verbs.c | 2 +-
2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 362056f4f48d..1f2e3dda7401 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -147,11 +147,14 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
struct ib_mr *frmr;
int rc;

+ /* NB: ib_alloc_mr and device drivers typically allocate
+ * memory with GFP_KERNEL.
+ */
frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
if (IS_ERR(frmr))
goto out_mr_err;

- sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
+ sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
if (!sg)
goto out_list_err;

@@ -171,8 +174,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
return rc;

out_list_err:
- dprintk("RPC: %s: sg allocation failure\n",
- __func__);
ib_dereg_mr(frmr);
return -ENOMEM;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c9fa0f27b10a..cb6df58488bb 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -949,7 +949,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_mr *mr;
int rc;

- mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+ mr = kzalloc(sizeof(*mr), GFP_NOFS);
if (!mr)
break;


2019-08-19 22:48:04

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 15/21] xprtrdma: Cache free MRs in each rpcrdma_req

Instead of a globally-contended MR free list, cache MRs in each
rpcrdma_req as they are released. This means acquiring and releasing
an MR will be lock-free in the common case, even outside the
transport send lock.

The original idea of per-rpcrdma_req MR free lists was suggested by
Shirley Ma <[email protected]> several years ago. I just now
figured out how to make that idea work with on-demand MR allocation.

Signed-off-by: Chuck Lever <[email protected]>
---
include/trace/events/rpcrdma.h | 38 ++++++++++++++++++++++++++++++++++++--
net/sunrpc/xprtrdma/frwr_ops.c | 9 ++++++---
net/sunrpc/xprtrdma/rpc_rdma.c | 11 ++++++++---
net/sunrpc/xprtrdma/verbs.c | 18 +++++++++++++++---
net/sunrpc/xprtrdma/xprt_rdma.h | 7 ++++---
5 files changed, 69 insertions(+), 14 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 83c4dfd7feea..a13830616107 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -451,16 +451,50 @@ TRACE_EVENT(xprtrdma_createmrs,

TP_STRUCT__entry(
__field(const void *, r_xprt)
+ __string(addr, rpcrdma_addrstr(r_xprt))
+ __string(port, rpcrdma_portstr(r_xprt))
__field(unsigned int, count)
),

TP_fast_assign(
__entry->r_xprt = r_xprt;
__entry->count = count;
+ __assign_str(addr, rpcrdma_addrstr(r_xprt));
+ __assign_str(port, rpcrdma_portstr(r_xprt));
),

- TP_printk("r_xprt=%p: created %u MRs",
- __entry->r_xprt, __entry->count
+ TP_printk("peer=[%s]:%s r_xprt=%p: created %u MRs",
+ __get_str(addr), __get_str(port), __entry->r_xprt,
+ __entry->count
+ )
+);
+
+TRACE_EVENT(xprtrdma_mr_get,
+ TP_PROTO(
+ const struct rpcrdma_req *req
+ ),
+
+ TP_ARGS(req),
+
+ TP_STRUCT__entry(
+ __field(const void *, req)
+ __field(unsigned int, task_id)
+ __field(unsigned int, client_id)
+ __field(u32, xid)
+ ),
+
+ TP_fast_assign(
+ const struct rpc_rqst *rqst = &req->rl_slot;
+
+ __entry->req = req;
+ __entry->task_id = rqst->rq_task->tk_pid;
+ __entry->client_id = rqst->rq_task->tk_client->cl_clid;
+ __entry->xid = be32_to_cpu(rqst->rq_xid);
+ ),
+
+ TP_printk("task:%[email protected]%u xid=0x%08x req=%p",
+ __entry->task_id, __entry->client_id, __entry->xid,
+ __entry->req
)
);

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 1f2e3dda7401..0e740bae2d80 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -488,8 +488,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)

/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_wake(wc, frwr);
- complete(&frwr->fr_linv_done);
__frwr_release_mr(wc, mr);
+ complete(&frwr->fr_linv_done);
}

/**
@@ -587,11 +587,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
struct rpcrdma_frwr *frwr =
container_of(cqe, struct rpcrdma_frwr, fr_cqe);
struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ struct rpcrdma_rep *rep = mr->mr_req->rl_reply;

/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_done(wc, frwr);
- rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
__frwr_release_mr(wc, mr);
+
+ /* Ensure @rep is generated before __frwr_release_mr */
+ smp_rmb();
+ rpcrdma_complete_rqst(rep);
}

/**
@@ -624,7 +628,6 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)

frwr = &mr->frwr;
frwr->fr_cqe.done = frwr_wc_localinv;
- frwr->fr_req = req;
last = &frwr->fr_invwr;
last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 34772cb19286..ffeb4dfebd46 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -348,9 +348,14 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
int nsegs, bool writing,
struct rpcrdma_mr **mr)
{
- *mr = rpcrdma_mr_get(r_xprt);
- if (!*mr)
- goto out_getmr_err;
+ *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+ if (!*mr) {
+ *mr = rpcrdma_mr_get(r_xprt);
+ if (!*mr)
+ goto out_getmr_err;
+ trace_xprtrdma_mr_get(req);
+ (*mr)->mr_req = req;
+ }

rpcrdma_mr_push(*mr, &req->rl_registered);
return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cb6df58488bb..69753ec73c36 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -77,6 +77,7 @@
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
@@ -1022,6 +1023,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
if (!req->rl_recvbuf)
goto out4;

+ INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1130,11 +1132,13 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
* This function assumes that the caller prevents concurrent device
* unload and transport tear-down.
*/
-void
-rpcrdma_req_destroy(struct rpcrdma_req *req)
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
list_del(&req->rl_all);

+ while (!list_empty(&req->rl_free_mrs))
+ rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
@@ -1228,7 +1232,6 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

if (mr->mr_dir != DMA_NONE) {
trace_xprtrdma_mr_unmap(mr);
@@ -1237,6 +1240,15 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
mr->mr_dir = DMA_NONE;
}

+ rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+}
+
+static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
+{
+ struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ mr->mr_req = NULL;
spin_lock(&buf->rb_mrlock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
spin_unlock(&buf->rb_mrlock);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 9573587ca602..c375b0e434ac 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -234,20 +234,20 @@ struct rpcrdma_sendctx {
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
*/
-struct rpcrdma_req;
struct rpcrdma_frwr {
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
struct completion fr_linv_done;
- struct rpcrdma_req *fr_req;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
};
};

+struct rpcrdma_req;
struct rpcrdma_mr {
struct list_head mr_list;
+ struct rpcrdma_req *mr_req;
struct scatterlist *mr_sg;
int mr_nents;
enum dma_data_direction mr_dir;
@@ -325,7 +325,8 @@ struct rpcrdma_req {
struct list_head rl_all;
struct kref rl_kref;

- struct list_head rl_registered; /* registered segments */
+ struct list_head rl_free_mrs;
+ struct list_head rl_registered;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};


2019-08-19 22:48:36

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 16/21] xprtrdma: Remove rpcrdma_buffer::rb_mrlock

Clean up: Now that the free list is used sparingly, get rid of the
separate spin lock protecting it.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 4 ++--
net/sunrpc/xprtrdma/verbs.c | 21 ++++++++++-----------
net/sunrpc/xprtrdma/xprt_rdma.h | 9 ++++-----
3 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 0e740bae2d80..368cdf3edfc9 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -106,10 +106,10 @@ frwr_mr_recycle_worker(struct work_struct *work)
mr->mr_dir = DMA_NONE;
}

- spin_lock(&r_xprt->rx_buf.rb_mrlock);
+ spin_lock(&r_xprt->rx_buf.rb_lock);
list_del(&mr->mr_all);
r_xprt->rx_stats.mrs_recycled++;
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
+ spin_unlock(&r_xprt->rx_buf.rb_lock);

frwr_release_mr(mr);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 69753ec73c36..52444c4d1be2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -962,10 +962,10 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)

mr->mr_xprt = r_xprt;

- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
list_add(&mr->mr_list, &buf->rb_mrs);
list_add(&mr->mr_all, &buf->rb_all_mrs);
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
}

r_xprt->rx_stats.mrs_allocated += count;
@@ -1084,7 +1084,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)

buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
INIT_LIST_HEAD(&buf->rb_all_mrs);
@@ -1154,18 +1153,18 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
unsigned int count;

count = 0;
- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
struct rpcrdma_mr,
mr_all)) != NULL) {
list_del(&mr->mr_all);
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);

frwr_release_mr(mr);
count++;
- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
}
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
r_xprt->rx_stats.mrs_allocated = 0;
}

@@ -1218,9 +1217,9 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mr *mr;

- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
mr = rpcrdma_mr_pop(&buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
return mr;
}

@@ -1249,9 +1248,9 @@ static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

mr->mr_req = NULL;
- spin_lock(&buf->rb_mrlock);
+ spin_lock(&buf->rb_lock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
+ spin_unlock(&buf->rb_lock);
}

/**
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c375b0e434ac..200d075bbe31 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -360,19 +360,18 @@ rpcrdma_mr_pop(struct list_head *list)
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
- spinlock_t rb_mrlock; /* protect rb_mrs list */
+ spinlock_t rb_lock;
+ struct list_head rb_send_bufs;
+ struct list_head rb_recv_bufs;
struct list_head rb_mrs;
- struct list_head rb_all_mrs;

unsigned long rb_sc_head;
unsigned long rb_sc_tail;
unsigned long rb_sc_last;
struct rpcrdma_sendctx **rb_sc_ctxs;

- spinlock_t rb_lock; /* protect buf lists */
- struct list_head rb_send_bufs;
- struct list_head rb_recv_bufs;
struct list_head rb_allreqs;
+ struct list_head rb_all_mrs;

u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */

2019-08-19 22:49:27

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 17/21] xprtrdma: Use an llist to manage free rpcrdma_reps

rpcrdma_rep objects are removed from their free list by only a
single thread: the Receive completion handler. Thus that free list
can be converted to an llist, where a single-threaded consumer and
a multi-threaded producer (rpcrdma_buffer_put) can both access the
llist without the need for any serialization.

This eliminates spin lock contention between the Receive completion
handler and rpcrdma_buffer_get, and makes the rep consumer wait-
free.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 106 ++++++++++++++++++---------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +-
2 files changed, 53 insertions(+), 59 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 52444c4d1be2..db90083ed35b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -75,6 +75,7 @@
* internal functions
*/
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
@@ -407,7 +408,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;

cancel_work_sync(&buf->rb_refresh_worker);

@@ -431,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
*/
- list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
- rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+ rpcrdma_reps_destroy(buf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
@@ -1071,6 +1070,40 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
return NULL;
}

+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
+ kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+ struct llist_node *node;
+
+ /* Calls to llist_del_first are required to be serialized */
+ node = llist_del_first(&buf->rb_free_reps);
+ if (!node)
+ return NULL;
+ return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+ struct rpcrdma_rep *rep)
+{
+ if (!rep->rr_temp)
+ llist_add(&rep->rr_node, &buf->rb_free_reps);
+ else
+ rpcrdma_rep_destroy(rep);
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+ rpcrdma_rep_destroy(rep);
+}
+
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
@@ -1106,7 +1139,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
}

buf->rb_credits = 1;
- INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ init_llist_head(&buf->rb_free_reps);

rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
@@ -1118,12 +1151,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
return rc;
}

-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
-{
- rpcrdma_regbuf_free(rep->rr_rdmabuf);
- kfree(rep);
-}
-
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
@@ -1182,15 +1209,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
cancel_work_sync(&buf->rb_refresh_worker);

rpcrdma_sendctxs_destroy(buf);
-
- while (!list_empty(&buf->rb_recv_bufs)) {
- struct rpcrdma_rep *rep;
-
- rep = list_first_entry(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_reps_destroy(buf);

while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
@@ -1281,39 +1300,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- struct rpcrdma_rep *rep = req->rl_reply;
-
+ if (req->rl_reply)
+ rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;

spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
- if (rep) {
- if (!rep->rr_temp) {
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- rep = NULL;
- }
- }
spin_unlock(&buffers->rb_lock);
- if (rep)
- rpcrdma_rep_destroy(rep);
}

-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
+/**
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
+ *
+ * Used after error conditions.
*/
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
- struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
- if (!rep->rr_temp) {
- spin_lock(&buffers->rb_lock);
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock(&buffers->rb_lock);
- } else {
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@ -1469,22 +1473,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)

/* fast path: all needed reps can be found on the free list */
wr = NULL;
- spin_lock(&buf->rb_lock);
while (needed) {
- rep = list_first_entry_or_null(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
+ rep = rpcrdma_rep_get_locked(buf);
if (!rep)
- break;
-
- list_del(&rep->rr_list);
- rep->rr_recv_wr.next = wr;
- wr = &rep->rr_recv_wr;
- --needed;
- }
- spin_unlock(&buf->rb_lock);
-
- while (needed) {
- rep = rpcrdma_rep_create(r_xprt, temp);
+ rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep)
break;

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 200d075bbe31..bd1befa83d24 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -47,6 +47,7 @@
#include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */
+#include <linux/llist.h>

#include <rdma/rdma_cm.h> /* RDMA connection api */
#include <rdma/ib_verbs.h> /* RDMA verbs api */
@@ -200,7 +201,7 @@ struct rpcrdma_rep {
struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
- struct list_head rr_list;
+ struct llist_node rr_node;
struct ib_recv_wr rr_recv_wr;
};

@@ -362,7 +363,6 @@ rpcrdma_mr_pop(struct list_head *list)
struct rpcrdma_buffer {
spinlock_t rb_lock;
struct list_head rb_send_bufs;
- struct list_head rb_recv_bufs;
struct list_head rb_mrs;

unsigned long rb_sc_head;
@@ -373,6 +373,8 @@ struct rpcrdma_buffer {
struct list_head rb_allreqs;
struct list_head rb_all_mrs;

+ struct llist_head rb_free_reps;
+
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */


2019-08-19 22:50:16

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 18/21] xprtrdma: Clean up xprt_rdma_set_connect_timeout()

Clean up: The function name should match the documenting comment.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index f4763e8a6761..993b96ff6760 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -494,9 +494,9 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
* @reconnect_timeout: reconnect timeout after server disconnects
*
*/
-static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
- unsigned long connect_timeout,
- unsigned long reconnect_timeout)
+static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
+ unsigned long connect_timeout,
+ unsigned long reconnect_timeout)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

@@ -805,7 +805,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
- .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
+ .set_connect_timeout = xprt_rdma_set_connect_timeout,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,

2019-08-19 22:50:54

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 19/21] xprtrdma: Fix bc_max_slots return value

For the moment the returned value just happens to be correct because
the current backchannel server implementation does not vary the
number of credits it offers. The spec does permit this value to
change during the lifetime of a connection, however.

The actual maximum is fixed for all RPC/RDMA transports, because
each transport instance has to pre-allocate the resources for
processing BC requests. That's the value that should be returned.

Fixes: 7402a4fedc2b ("SUNRPC: Fix up backchannel slot table ... ")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 59e624b1d7a0..50e075fcdd8f 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)

unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-
- return r_xprt->rx_buf.rb_bc_srv_max_requests;
+ return RPCRDMA_BACKWARD_WRS >> 1;
}

static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)

2019-08-19 22:51:58

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 20/21] xprtrdma: Inline XDR chunk encoder functions

Micro-optimization: Save the cost of three function calls during
transport header encoding.

These were "noinline" before to generate more meaningful call stacks
during debugging, but this code is now pretty stable.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ffeb4dfebd46..67e1684aee6d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -382,9 +382,10 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
*
* Only a single @pos value is currently supported.
*/
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype rtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -436,9 +437,10 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
*
* Only a single Write chunk is currently supported.
*/
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -498,9 +500,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
* Returns zero on success, or a negative errno if a failure occurred.
* @xdr is advanced to the next position in the stream.
*/
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;

2019-08-19 22:52:28

by Chuck Lever

[permalink] [raw]
Subject: [PATCH v2 21/21] xprtrdma: Optimize rpcrdma_post_recvs()

Micro-optimization: In rpcrdma_post_recvs, since commit e340c2d6ef2a
("xprtrdma: Reduce the doorbell rate (Receive)"), the common case is
to return without doing anything. Found with perf.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index db90083ed35b..ac2abf4578b9 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1465,7 +1465,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
count = 0;

needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (ep->rep_receive_count > needed)
+ if (likely(ep->rep_receive_count > needed))
goto out;
needed -= ep->rep_receive_count;
if (!temp)

2019-08-23 08:01:33

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 00/21] NFS/RDMA client-side for-5.4

Thanks, Chuck! I've pushed these out to my linux-next branch.

Anna

On Mon, 2019-08-19 at 18:35 -0400, Chuck Lever wrote:
> Hi Anna-
>
> These feel like they are ready for you to merge.
>
> Changes since v1:
> - Rebased on v5.3-rc5
> - Improved "Boost maximum transport header size"
> - Minor clarifications
>
> ---
>
> Chuck Lever (21):
> SUNRPC: Remove rpc_wake_up_queued_task_on_wq()
> SUNRPC: Inline xdr_commit_encode
> xprtrdma: Refresh the documenting comment in frwr_ops.c
> xprtrdma: Update obsolete comment
> xprtrdma: Fix calculation of ri_max_segs again
> xprtrdma: Boost maximum transport header size
> xprtrdma: Boost client's max slot table size to match Linux
> server
> xprtrdma: Rename CQE field in Receive trace points
> xprtrdma: Rename rpcrdma_buffer::rb_all
> xprtrdma: Toggle XPRT_CONGESTED in xprtrdma's slot methods
> xprtrdma: Simplify rpcrdma_mr_pop
> xprtrdma: Combine rpcrdma_mr_put and rpcrdma_mr_unmap_and_put
> xprtrdma: Move rpcrdma_mr_get out of frwr_map
> xprtrdma: Ensure creating an MR does not trigger FS writeback
> xprtrdma: Cache free MRs in each rpcrdma_req
> xprtrdma: Remove rpcrdma_buffer::rb_mrlock
> xprtrdma: Use an llist to manage free rpcrdma_reps
> xprtrdma: Clean up xprt_rdma_set_connect_timeout()
> xprtrdma: Fix bc_max_slots return value
> xprtrdma: Inline XDR chunk encoder functions
> xprtrdma: Optimize rpcrdma_post_recvs()
>
>
> include/linux/sunrpc/sched.h | 3
> include/linux/sunrpc/xprtrdma.h | 4 -
> include/trace/events/rpcrdma.h | 88 ++++++++++++--
> net/sunrpc/sched.c | 27 +---
> net/sunrpc/xdr.c | 2
> net/sunrpc/xprtrdma/backchannel.c | 4 -
> net/sunrpc/xprtrdma/frwr_ops.c | 131 +++++++--------------
> net/sunrpc/xprtrdma/rpc_rdma.c | 63 +++++++---
> net/sunrpc/xprtrdma/transport.c | 12 +-
> net/sunrpc/xprtrdma/verbs.c | 235 ++++++++++++++++-----------
> ----------
> net/sunrpc/xprtrdma/xprt_rdma.h | 58 ++++-----
> 11 files changed, 305 insertions(+), 322 deletions(-)
>
> --
> Chuck Lever