2015-01-07 23:11:48

by Chuck Lever III

Subject: [PATCH v1 00/20] NFS/RDMA client for 3.20

The following series of patches for the Linux NFS client breaks up
the per-transport buffer pool data structures to help them scale
better with the size of SEND/RECV buffers (the inline threshold),
the maximum NFS r/wsize, and the number of RDMA credits (concurrent
RPC requests).

The primary change is that the header send buffers have been split
from struct rpcrdma_req. This has a number of benefits which are
outlined in the patch descriptions.

More prerequisites are still required. The actual changes to the
maximum r/wsize and other limits are left for a future merge window.

---

Chuck Lever (20):
xprtrdma: human-readable completion status
xprtrdma: Modernize htonl and ntohl
xprtrdma: Display XIDs in host byte order
xprtrdma: Clean up hdrlen
xprtrdma: Rename "xprt" and "rdma_connect" fields in struct rpcrdma_xprt
xprtrdma: Remove rpcrdma_ep::rep_ia
xprtrdma: Remove rl_mr field, and the mr_chunk union
xprtrdma: Move credit update to RPC reply handler
xprtrdma: Remove rpcrdma_ep::rep_func and ::rep_xprt
xprtrdma: Free the pd if ib_query_qp() fails
xprtrdma: Take struct ib_device_attr off the stack
xprtrdma: Take struct ib_qp_attr and ib_qp_init_attr off the stack
xprtrdma: Simplify synopsis of rpcrdma_buffer_create()
xprtrdma: Refactor rpcrdma_buffer_create() and rpcrdma_buffer_destroy()
xprtrdma: Add struct rpcrdma_regbuf and helpers
xprtrdma: Allocate RPC send buffer separately from struct rpcrdma_req
xprtrdma: Allocate RDMA/RPC send buffer separately from struct rpcrdma_req
xprtrdma: Allocate RPC/RDMA receive buffer separately from struct rpcrdma_rep
xprtrdma: Allocate zero pad separately from rpcrdma_buffer
xprtrdma: Clean up after adding regbuf management


include/linux/sunrpc/rpc_rdma.h | 14 +
include/linux/sunrpc/svc_rdma.h | 2
net/sunrpc/xprtrdma/rpc_rdma.c | 108 ++++++----
net/sunrpc/xprtrdma/transport.c | 179 +++++++----------
net/sunrpc/xprtrdma/verbs.c | 411 ++++++++++++++++++++++++---------------
net/sunrpc/xprtrdma/xprt_rdma.h | 111 +++++++----
6 files changed, 478 insertions(+), 347 deletions(-)

--
Chuck Lever


2015-01-07 23:11:56

by Chuck Lever III

Subject: [PATCH v1 01/20] xprtrdma: human-readable completion status

Make it easier to grep the system log for specific error conditions.

The wc.opcode field is not included because opcode numbers are
sparse, and because wc.opcode is not necessarily valid when a
completion reports an error.
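
As an illustration of the new output, a failed FRMR send completion
is logged via pr_err() as, e.g. (the pointer value here is
hypothetical):

	RPC: rpcrdma_sendcq_process_wc: frmr ffff88041234a800 (stale): transport retry counter exceeded

which "grep 'retry counter'" finds directly, whereas the old dprintk
printed only the raw status value in hex.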

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 70 +++++++++++++++++++++++++++++++++++--------
1 file changed, 57 insertions(+), 13 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c98e406..56f705d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -173,18 +173,54 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
}
}

+static const char * const wc_status[] = {
+ "success",
+ "local length error",
+ "local QP operation error",
+ "local EE context operation error",
+ "local protection error",
+ "WR flushed",
+ "memory management operation error",
+ "bad response error",
+ "local access error",
+ "remote invalid request error",
+ "remote access error",
+ "remote operation error",
+ "transport retry counter exceeded",
+ "RNR retrycounter exceeded",
+ "local RDD violation error",
+ "remove invalid RD request",
+ "operation aborted",
+ "invalid EE context number",
+ "invalid EE context state",
+ "fatal error",
+ "response timeout error",
+ "general error",
+};
+
+#define COMPLETION_MSG(status) \
+ ((status) < ARRAY_SIZE(wc_status) ? \
+ wc_status[(status)] : "unexpected completion error")
+
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
- struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ if (likely(wc->status == IB_WC_SUCCESS))
+ return;

- dprintk("RPC: %s: frmr %p status %X opcode %d\n",
- __func__, frmr, wc->status, wc->opcode);
+ /* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->wr_id == 0ULL) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: SEND: %s\n",
+ __func__, COMPLETION_MSG(wc->status));
+ } else {
+ struct rpcrdma_mw *r;

- if (wc->wr_id == 0ULL)
- return;
- if (wc->status != IB_WC_SUCCESS)
- frmr->r.frmr.fr_state = FRMR_IS_STALE;
+ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ r->r.frmr.fr_state = FRMR_IS_STALE;
+ pr_err("RPC: %s: frmr %p (stale): %s\n",
+ __func__, r, COMPLETION_MSG(wc->status));
+ }
}

static int
@@ -248,16 +284,17 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

- dprintk("RPC: %s: rep %p status %X opcode %X length %u\n",
- __func__, rep, wc->status, wc->opcode, wc->byte_len);
+ /* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS)
+ goto out_fail;

- if (wc->status != IB_WC_SUCCESS) {
- rep->rr_len = ~0U;
- goto out_schedule;
- }
+ /* status == SUCCESS means all fields in wc are trustworthy */
if (wc->opcode != IB_WC_RECV)
return;

+ dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
+ __func__, rep, wc->byte_len);
+
rep->rr_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
@@ -275,6 +312,13 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)

out_schedule:
list_add_tail(&rep->rr_list, sched_list);
+ return;
+out_fail:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: rep %p: %s\n",
+ __func__, rep, COMPLETION_MSG(wc->status));
+ rep->rr_len = ~0U;
+ goto out_schedule;
}

static int


2015-01-07 23:12:05

by Chuck Lever III

Subject: [PATCH v1 02/20] xprtrdma: Modernize htonl and ntohl

Clean up: Replace htonl and ntohl with the be32 equivalents.
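
A side benefit of the cpu_to_be32() macros added below: converting an
integer constant folds at compile time, so the on-the-wire values can
appear directly in switch arms with no runtime byte-swapping, as in
the reply handler:

	switch (headerp->rm_type) {
	case rdma_msg:		/* cpu_to_be32(RDMA_MSG) */
		...
		break;
	case rdma_nomsg:	/* cpu_to_be32(RDMA_NOMSG) */
		...
		break;
	}

and endianness checkers like sparse can now verify the comparisons.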

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 9 +++++++
include/linux/sunrpc/svc_rdma.h | 2 --
net/sunrpc/xprtrdma/rpc_rdma.c | 48 +++++++++++++++++++++------------------
3 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index b78f16b..1578ed2 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -42,6 +42,9 @@

#include <linux/types.h>

+#define RPCRDMA_VERSION 1
+#define rpcrdma_version cpu_to_be32(RPCRDMA_VERSION)
+
struct rpcrdma_segment {
__be32 rs_handle; /* Registered memory handle */
__be32 rs_length; /* Length of the chunk in bytes */
@@ -115,4 +118,10 @@ enum rpcrdma_proc {
RDMA_ERROR = 4 /* An RPC RDMA encoding error */
};

+#define rdma_msg cpu_to_be32(RDMA_MSG)
+#define rdma_nomsg cpu_to_be32(RDMA_NOMSG)
+#define rdma_msgp cpu_to_be32(RDMA_MSGP)
+#define rdma_done cpu_to_be32(RDMA_DONE)
+#define rdma_error cpu_to_be32(RDMA_ERROR)
+
#endif /* _LINUX_SUNRPC_RPC_RDMA_H */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 975da75..ddfe88f 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -63,8 +63,6 @@ extern atomic_t rdma_stat_rq_prod;
extern atomic_t rdma_stat_sq_poll;
extern atomic_t rdma_stat_sq_prod;

-#define RPCRDMA_VERSION 1
-
/*
* Contexts are built when an RDMA request is created and are a
* record of the resources that can be recovered when the request
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index df01d12..a6fb30b 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
if (cur_rchunk) { /* read */
cur_rchunk->rc_discrim = xdr_one;
/* all read chunks have the same "position" */
- cur_rchunk->rc_position = htonl(pos);
- cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
- cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+ cur_rchunk->rc_position = cpu_to_be32(pos);
+ cur_rchunk->rc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_rchunk->rc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
xdr_encode_hyper(
(__be32 *)&cur_rchunk->rc_target.rs_offset,
seg->mr_base);
@@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
cur_rchunk++;
r_xprt->rx_stats.read_chunk_count++;
} else { /* write/reply */
- cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
- cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+ cur_wchunk->wc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_wchunk->wc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
xdr_encode_hyper(
(__be32 *)&cur_wchunk->wc_target.rs_offset,
seg->mr_base);
@@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
*iptr++ = xdr_zero; /* encode a NULL reply chunk */
} else {
warray->wc_discrim = xdr_one;
- warray->wc_nchunks = htonl(nchunks);
+ warray->wc_nchunks = cpu_to_be32(nchunks);
iptr = (__be32 *) cur_wchunk;
if (type == rpcrdma_writech) {
*iptr++ = xdr_zero; /* finish the write chunk list */
@@ -404,11 +408,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)

/* build RDMA header in private area at front */
headerp = (struct rpcrdma_msg *) req->rl_base;
- /* don't htonl XID, it's already done in request */
+ /* don't byte-swap XID, it's already done in request */
headerp->rm_xid = rqst->rq_xid;
- headerp->rm_vers = xdr_one;
- headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
- headerp->rm_type = htonl(RDMA_MSG);
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+ headerp->rm_type = rdma_msg;

/*
* Chunks needed for results?
@@ -482,11 +486,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
RPCRDMA_INLINE_PAD_VALUE(rqst));

if (padlen) {
- headerp->rm_type = htonl(RDMA_MSGP);
+ headerp->rm_type = rdma_msgp;
headerp->rm_body.rm_padded.rm_align =
- htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+ cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
headerp->rm_body.rm_padded.rm_thresh =
- htonl(RPCRDMA_INLINE_PAD_THRESH);
+ cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
@@ -570,7 +574,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
unsigned int i, total_len;
struct rpcrdma_write_chunk *cur_wchunk;

- i = ntohl(**iptrp); /* get array count */
+ i = be32_to_cpu(**iptrp);
if (i > max)
return -1;
cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
@@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
__func__,
- ntohl(seg->rs_length),
+ be32_to_cpu(seg->rs_length),
(unsigned long long)off,
- ntohl(seg->rs_handle));
+ be32_to_cpu(seg->rs_handle));
}
- total_len += ntohl(seg->rs_length);
+ total_len += be32_to_cpu(seg->rs_length);
++cur_wchunk;
}
/* check and adjust for properly terminated write chunk */
@@ -749,9 +753,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
goto repost;
}
headerp = (struct rpcrdma_msg *) rep->rr_base;
- if (headerp->rm_vers != xdr_one) {
+ if (headerp->rm_vers != rpcrdma_version) {
dprintk("RPC: %s: invalid version %d\n",
- __func__, ntohl(headerp->rm_vers));
+ __func__, be32_to_cpu(headerp->rm_vers));
goto repost;
}

@@ -793,7 +797,7 @@ repost:
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
- case htonl(RDMA_MSG):
+ case rdma_msg:
/* never expect read chunks */
/* never expect reply chunks (two ways to check) */
/* never expect write chunks without having offered RDMA */
@@ -832,7 +836,7 @@ repost:
rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
break;

- case htonl(RDMA_NOMSG):
+ case rdma_nomsg:
/* never expect read or write chunks, always reply chunks */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero ||
@@ -853,7 +857,7 @@ badheader:
dprintk("%s: invalid rpcrdma reply header (type %d):"
" chunks[012] == %d %d %d"
" expected chunks <= %d\n",
- __func__, ntohl(headerp->rm_type),
+ __func__, be32_to_cpu(headerp->rm_type),
headerp->rm_body.rm_chunks[0],
headerp->rm_body.rm_chunks[1],
headerp->rm_body.rm_chunks[2],


2015-01-07 23:12:13

by Chuck Lever III

Subject: [PATCH v1 03/20] xprtrdma: Display XIDs in host byte order

xprtsock.c and the backchannel code display XIDs in host byte order.
Follow suit in xprtrdma.
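
For example, on a little-endian host an XID that xprtsock reports as
0x0a0b0c0d was previously printed by xprtrdma as 0x0d0c0b0a, because
rm_xid holds the raw big-endian wire value; applying be32_to_cpu()
before printing makes the two transports' logs agree.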

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a6fb30b..150dd76 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -766,7 +766,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: reply 0x%p failed "
"to match any request xid 0x%08x len %d\n",
- __func__, rep, headerp->rm_xid, rep->rr_len);
+ __func__, rep, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
repost:
r_xprt->rx_stats.bad_reply_count++;
rep->rr_func = rpcrdma_reply_handler;
@@ -782,13 +783,14 @@ repost:
spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: duplicate reply 0x%p to RPC "
"request 0x%p: xid 0x%08x\n", __func__, rep, req,
- headerp->rm_xid);
+ be32_to_cpu(headerp->rm_xid));
goto repost;
}

dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
- __func__, rep, req, rqst, headerp->rm_xid);
+ __func__, rep, req, rqst,
+ be32_to_cpu(headerp->rm_xid));

/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;


2015-01-07 23:12:21

by Chuck Lever III

Subject: [PATCH v1 04/20] xprtrdma: Clean up hdrlen

Clean up: Replace naked integers with a documenting macro.
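
The naked 28 is seven XDR words: rm_xid, rm_vers, rm_credit and
rm_type, followed by the three empty chunk lists of the rm_nochunks
arm of rm_body; hence the definition below as sizeof(__be32) * 7.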

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 5 ++++-
net/sunrpc/xprtrdma/rpc_rdma.c | 12 +++++++-----
2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 1578ed2..f33c5a4 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -98,7 +98,10 @@ struct rpcrdma_msg {
} rm_body;
};

-#define RPCRDMA_HDRLEN_MIN 28
+/*
+ * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
+ */
+#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)

enum rpcrdma_errcode {
ERR_VERS = 1,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 150dd76..dcf5ebc 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -472,7 +472,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
return -EIO;
}

- hdrlen = 28; /*sizeof *headerp;*/
+ hdrlen = RPCRDMA_HDRLEN_MIN;
padlen = 0;

/*
@@ -748,7 +748,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
}
return;
}
- if (rep->rr_len < 28) {
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
}
@@ -830,8 +830,9 @@ repost:
} else {
/* else ordinary inline */
rdmalen = 0;
- iptr = (__be32 *)((unsigned char *)headerp + 28);
- rep->rr_len -= 28; /*sizeof *headerp;*/
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
+ rep->rr_len -= RPCRDMA_HDRLEN_MIN;
status = rep->rr_len;
}
/* Fix up the rpc results for upper layer */
@@ -845,7 +846,8 @@ repost:
headerp->rm_body.rm_chunks[2] != xdr_one ||
req->rl_nchunks == 0)
goto badheader;
- iptr = (__be32 *)((unsigned char *)headerp + 28);
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
if (rdmalen < 0)
goto badheader;


2015-01-07 23:12:29

by Chuck Lever III

Subject: [PATCH v1 05/20] xprtrdma: Rename "xprt" and "rdma_connect" fields in struct rpcrdma_xprt

Clean up: Use consistent field names in struct rpcrdma_xprt.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 19 ++++++++++---------
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +++---
2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index bbd6155..ee57513 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
- struct rpcrdma_xprt *r_xprt =
- container_of(work, struct rpcrdma_xprt, rdma_connect.work);
- struct rpc_xprt *xprt = &r_xprt->xprt;
+ struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+ rx_connect_worker.work);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
int rc = 0;

xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)

dprintk("RPC: %s: called\n", __func__);

- cancel_delayed_work_sync(&r_xprt->rdma_connect);
+ cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

xprt_clear_connected(xprt);

@@ -374,7 +374,8 @@ xprt_setup_rdma(struct xprt_create *args)
* connection loss notification is async. We also catch connection loss
* when reaping receives.
*/
- INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
+ INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+ xprt_rdma_connect_worker);
new_ep->rep_func = rpcrdma_conn_func;
new_ep->rep_xprt = xprt;

@@ -434,17 +435,17 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)

if (r_xprt->rx_ep.rep_connected != 0) {
/* Reconnect */
- schedule_delayed_work(&r_xprt->rdma_connect,
- xprt->reestablish_timeout);
+ schedule_delayed_work(&r_xprt->rx_connect_worker,
+ xprt->reestablish_timeout);
xprt->reestablish_timeout <<= 1;
if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
} else {
- schedule_delayed_work(&r_xprt->rdma_connect, 0);
+ schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
if (!RPC_IS_ASYNC(task))
- flush_delayed_work(&r_xprt->rdma_connect);
+ flush_delayed_work(&r_xprt->rx_connect_worker);
}
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b799041..9a7aab3 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -318,16 +318,16 @@ struct rpcrdma_stats {
* during unmount.
*/
struct rpcrdma_xprt {
- struct rpc_xprt xprt;
+ struct rpc_xprt rx_xprt;
struct rpcrdma_ia rx_ia;
struct rpcrdma_ep rx_ep;
struct rpcrdma_buffer rx_buf;
struct rpcrdma_create_data_internal rx_data;
- struct delayed_work rdma_connect;
+ struct delayed_work rx_connect_worker;
struct rpcrdma_stats rx_stats;
};

-#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)

/* Setting this to 0 ensures interoperability with early servers.


2015-01-07 23:12:39

by Chuck Lever III

Subject: [PATCH v1 06/20] xprtrdma: Remove rpcrdma_ep::rep_ia

Clean up: This field is not used.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 1 -
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
2 files changed, 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 56f705d..56e14b3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -825,7 +825,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
else if (ep->rep_cqinit <= 2)
ep->rep_cqinit = 0;
INIT_CQCOUNT(ep);
- ep->rep_ia = ia;
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 9a7aab3..5160a84 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -83,7 +83,6 @@ struct rpcrdma_ep {
atomic_t rep_cqcount;
int rep_cqinit;
int rep_connected;
- struct rpcrdma_ia *rep_ia;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
struct ib_sge rep_pad; /* holds zeroed pad */


2015-01-07 23:12:46

by Chuck Lever III

Subject: [PATCH v1 07/20] xprtrdma: Remove rl_mr field, and the mr_chunk union

Clean up.

Since commit 0ac531c18323 ("xprtrdma: Remove REGISTER memory
registration mode"), the rl_mr pointer is no longer used anywhere.

After removal, there's only a single member of the mr_chunk union,
so mr_chunk can be removed as well, in favor of a single pointer
field.

Fixes: 0ac531c18323 ("xprtrdma: Remove REGISTER memory ...")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 25 ++++++++++++-------------
net/sunrpc/xprtrdma/xprt_rdma.h | 5 +----
2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 56e14b3..1000f63 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1493,8 +1493,8 @@ rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
int i;

for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
- rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
- rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
+ rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
+ rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
}

static void
@@ -1580,7 +1580,7 @@ rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
list_add(&r->mw_list, stale);
continue;
}
- req->rl_segments[i].mr_chunk.rl_mw = r;
+ req->rl_segments[i].rl_mw = r;
if (unlikely(i-- == 0))
return req; /* Success */
}
@@ -1602,7 +1602,7 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
r = list_entry(buf->rb_mws.next,
struct rpcrdma_mw, mw_list);
list_del(&r->mw_list);
- req->rl_segments[i].mr_chunk.rl_mw = r;
+ req->rl_segments[i].rl_mw = r;
if (unlikely(i-- == 0))
return req; /* Success */
}
@@ -1842,7 +1842,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
struct rpcrdma_frmr *frmr = &mw->r.frmr;
struct ib_mr *mr = frmr->fr_mr;
struct ib_send_wr fastreg_wr, *bad_wr;
@@ -1931,12 +1931,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
struct ib_send_wr invalidate_wr, *bad_wr;
int rc;

- seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+ seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;

memset(&invalidate_wr, 0, sizeof invalidate_wr);
- invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+ invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+ invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);

read_lock(&ia->ri_qplock);
@@ -1946,7 +1946,7 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
read_unlock(&ia->ri_qplock);
if (rc) {
/* Force rpcrdma_buffer_get() to retry */
- seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
+ seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
dprintk("RPC: %s: failed ib_post_send for invalidate,"
" status %i\n", __func__, rc);
}
@@ -1978,8 +1978,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
- physaddrs, i, seg1->mr_dma);
+ rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
if (rc) {
dprintk("RPC: %s: failed ib_map_phys_fmr "
"%u@0x%llx+%i (%d)... status %i\n", __func__,
@@ -1988,7 +1987,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
while (i--)
rpcrdma_unmap_one(ia, --seg);
} else {
- seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+ seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -2005,7 +2004,7 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
LIST_HEAD(l);
int rc;

- list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
+ list_add(&seg1->rl_mw->r.fmr->list, &l);
rc = ib_unmap_fmr(&l);
read_lock(&ia->ri_qplock);
while (seg1->mr_nsegs--)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5160a84..532d586 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -210,10 +210,7 @@ struct rpcrdma_mw {
*/

struct rpcrdma_mr_seg { /* chunk descriptors */
- union { /* chunk memory handles */
- struct ib_mr *rl_mr; /* if registered directly */
- struct rpcrdma_mw *rl_mw; /* if registered from region */
- } mr_chunk;
+ struct rpcrdma_mw *rl_mw; /* registered MR */
u64 mr_base; /* registration result */
u32 mr_rkey; /* registration result */
u32 mr_len; /* length of chunk or segment */


2015-01-07 23:12:54

by Chuck Lever III

Subject: [PATCH v1 08/20] xprtrdma: Move credit update to RPC reply handler

Reduce work in the receive CQ handler, which is run at hardware
interrupt level, by moving the RPC/RDMA credit update logic to the
RPC reply handler.

This has some additional benefits: more header sanity checking is
done before the incoming credit value is trusted, and the receive CQ
handler no longer touches the RPC/RDMA header. Finally, there is no
longer any need to update and read rb_credits atomically, so the
rb_credits field can be removed.

This further extends work begun by commit e7ce710a8802 ("xprtrdma:
Avoid deadlock when credit window is reset").
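
As a worked example of the clamp added below: a reply advertising 128
credits on a transport whose rb_max_requests is 32 yields xprt->cwnd =
32 << RPC_CWNDSHIFT, while a (buggy) reply advertising zero credits
still leaves one slot open so the transport cannot deadlock.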

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 10 ++++++++--
net/sunrpc/xprtrdma/verbs.c | 15 ++-------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
3 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index dcf5ebc..d731010 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -736,7 +736,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpc_xprt *xprt = rep->rr_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
__be32 *iptr;
- int rdmalen, status;
+ int credits, rdmalen, status;
unsigned long cwnd;

/* Check status. If bad, signal disconnect and return rep to pool */
@@ -871,8 +871,14 @@ badheader:
break;
}

+ credits = be32_to_cpu(headerp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > r_xprt->rx_buf.rb_max_requests)
+ credits = r_xprt->rx_buf.rb_max_requests;
+
cwnd = xprt->cwnd;
- xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 1000f63..71a071a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -49,6 +49,7 @@

#include <linux/interrupt.h>
#include <linux/slab.h>
+#include <linux/prefetch.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"
@@ -298,17 +299,7 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
rep->rr_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-
- if (rep->rr_len >= 16) {
- struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
- unsigned int credits = ntohl(p->rm_credit);
-
- if (credits == 0)
- credits = 1; /* don't deadlock */
- else if (credits > rep->rr_buffer->rb_max_requests)
- credits = rep->rr_buffer->rb_max_requests;
- atomic_set(&rep->rr_buffer->rb_credits, credits);
- }
+ prefetch(rep->rr_base);

out_schedule:
list_add_tail(&rep->rr_list, sched_list);
@@ -480,7 +471,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DEVICE_REMOVAL:
connstate = -ENODEV;
connected:
- atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
ep->rep_connected = connstate;
@@ -1186,7 +1176,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,

buf->rb_max_requests = cdata->max_requests;
spin_lock_init(&buf->rb_lock);
- atomic_set(&buf->rb_credits, 1);

/* Need to allocate:
* 1. arrays for send and recv pointers
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 532d586..3fcc92b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -248,7 +248,6 @@ struct rpcrdma_req {
*/
struct rpcrdma_buffer {
spinlock_t rb_lock; /* protects indexes */
- atomic_t rb_credits; /* most recent server credits */
int rb_max_requests;/* client max requests */
struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */
struct list_head rb_all;


2015-01-08 15:53:25

by Anna Schumaker

Subject: Re: [PATCH v1 08/20] xprtrdma: Move credit update to RPC reply handler

Hey Chuck,

On 01/07/2015 06:12 PM, Chuck Lever wrote:
> Reduce work in the receive CQ handler, which is run at hardware
> interrupt level, by moving the RPC/RDMA credit update logic to the
> RPC reply handler.
>
> This has some additional benefits: More header sanity checking is
> done before trusting the incoming credit value, and the receive CQ
> handler no longer touches the RPC/RDMA header. Finally, no longer
> any need to update and read rb_credits atomically, so the rb_credits
> field can be removed.
>
> This further extends work begun by commit e7ce710a8802 ("xprtrdma:
> Avoid deadlock when credit window is reset").
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 10 ++++++++--
> net/sunrpc/xprtrdma/verbs.c | 15 ++-------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
> 3 files changed, 10 insertions(+), 16 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index dcf5ebc..d731010 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -736,7 +736,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> struct rpc_xprt *xprt = rep->rr_xprt;
> struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> __be32 *iptr;
> - int rdmalen, status;
> + int credits, rdmalen, status;
> unsigned long cwnd;
>
> /* Check status. If bad, signal disconnect and return rep to pool */
> @@ -871,8 +871,14 @@ badheader:
> break;
> }
>
> + credits = be32_to_cpu(headerp->rm_credit);
> + if (credits == 0)
> + credits = 1; /* don't deadlock */
> + else if (credits > r_xprt->rx_buf.rb_max_requests)
> + credits = r_xprt->rx_buf.rb_max_requests;

Can rb_max_requests ever drop to 0?

Anna



2015-01-08 16:10:40

by Chuck Lever III

Subject: Re: [PATCH v1 08/20] xprtrdma: Move credit update to RPC reply handler


On Jan 8, 2015, at 10:53 AM, Anna Schumaker <[email protected]> wrote:

> Hey Chuck,
>
> On 01/07/2015 06:12 PM, Chuck Lever wrote:
>> + credits = be32_to_cpu(headerp->rm_credit);
>> + if (credits == 0)
>> + credits = 1; /* don't deadlock */
>> + else if (credits > r_xprt->rx_buf.rb_max_requests)
>> + credits = r_xprt->rx_buf.rb_max_requests;
>
> Can rb_max_requests ever drop to 0?

rb_max_requests is set at mount time to xprt_rdma_slot_table_entries.
rb_max_requests is not changed again.

xprt_rdma_slot_table_entries is set to a compile-time constant,
but can be changed via a /proc file setting. The lower and upper
bounds are min_slot_table_size and max_slot_table_size, both set
to compile-time constants.

IMO there's no practical danger rb_max_requests will ever be zero
unless someone makes a coding mistake.

Did you want me to reverse the order of the if conditions as a
defensive coding measure?


--
Chuck Lever
chuck[dot]lever[at]oracle[dot]com




2015-01-08 17:49:09

by Anna Schumaker

Subject: Re: [PATCH v1 08/20] xprtrdma: Move credit update to RPC reply handler

On 01/08/2015 11:10 AM, Chuck Lever wrote:
>
> On Jan 8, 2015, at 10:53 AM, Anna Schumaker <[email protected]> wrote:
>
>> Hey Chuck,
>>
>> On 01/07/2015 06:12 PM, Chuck Lever wrote:
>>> + credits = be32_to_cpu(headerp->rm_credit);
>>> + if (credits == 0)
>>> + credits = 1; /* don't deadlock */
>>> + else if (credits > r_xprt->rx_buf.rb_max_requests)
>>> + credits = r_xprt->rx_buf.rb_max_requests;
>>
>> Can rb_max_requests ever drop to 0?
>
> rb_max_requests is set at mount time to xprt_rdma_slot_table_entries.
> rb_max_requests is not changed again.
>
> xprt_rdma_slot_table_entries is set to a compile-time constant,
> but can be changed via a /proc file setting. The lower and upper
> bounds are min_slot_table_size and max_slot_table_size, both set
> to compile-time constants.
>
> IMO there's no practical danger rb_max_requests will ever be zero
> unless someone makes a coding mistake.
>
> Did you want me to reverse the order of the if conditions as a
> defensive coding measure?

If there is no practical danger then it's probably fine the way it is. Thanks for double checking!

Anna


2015-01-07 23:13:03

by Chuck Lever III

Subject: [PATCH v1 09/20] xprtrdma: Remove rpcrdma_ep::rep_func and ::rep_xprt

Clean up: The rep_func field always refers to rpcrdma_conn_func().
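
With the back pointers gone, the connect worker instead recovers the
transport from the embedded endpoint, as the first hunk below shows:

	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);

This works because rx_ep is embedded directly in struct rpcrdma_xprt
rather than being a separately allocated object.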

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 4 +++-
net/sunrpc/xprtrdma/transport.c | 2 --
net/sunrpc/xprtrdma/verbs.c | 6 +++---
net/sunrpc/xprtrdma/xprt_rdma.h | 2 --
4 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index d731010..f2eda15 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -695,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work)
{
struct rpcrdma_ep *ep =
container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
- struct rpc_xprt *xprt = ep->rep_xprt;
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;

spin_lock_bh(&xprt->transport_lock);
if (++xprt->connect_cookie == 0) /* maintain a reserved value */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ee57513..a487bde 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -376,8 +376,6 @@ xprt_setup_rdma(struct xprt_create *args)
*/
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
- new_ep->rep_func = rpcrdma_conn_func;
- new_ep->rep_xprt = xprt;

xprt_rdma_format_addresses(xprt);
xprt->max_payload = rpcrdma_max_payload(new_xprt);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 71a071a..c61bb61 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -154,7 +154,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
- ep->rep_func(ep);
+ rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
}
}
@@ -169,7 +169,7 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
- ep->rep_func(ep);
+ rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
}
}
@@ -474,7 +474,7 @@ connected:
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
ep->rep_connected = connstate;
- ep->rep_func(ep);
+ rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
/*FALLTHROUGH*/
default:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3fcc92b..657c370 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -87,8 +87,6 @@ struct rpcrdma_ep {
wait_queue_head_t rep_connect_wait;
struct ib_sge rep_pad; /* holds zeroed pad */
struct ib_mr *rep_pad_mr; /* holds zeroed pad */
- void (*rep_func)(struct rpcrdma_ep *);
- struct rpc_xprt *rep_xprt; /* for rep_func */
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;


2015-01-07 23:13:11

by Chuck Lever III

Subject: [PATCH v1 10/20] xprtrdma: Free the pd if ib_query_device() fails

If ib_query_device() fails or the memory registration mode isn't
supported, don't leak the PD. An orphaned IB/core resource will
cause IB module removal to hang.
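
The fix extends the usual goto-unwind ladder: the new out3 label
deallocates the PD and falls through to out2, which destroys the CM
ID, so every failure point after ib_alloc_pd() now releases
everything allocated before it.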

Fixes: bd7ed1d13304 ("RPC/RDMA: check selected memory registration ...")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c61bb61..aa012a3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -614,7 +614,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
if (rc) {
dprintk("RPC: %s: ib_query_device failed %d\n",
__func__, rc);
- goto out2;
+ goto out3;
}

if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
@@ -672,14 +672,14 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
"phys register failed with %lX\n",
__func__, PTR_ERR(ia->ri_bind_mem));
rc = -ENOMEM;
- goto out2;
+ goto out3;
}
break;
default:
printk(KERN_ERR "RPC: Unsupported memory "
"registration mode: %d\n", memreg);
rc = -ENOMEM;
- goto out2;
+ goto out3;
}
dprintk("RPC: %s: memory registration strategy is %d\n",
__func__, memreg);
@@ -689,6 +689,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)

rwlock_init(&ia->ri_qplock);
return 0;
+
+out3:
+ ib_dealloc_pd(ia->ri_pd);
+ ia->ri_pd = NULL;
out2:
rdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;


2015-01-07 23:13:19

by Chuck Lever III

Subject: [PATCH v1 11/20] xprtrdma: Take struct ib_device_attr off the stack

Device attributes are large, and are used in more than one place.
Stash a copy in dynamically allocated memory.
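
Besides shrinking the stack frames, caching the attributes in
rpcrdma_ia means the device is queried once, at rpcrdma_ia_open()
time; the second hunk below drops the now-redundant ib_query_device()
call from rpcrdma_ep_create().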

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 37 +++++++++++++------------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 14 insertions(+), 24 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index aa012a3..123bb04 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -588,8 +588,8 @@ int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
int rc, mem_priv;
- struct ib_device_attr devattr;
struct rpcrdma_ia *ia = &xprt->rx_ia;
+ struct ib_device_attr *devattr = &ia->ri_devattr;

ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
if (IS_ERR(ia->ri_id)) {
@@ -605,26 +605,21 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
goto out2;
}

- /*
- * Query the device to determine if the requested memory
- * registration strategy is supported. If it isn't, set the
- * strategy to a globally supported model.
- */
- rc = ib_query_device(ia->ri_id->device, &devattr);
+ rc = ib_query_device(ia->ri_id->device, devattr);
if (rc) {
dprintk("RPC: %s: ib_query_device failed %d\n",
__func__, rc);
goto out3;
}

- if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
+ if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
ia->ri_have_dma_lkey = 1;
ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
}

if (memreg == RPCRDMA_FRMR) {
/* Requires both frmr reg and local dma lkey */
- if ((devattr.device_cap_flags &
+ if ((devattr->device_cap_flags &
(IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
(IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
dprintk("RPC: %s: FRMR registration "
@@ -634,7 +629,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
/* Mind the ia limit on FRMR page list depth */
ia->ri_max_frmr_depth = min_t(unsigned int,
RPCRDMA_MAX_DATA_SEGS,
- devattr.max_fast_reg_page_list_len);
+ devattr->max_fast_reg_page_list_len);
}
}
if (memreg == RPCRDMA_MTHCAFMR) {
@@ -736,20 +731,13 @@ int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
- struct ib_device_attr devattr;
+ struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
int rc, err;

- rc = ib_query_device(ia->ri_id->device, &devattr);
- if (rc) {
- dprintk("RPC: %s: ib_query_device failed %d\n",
- __func__, rc);
- return rc;
- }
-
/* check provider's send/recv wr limits */
- if (cdata->max_requests > devattr.max_qp_wr)
- cdata->max_requests = devattr.max_qp_wr;
+ if (cdata->max_requests > devattr->max_qp_wr)
+ cdata->max_requests = devattr->max_qp_wr;

ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
ep->rep_attr.qp_context = ep;
@@ -784,8 +772,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,

}
ep->rep_attr.cap.max_send_wr *= depth;
- if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
- cdata->max_requests = devattr.max_qp_wr / depth;
+ if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
+ cdata->max_requests = devattr->max_qp_wr / depth;
if (!cdata->max_requests)
return -EINVAL;
ep->rep_attr.cap.max_send_wr = cdata->max_requests *
@@ -868,10 +856,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,

/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
- if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
+ if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */
ep->rep_remote_cma.responder_resources = 32;
else
- ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
+ ep->rep_remote_cma.responder_resources =
+ devattr->max_qp_rd_atom;

ep->rep_remote_cma.retry_count = 7;
ep->rep_remote_cma.flow_control = 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 657c370..ec596ce 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -70,6 +70,7 @@ struct rpcrdma_ia {
int ri_async_rc;
enum rpcrdma_memreg ri_memreg_strategy;
unsigned int ri_max_frmr_depth;
+ struct ib_device_attr ri_devattr;
};

/*


2015-01-07 23:13:27

by Chuck Lever III

Subject: [PATCH v1 12/20] xprtrdma: Take struct ib_qp_attr and ib_qp_init_attr off the stack

Reduce stack footprint of the connection upcall handler function.
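
Stashing the attribute structs in rpcrdma_ia is safe here because the
RDMA CM delivers events for a given connection ID serially, so the
upcall never runs concurrently with itself on the same ia.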

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 15 ++++++++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 ++
2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 123bb04..958b372 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -425,8 +425,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
- struct ib_qp_attr attr;
- struct ib_qp_init_attr iattr;
+ struct ib_qp_attr *attr = &ia->ri_qp_attr;
+ struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
int connstate = 0;

switch (event->event) {
@@ -449,12 +449,13 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
break;
case RDMA_CM_EVENT_ESTABLISHED:
connstate = 1;
- ib_query_qp(ia->ri_id->qp, &attr,
- IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
- &iattr);
+ ib_query_qp(ia->ri_id->qp, attr,
+ IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+ iattr);
dprintk("RPC: %s: %d responder resources"
" (%d initiator)\n",
- __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
+ __func__, attr->max_dest_rd_atomic,
+ attr->max_rd_atomic);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
connstate = -ENOTCONN;
@@ -487,7 +488,7 @@ connected:

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
if (connstate == 1) {
- int ird = attr.max_dest_rd_atomic;
+ int ird = attr->max_dest_rd_atomic;
int tird = ep->rep_remote_cma.responder_resources;
printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
"on %s, memreg %d slots %d ird %d%s\n",
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index ec596ce..2b4e778 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -71,6 +71,8 @@ struct rpcrdma_ia {
enum rpcrdma_memreg ri_memreg_strategy;
unsigned int ri_max_frmr_depth;
struct ib_device_attr ri_devattr;
+ struct ib_qp_attr ri_qp_attr;
+ struct ib_qp_init_attr ri_qp_init_attr;
};

/*


2015-01-07 23:13:35

by Chuck Lever III

Subject: [PATCH v1 13/20] xprtrdma: Simplify synopsis of rpcrdma_buffer_create()

Clean up: There is one call site for rpcrdma_buffer_create(). All of
the arguments there are fields of an rpcrdma_xprt.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 3 +--
net/sunrpc/xprtrdma/verbs.c | 7 +++++--
net/sunrpc/xprtrdma/xprt_rdma.h | 4 +---
3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index a487bde..808b3c5 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args)
* any inline data. Also specify any padding which will be provided
* from a preregistered zero buffer.
*/
- rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
- &new_xprt->rx_data);
+ rc = rpcrdma_buffer_create(new_xprt);
if (rc)
goto out3;

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 958b372..fd71501 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1161,9 +1161,11 @@ out_free:
}

int
-rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
- struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
+rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
char *p;
size_t len, rlen, wlen;
int i, rc;
@@ -1200,6 +1202,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
* Register the zeroed pad buffer, if any.
*/
if (cdata->padding) {
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
rc = rpcrdma_register_internal(ia, p, cdata->padding,
&ep->rep_pad_mr, &ep->rep_pad);
if (rc)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2b4e778..5c2fac3 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -354,9 +354,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
/*
* Buffer calls - xprtrdma/verbs.c
*/
-int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
- struct rpcrdma_ia *,
- struct rpcrdma_create_data_internal *);
+int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);

struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);


2015-01-07 23:13:43

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 14/20] xprtrdma: Refactor rpcrdma_buffer_create() and rpcrdma_buffer_destroy()

Move the details of how to create and destroy rpcrdma_req and
rpcrdma_rep structures into helper functions.
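
The new helpers return ERR_PTR() values rather than NULL. For readers
unfamiliar with that idiom, here is a minimal user-space rendering;
the macros are simplified stand-ins for the kernel's <linux/err.h>
versions:

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

/* simplified stand-ins for ERR_PTR/IS_ERR/PTR_ERR */
#define MAX_ERRNO	4095
#define ERR_PTR(err)	((void *)(long)(err))
#define IS_ERR(ptr)	((unsigned long)(ptr) >= (unsigned long)-MAX_ERRNO)
#define PTR_ERR(ptr)	((long)(ptr))

static void *create_req(void)
{
	void *req = malloc(64);

	/* encode the errno in the pointer instead of returning NULL */
	return req ? req : ERR_PTR(-ENOMEM);
}

int main(void)
{
	void *req = create_req();

	if (IS_ERR(req)) {
		printf("alloc failed: %ld\n", PTR_ERR(req));
		return 1;
	}
	free(req);
	return 0;
}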

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 148 ++++++++++++++++++++++++++++---------------
1 file changed, 95 insertions(+), 53 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index fd71501..24ea6dd 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1075,6 +1075,69 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
}

+static struct rpcrdma_req *
+rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ size_t wlen = 1 << fls(cdata->inline_wsize +
+ sizeof(struct rpcrdma_req));
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_req *req;
+ int rc;
+
+ rc = -ENOMEM;
+ req = kmalloc(wlen, GFP_KERNEL);
+ if (req == NULL)
+ goto out;
+ memset(req, 0, sizeof(struct rpcrdma_req));
+
+ rc = rpcrdma_register_internal(ia, req->rl_base, wlen -
+ offsetof(struct rpcrdma_req, rl_base),
+ &req->rl_handle, &req->rl_iov);
+ if (rc)
+ goto out_free;
+
+ req->rl_size = wlen - sizeof(struct rpcrdma_req);
+ req->rl_buffer = &r_xprt->rx_buf;
+ return req;
+
+out_free:
+ kfree(req);
+out:
+ return ERR_PTR(rc);
+}
+
+static struct rpcrdma_rep *
+rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ size_t rlen = 1 << fls(cdata->inline_rsize +
+ sizeof(struct rpcrdma_rep));
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_rep *rep;
+ int rc;
+
+ rc = -ENOMEM;
+ rep = kmalloc(rlen, GFP_KERNEL);
+ if (rep == NULL)
+ goto out;
+ memset(rep, 0, sizeof(struct rpcrdma_rep));
+
+ rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
+ offsetof(struct rpcrdma_rep, rr_base),
+ &rep->rr_handle, &rep->rr_iov);
+ if (rc)
+ goto out_free;
+
+ rep->rr_buffer = &r_xprt->rx_buf;
+ return rep;
+
+out_free:
+ kfree(rep);
+out:
+ return ERR_PTR(rc);
+}
+
static int
rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
{
@@ -1167,7 +1230,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
char *p;
- size_t len, rlen, wlen;
+ size_t len;
int i, rc;

buf->rb_max_requests = cdata->max_requests;
@@ -1227,62 +1290,29 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
break;
}

- /*
- * Allocate/init the request/reply buffers. Doing this
- * using kmalloc for now -- one for each buf.
- */
- wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
- rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
- dprintk("RPC: %s: wlen = %zu, rlen = %zu\n",
- __func__, wlen, rlen);
-
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
struct rpcrdma_rep *rep;

- req = kmalloc(wlen, GFP_KERNEL);
- if (req == NULL) {
+ req = rpcrdma_create_req(r_xprt);
+ if (IS_ERR(req)) {
dprintk("RPC: %s: request buffer %d alloc"
" failed\n", __func__, i);
- rc = -ENOMEM;
+ rc = PTR_ERR(req);
goto out;
}
- memset(req, 0, sizeof(struct rpcrdma_req));
buf->rb_send_bufs[i] = req;
- buf->rb_send_bufs[i]->rl_buffer = buf;
-
- rc = rpcrdma_register_internal(ia, req->rl_base,
- wlen - offsetof(struct rpcrdma_req, rl_base),
- &buf->rb_send_bufs[i]->rl_handle,
- &buf->rb_send_bufs[i]->rl_iov);
- if (rc)
- goto out;

- buf->rb_send_bufs[i]->rl_size = wlen -
- sizeof(struct rpcrdma_req);
-
- rep = kmalloc(rlen, GFP_KERNEL);
- if (rep == NULL) {
+ rep = rpcrdma_create_rep(r_xprt);
+ if (IS_ERR(rep)) {
dprintk("RPC: %s: reply buffer %d alloc failed\n",
__func__, i);
- rc = -ENOMEM;
+ rc = PTR_ERR(rep);
goto out;
}
- memset(rep, 0, sizeof(struct rpcrdma_rep));
buf->rb_recv_bufs[i] = rep;
- buf->rb_recv_bufs[i]->rr_buffer = buf;
-
- rc = rpcrdma_register_internal(ia, rep->rr_base,
- rlen - offsetof(struct rpcrdma_rep, rr_base),
- &buf->rb_recv_bufs[i]->rr_handle,
- &buf->rb_recv_bufs[i]->rr_iov);
- if (rc)
- goto out;
-
}
- dprintk("RPC: %s: max_requests %d\n",
- __func__, buf->rb_max_requests);
- /* done */
+
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -1290,6 +1320,26 @@ out:
}

static void
+rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+{
+ if (!rep)
+ return;
+
+ rpcrdma_deregister_internal(ia, rep->rr_handle, &rep->rr_iov);
+ kfree(rep);
+}
+
+static void
+rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ if (!req)
+ return;
+
+ rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
+ kfree(req);
+}
+
+static void
rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
@@ -1344,18 +1394,10 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
dprintk("RPC: %s: entering\n", __func__);

for (i = 0; i < buf->rb_max_requests; i++) {
- if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
- rpcrdma_deregister_internal(ia,
- buf->rb_recv_bufs[i]->rr_handle,
- &buf->rb_recv_bufs[i]->rr_iov);
- kfree(buf->rb_recv_bufs[i]);
- }
- if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
- rpcrdma_deregister_internal(ia,
- buf->rb_send_bufs[i]->rl_handle,
- &buf->rb_send_bufs[i]->rl_iov);
- kfree(buf->rb_send_bufs[i]);
- }
+ if (buf->rb_recv_bufs)
+ rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
+ if (buf->rb_send_bufs)
+ rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
}

switch (ia->ri_memreg_strategy) {


2015-01-07 23:13:51

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 15/20] xprtrdma: Add struct rpcrdma_regbuf and helpers

There are several spots that allocate a buffer via kmalloc (usually
contiguously with another data structure) and then register that
buffer internally. I'd like to split the buffers out of these data
structures to allow the data structures to scale.

Start by adding functions that can kmalloc and register a buffer,
and can manage/preserve the buffer's associated ib_sge and ib_mr
fields.
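
As a rough illustration, here is a user-space sketch of the intended
layout. The field names mirror the patch, but malloc stands in for
kmalloc plus rpcrdma_register_internal(), and the ib_sge/ib_mr
bookkeeping is elided:

#include <stdlib.h>
#include <string.h>

struct regbuf {
	size_t	rg_size;	/* bytes available in rg_base[] */
	void	*rg_owner;	/* backpointer, filled in by the caller */
	char	rg_base[];	/* registered payload starts here */
};

static struct regbuf *regbuf_alloc(size_t size)
{
	struct regbuf *rb = malloc(sizeof(*rb) + size);

	if (!rb)
		return NULL;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	memset(rb->rg_base, 0, size);	/* HCA registration would go here */
	return rb;
}

static void regbuf_free(struct regbuf *rb)
{
	free(rb);			/* deregistration would go here */
}

int main(void)
{
	struct regbuf *rb = regbuf_alloc(1024);

	if (rb) {
		rb->rg_owner = rb;	/* a req would hang itself here */
		regbuf_free(rb);
	}
	return 0;
}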

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 55 +++++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 43 ++++++++++++++++++++++++++++++
2 files changed, 98 insertions(+)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 24ea6dd..cdd6aac 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1828,6 +1828,61 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
return rc;
}

+/**
+ * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * @ia: controlling rpcrdma_ia
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns pointer to private header of an area of internally
+ * registered memory, or an ERR_PTR. The registered buffer follows
+ * the end of the private header.
+ *
+ * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
+ * receiving the payload of RDMA RECV operations. regbufs are not
+ * used for RDMA READ/WRITE operations, thus are registered only for
+ * LOCAL access.
+ */
+struct rpcrdma_regbuf *
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+ int rc;
+
+ rc = -ENOMEM;
+ rb = kmalloc(sizeof(*rb) + size, flags);
+ if (rb == NULL)
+ goto out;
+
+ rb->rg_size = size;
+ rb->rg_owner = NULL;
+ rc = rpcrdma_register_internal(ia, rb->rg_base, size,
+ &rb->rg_mr, &rb->rg_iov);
+ if (rc)
+ goto out_free;
+
+ return rb;
+
+out_free:
+ kfree(rb);
+out:
+ return ERR_PTR(rc);
+}
+
+/**
+ * rpcrdma_free_regbuf - deregister and free registered buffer
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be deregistered and freed
+ */
+void
+rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (rb) {
+ rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
+ kfree(rb);
+ }
+}
+
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5c2fac3..36c37c6 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -106,6 +106,44 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)

+/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
+ *
+ * The below structure appears at the front of a large region of kmalloc'd
+ * memory, which always starts on a good alignment boundary.
+ */
+
+struct rpcrdma_regbuf {
+ size_t rg_size;
+ struct rpcrdma_req *rg_owner;
+ struct ib_mr *rg_mr;
+ struct ib_sge rg_iov;
+ __be32 rg_base[0] __attribute__ ((aligned(256)));
+};
+
+static inline u64
+rdmab_addr(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.addr;
+}
+
+static inline u32
+rdmab_length(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.length;
+}
+
+static inline u32
+rdmab_lkey(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.lkey;
+}
+
+static inline struct rpcrdma_msg *
+rdmab_to_msg(struct rpcrdma_regbuf *rb)
+{
+ return (struct rpcrdma_msg *)rb->rg_base;
+}
+
enum rpcrdma_chunktype {
rpcrdma_noch = 0,
rpcrdma_readch,
@@ -372,6 +410,11 @@ int rpcrdma_register_external(struct rpcrdma_mr_seg *,
int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
struct rpcrdma_xprt *);

+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
+ size_t, gfp_t);
+void rpcrdma_free_regbuf(struct rpcrdma_ia *,
+ struct rpcrdma_regbuf *);
+
/*
* RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
*/


2015-01-07 23:13:59

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 16/20] xprtrdma: Allocate RPC send buffer separately from struct rpcrdma_req

Because internal memory registration is an expensive and synchronous
operation, xprtrdma pre-registers send and receive buffers at mount
time, and then re-uses them for each RPC.

A "hardway" allocation is a memory allocation and registration that
replaces a send buffer during the processing of an RPC. Hardway must
be done if the RPC send buffer is too small to accommodate an RPC's
call or reply header.

For xprtrdma, each RPC send buffer is currently part of struct
rpcrdma_req so that xprt_rdma_free(), which is passed nothing but
the address of an RPC send buffer, can find its matching struct
rpcrdma_req and rpcrdma_rep quickly (via container_of / offsetof).
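
For readers unfamiliar with that lookup, here is a stand-alone sketch
of the container_of() idiom, in user space with a hypothetical
miniature struct req:

#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* hypothetical miniature of rpcrdma_req with an embedded send buffer */
struct req {
	int	id;
	char	sendbuf[128];
};

int main(void)
{
	struct req r = { .id = 42 };
	void *buffer = r.sendbuf;	/* all that xprt_rdma_free() gets */
	struct req *back = container_of(buffer, struct req, sendbuf);

	printf("recovered req %d\n", back->id);	/* prints 42 */
	return 0;
}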

That means that hardway currently has to replace a whole rpcrdma_req
when it replaces an RPC send buffer. This is often a fairly hefty
chunk of contiguous memory due to the size of the rl_segments array.

Some obscure re-use of fields in rpcrdma_req is done so that
xprt_rdma_free() can detect replaced buffers, and free them. The
original buffer and rpcrdma_req are restored in the process.

This commit breaks apart the RPC send buffer and struct rpcrdma_req
so that increasing the size of the rl_segments array does not change
the alignment of each RPC send buffer. (Increasing rl_segments is
needed to bump up the maximum r/wsize for NFS/RDMA).

This change opens up some interesting possibilities for improving
the design of xprt_rdma_allocate().

xprt_rdma_allocate() is now the one place where RPC send buffers
are allocated or re-allocated, and they are now always left in place
by xprt_rdma_free().

A large re-allocation that includes both the rl_segments array and
the RPC send buffer is no longer needed. Send buffer re-allocation
becomes quite rare. Good send buffer alignment is guaranteed no
matter what the size of the rl_segments array is.
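
Sketched in user space with hypothetical names, the resulting
allocate/free policy amounts to the following (the real code manages
regbufs, not plain malloc memory):

#include <stdlib.h>

struct req {
	void	*sendbuf;
	size_t	sendbuf_size;
};

/* grow-only, keep-on-free buffer policy */
static void *req_alloc_buf(struct req *req, size_t size)
{
	if (req->sendbuf && size <= req->sendbuf_size)
		return req->sendbuf;	/* common case: reuse in place */

	free(req->sendbuf);		/* rare case: replace with larger */
	req->sendbuf = malloc(size);
	req->sendbuf_size = req->sendbuf ? size : 0;
	return req->sendbuf;
}

static void req_free_buf(struct req *req)
{
	(void)req;	/* nothing: buffer stays attached for the next RPC */
}

int main(void)
{
	struct req r = { NULL, 0 };

	req_alloc_buf(&r, 512);		/* first RPC: allocates */
	req_free_buf(&r);		/* RPC done: buffer is kept */
	req_alloc_buf(&r, 256);		/* next RPC: reuses in place */
	free(r.sendbuf);
	return 0;
}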

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 6 +-
net/sunrpc/xprtrdma/transport.c | 146 ++++++++++++++++-----------------------
net/sunrpc/xprtrdma/verbs.c | 16 ++--
net/sunrpc/xprtrdma/xprt_rdma.h | 14 +++-
4 files changed, 78 insertions(+), 104 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f2eda15..8a6bdbd 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -541,9 +541,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = req->rl_iov.lkey;

- req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

req->rl_niovs = 2;

@@ -556,7 +556,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)

req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
- req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);

req->rl_niovs = 4;
}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 808b3c5..a9d5662 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -449,77 +449,72 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/*
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
*/
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_req *req, *nreq;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ size_t min_size;
+ gfp_t flags = task->tk_flags & RPC_TASK_SWAPPER ?
+ GFP_ATOMIC : GFP_NOFS;

- req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
return NULL;

- if (size > req->rl_size) {
- dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
- "prog %d vers %d proc %d\n",
- __func__, size, req->rl_size,
- task->tk_client->cl_prog, task->tk_client->cl_vers,
- task->tk_msg.rpc_proc->p_proc);
- /*
- * Outgoing length shortage. Our inline write max must have
- * been configured to perform direct i/o.
- *
- * This is therefore a large metadata operation, and the
- * allocate call was made on the maximum possible message,
- * e.g. containing long filename(s) or symlink data. In
- * fact, while these metadata operations *might* carry
- * large outgoing payloads, they rarely *do*. However, we
- * have to commit to the request here, so reallocate and
- * register it now. The data path will never require this
- * reallocation.
- *
- * If the allocation or registration fails, the RPC framework
- * will (doggedly) retry.
- */
- if (task->tk_flags & RPC_TASK_SWAPPER)
- nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
- else
- nreq = kmalloc(sizeof *req + size, GFP_NOFS);
- if (nreq == NULL)
- goto outfail;
-
- if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
- nreq->rl_base, size + sizeof(struct rpcrdma_req)
- - offsetof(struct rpcrdma_req, rl_base),
- &nreq->rl_handle, &nreq->rl_iov)) {
- kfree(nreq);
- goto outfail;
- }
- rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
- nreq->rl_size = size;
- nreq->rl_niovs = 0;
- nreq->rl_nchunks = 0;
- nreq->rl_buffer = (struct rpcrdma_buffer *)req;
- nreq->rl_reply = req->rl_reply;
- memcpy(nreq->rl_segments,
- req->rl_segments, sizeof nreq->rl_segments);
- /* flag the swap with an unused field */
- nreq->rl_iov.length = 0;
- req->rl_reply = NULL;
- req = nreq;
- }
+ if (req->rl_sendbuf == NULL)
+ goto out_sendbuf;
+ if (size > req->rl_sendbuf->rg_size)
+ goto out_sendbuf;
+
+out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- return req->rl_xdr_buf;
-
-outfail:
+ return req->rl_sendbuf->rg_base;
+
+out_sendbuf:
+ /* XDR encoding and RPC/RDMA marshaling of this request has not
+ * yet occurred. Thus a lower bound is needed to prevent buffer
+ * overrun during marshaling.
+ *
+ * RPC/RDMA marshaling may choose to send payload bearing ops
+ * inline, if the result is smaller than the inline threshold.
+ * The value of the "size" argument accounts for header
+ * requirements but not for the payload in these cases.
+ *
+ * Likewise, allocate enough space to receive a reply up to the
+ * size of the inline threshold.
+ *
+ * It's unlikely that both the send header and the received
+ * reply will be large, but slush is provided here to allow
+ * flexibility when marshaling.
+ */
+ min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+ min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ if (size < min_size)
+ size = min_size;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ req->rl_sendbuf = rb;
+ goto out;
+
+out_fail:
rpcrdma_buffer_put(req);
- rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ r_xprt->rx_stats.failed_marshal_count++;
return NULL;
}

@@ -531,47 +526,24 @@ xprt_rdma_free(void *buffer)
{
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_rep *rep;
+ struct rpcrdma_regbuf *rb;
int i;

if (buffer == NULL)
return;

- req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
- if (req->rl_iov.length == 0) { /* see allocate above */
- r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
- struct rpcrdma_xprt, rx_buf);
- } else
- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- rep = req->rl_reply;
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+ req = rb->rg_owner;
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);

- dprintk("RPC: %s: called on 0x%p%s\n",
- __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+ dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

- /*
- * Finish the deregistration. The process is considered
- * complete when the rr_func vector becomes NULL - this
- * was put in place during rpcrdma_reply_handler() - the wait
- * call below will not block if the dereg is "done". If
- * interrupted, our framework will clean up.
- */
for (i = 0; req->rl_nchunks;) {
--req->rl_nchunks;
i += rpcrdma_deregister_external(
&req->rl_segments[i], r_xprt);
}

- if (req->rl_iov.length == 0) { /* see allocate above */
- struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
- oreq->rl_reply = req->rl_reply;
- (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
- req->rl_handle,
- &req->rl_iov);
- kfree(req);
- req = oreq;
- }
-
- /* Put back request+reply buffers */
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cdd6aac..4089440 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1079,25 +1079,22 @@ static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- size_t wlen = 1 << fls(cdata->inline_wsize +
- sizeof(struct rpcrdma_req));
+ size_t wlen = cdata->inline_wsize;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_req *req;
int rc;

rc = -ENOMEM;
- req = kmalloc(wlen, GFP_KERNEL);
+ req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
if (req == NULL)
goto out;
- memset(req, 0, sizeof(struct rpcrdma_req));
+ memset(req, 0, sizeof(*req));

- rc = rpcrdma_register_internal(ia, req->rl_base, wlen -
- offsetof(struct rpcrdma_req, rl_base),
+ rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
&req->rl_handle, &req->rl_iov);
if (rc)
goto out_free;

- req->rl_size = wlen - sizeof(struct rpcrdma_req);
req->rl_buffer = &r_xprt->rx_buf;
return req;

@@ -1121,7 +1118,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep = kmalloc(rlen, GFP_KERNEL);
if (rep == NULL)
goto out;
- memset(rep, 0, sizeof(struct rpcrdma_rep));
+ memset(rep, 0, sizeof(*rep));

rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
offsetof(struct rpcrdma_rep, rr_base),
@@ -1335,6 +1332,7 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
if (!req)
return;

+ rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
kfree(req);
}
@@ -1729,8 +1727,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
unsigned long flags;

- if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
- buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
spin_lock_irqsave(&buffers->rb_lock, flags);
if (buffers->rb_recv_index < buffers->rb_max_requests) {
req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 36c37c6..aa82f8d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,7 +262,6 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
};

struct rpcrdma_req {
- size_t rl_size; /* actual length of buffer */
unsigned int rl_niovs; /* 0, 2 or 4 */
unsigned int rl_nchunks; /* non-zero if chunks */
unsigned int rl_connect_cookie; /* retry detection */
@@ -271,13 +270,20 @@ struct rpcrdma_req {
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
struct ib_sge rl_send_iov[4]; /* for active requests */
+ struct rpcrdma_regbuf *rl_sendbuf;
struct ib_sge rl_iov; /* for posting */
struct ib_mr *rl_handle; /* handle for mem in rl_iov */
char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
- __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */
};
-#define rpcr_to_rdmar(r) \
- container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)
+{
+ struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer,
+ struct rpcrdma_regbuf,
+ rg_base[0]);
+ return rb->rg_owner;
+}

/*
* struct rpcrdma_buffer -- holds list/queue of pre-registered memory for


2015-01-07 23:14:07

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 17/20] xprtrdma: Allocate RDMA/RPC send buffer separately from struct rpcrdma_req

The rl_base field is the buffer where each RPC/RDMA send header is
built.

Pre-posted send buffers are supposed to be the same size on the
client and server. For Solaris and Linux, that size is currently
1024 bytes, the inline threshold.

The size of the rl_base buffer is currently dependent on
RPCRDMA_MAX_DATA_SEGS. When the client constructs a chunk list in
the RPC/RDMA header, each segment in the list takes up a little
room in the buffer.

If we want a large r/wsize maximum, MAX_SEGS will grow
significantly, but notice that the inline threshold size is not
supposed to change (since it should match on the client and server).

Therefore the inline size is the real limit on the size of the
RPC/RDMA send header.

No longer use RPCRDMA_MAX_DATA_SEGS to determine the size or
placement of the RPC/RDMA header send buffer. The buffer size should
always be the same as the inline threshold size.
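
To make the scaling concrete, a quick back-of-the-envelope sketch
(the constants are illustrative stand-ins, not the real structure
sizes from rpc_rdma.h):

#include <stdio.h>

#define HDR_FIXED	28	/* fixed RPC/RDMA header fields (made up) */
#define SEG_SIZE	16	/* one read-chunk segment (made up) */
#define INLINE_SIZE	1024	/* the agreed inline threshold */

int main(void)
{
	int segs;

	for (segs = 8; segs <= 256; segs <<= 1)
		printf("MAX_SEGS %3d -> header buffer %5d bytes "
		       "(inline threshold stays %d)\n",
		       segs, HDR_FIXED + segs * SEG_SIZE, INLINE_SIZE);
	return 0;
}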

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 11 +++++------
net/sunrpc/xprtrdma/transport.c | 9 +++++++++
net/sunrpc/xprtrdma/verbs.c | 22 +++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 6 ++----
4 files changed, 19 insertions(+), 29 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 8a6bdbd..c1d4a09 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -294,7 +294,7 @@ ssize_t
rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
{
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+ struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);

if (req->rl_rtype != rpcrdma_noch)
result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -406,8 +406,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
base = rqst->rq_svec[0].iov_base;
rpclen = rqst->rq_svec[0].iov_len;

- /* build RDMA header in private area at front */
- headerp = (struct rpcrdma_msg *) req->rl_base;
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
/* don't byte-swap XID, it's already done in request */
headerp->rm_xid = rqst->rq_xid;
headerp->rm_vers = rpcrdma_version;
@@ -528,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
- headerp, base, req->rl_iov.lkey);
+ headerp, base, rdmab_lkey(req->rl_rdmabuf));

/*
* initialize send_iov's - normally only two: rdma chunk header and
@@ -537,9 +536,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* header and any write data. In all non-rdma cases, any following
* data has been copied into the RPC header buffer.
*/
- req->rl_send_iov[0].addr = req->rl_iov.addr;
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
- req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index a9d5662..2c2fabe 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -470,6 +470,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
if (req == NULL)
return NULL;

+ if (req->rl_rdmabuf == NULL)
+ goto out_rdmabuf;
if (req->rl_sendbuf == NULL)
goto out_sendbuf;
if (size > req->rl_sendbuf->rg_size)
@@ -480,6 +482,13 @@ out:
req->rl_connect_cookie = 0; /* our reserved value */
return req->rl_sendbuf->rg_base;

+out_rdmabuf:
+ min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
out_sendbuf:
/* XDR encoding and RPC/RDMA marshaling of this request has not
* yet occurred. Thus a lower bound is needed to prevent buffer
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4089440..c81749b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1078,30 +1078,14 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- size_t wlen = cdata->inline_wsize;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_req *req;
- int rc;

- rc = -ENOMEM;
- req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
+ req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
- goto out;
- memset(req, 0, sizeof(*req));
-
- rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
- &req->rl_handle, &req->rl_iov);
- if (rc)
- goto out_free;
+ return ERR_PTR(-ENOMEM);

req->rl_buffer = &r_xprt->rx_buf;
return req;
-
-out_free:
- kfree(req);
-out:
- return ERR_PTR(rc);
}

static struct rpcrdma_rep *
@@ -1333,7 +1317,7 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
return;

rpcrdma_free_regbuf(ia, req->rl_sendbuf);
- rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
+ rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index aa82f8d..84ad863 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -268,12 +268,10 @@ struct rpcrdma_req {
enum rpcrdma_chunktype rl_rtype, rl_wtype;
struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
- struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
struct ib_sge rl_send_iov[4]; /* for active requests */
+ struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
- struct ib_sge rl_iov; /* for posting */
- struct ib_mr *rl_handle; /* handle for mem in rl_iov */
- char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
+ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};

static inline struct rpcrdma_req *


2015-01-07 23:14:15

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 18/20] xprtrdma: Allocate RPC/RDMA receive buffer separately from struct rpcrdma_rep

The rr_base field is the buffer where each RPC/RDMA reply header
lands.

In some cases the RPC reply header also lands in this buffer, just
after the RPC/RDMA header.

The pre-posted receive buffers are supposed to be the same size
on the client and server. For Solaris and Linux, that size is
currently 1024 bytes, the inline threshold.

The size of the rr_base buffer is currently dependent on
RPCRDMA_MAX_DATA_SEGS. When the server constructs a chunk list in
the RPC/RDMA header, each segment in the list takes up a little
room in the buffer.

If we want a large r/wsize maximum, MAX_SEGS will grow
significantly, but notice that the inline threshold size won't
change.

Therefore the inline size is the real limit on the size of the
RPC/RDMA header. The largest RPC reply the client can receive via
RDMA SEND is also no bigger than the inline size.

Thus the size of the pre-posted receive buffer should be exactly
twice the inline size. The MAX_RPCRDMAHDR term should be replaced,
and rounding up to a power of two (1 << fls(...)) is no longer
necessary.
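
For reference, a user-space sketch of the rounding being dropped
(fls() here is a simple stand-in for the kernel helper):

#include <stdio.h>

/* stand-in for the kernel's fls(): highest set bit, 1-based */
static int fls(unsigned int x)
{
	int r = 0;

	while (x) {
		x >>= 1;
		r++;
	}
	return r;
}

int main(void)
{
	unsigned int len = 1024 + 200;	/* inline_rsize + struct overhead */

	/* old sizing: round the sum up to the next power of two */
	printf("1 << fls(%u) = %u\n", len, 1u << fls(len));	/* 2048 */
	/* new sizing: simply inline_rsize * 2 = 2048, no rounding */
	return 0;
}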

RPC replies received via RDMA WRITE (long replies) are caught in
rq_rcv_buf, which is the second half of the RPC send buffer. That
is, such replies never touch rr_base.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 5 +++--
net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++-------------
net/sunrpc/xprtrdma/xprt_rdma.h | 14 ++++++--------
3 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c1d4a09..02efcaa 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -572,6 +572,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
{
unsigned int i, total_len;
struct rpcrdma_write_chunk *cur_wchunk;
+ char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

i = be32_to_cpu(**iptrp);
if (i > max)
@@ -599,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
return -1;
cur_wchunk = (struct rpcrdma_write_chunk *) w;
}
- if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+ if ((char *)cur_wchunk > base + rep->rr_len)
return -1;

*iptrp = (__be32 *) cur_wchunk;
@@ -753,7 +754,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
}
- headerp = (struct rpcrdma_msg *) rep->rr_base;
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
if (headerp->rm_vers != rpcrdma_version) {
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c81749b..7aac422 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -298,8 +298,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)

rep->rr_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
- rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
- prefetch(rep->rr_base);
+ rdmab_addr(rep->rr_rdmabuf),
+ rep->rr_len, DMA_FROM_DEVICE);
+ prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
list_add_tail(&rep->rr_list, sched_list);
@@ -1092,23 +1093,21 @@ static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- size_t rlen = 1 << fls(cdata->inline_rsize +
- sizeof(struct rpcrdma_rep));
+ size_t rlen = cdata->inline_rsize << 1;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_rep *rep;
int rc;

rc = -ENOMEM;
- rep = kmalloc(rlen, GFP_KERNEL);
+ rep = kzalloc(sizeof(*rep), GFP_KERNEL);
if (rep == NULL)
goto out;
- memset(rep, 0, sizeof(*rep));

- rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
- offsetof(struct rpcrdma_rep, rr_base),
- &rep->rr_handle, &rep->rr_iov);
- if (rc)
+ rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, rlen, GFP_KERNEL);
+ if (IS_ERR(rep->rr_rdmabuf)) {
+ rc = PTR_ERR(rep->rr_rdmabuf);
goto out_free;
+ }

rep->rr_buffer = &r_xprt->rx_buf;
return rep;
@@ -1306,7 +1305,7 @@ rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
if (!rep)
return;

- rpcrdma_deregister_internal(ia, rep->rr_handle, &rep->rr_iov);
+ rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
kfree(rep);
}

@@ -2209,11 +2208,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,

recv_wr.next = NULL;
recv_wr.wr_id = (u64) (unsigned long) rep;
- recv_wr.sg_list = &rep->rr_iov;
+ recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

ib_dma_sync_single_for_cpu(ia->ri_id->device,
- rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
+ rdmab_addr(rep->rr_rdmabuf),
+ rdmab_length(rep->rr_rdmabuf),
+ DMA_BIDIRECTIONAL);

rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 84ad863..2b69316 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -180,14 +180,12 @@ enum rpcrdma_chunktype {
struct rpcrdma_buffer;

struct rpcrdma_rep {
- unsigned int rr_len; /* actual received reply length */
- struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
- struct rpc_xprt *rr_xprt; /* needed for request/reply matching */
- void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
- struct list_head rr_list; /* tasklet list */
- struct ib_sge rr_iov; /* for posting */
- struct ib_mr *rr_handle; /* handle for mem in rr_iov */
- char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
+ unsigned int rr_len;
+ struct rpcrdma_buffer *rr_buffer;
+ struct rpc_xprt *rr_xprt;
+ void (*rr_func)(struct rpcrdma_rep *);
+ struct list_head rr_list;
+ struct rpcrdma_regbuf *rr_rdmabuf;
};

/*


2015-01-07 23:14:23

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 19/20] xprtrdma: Allocate zero pad separately from rpcrdma_buffer

Use the new rpcrdma_alloc_regbuf() API to shrink the amount of
contiguous memory needed for a buffer pool by moving the zero
pad buffer into a regbuf.

This is for consistency with the other uses of internally
registered memory.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 4 ++--
net/sunrpc/xprtrdma/verbs.c | 29 ++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 3 +--
3 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 02efcaa..7e9acd9 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -549,9 +549,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (padlen) {
struct rpcrdma_ep *ep = &r_xprt->rx_ep;

- req->rl_send_iov[2].addr = ep->rep_pad.addr;
+ req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
req->rl_send_iov[2].length = padlen;
- req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+ req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);

req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7aac422..669f68a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -794,6 +794,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.qp_type = IB_QPT_RC;
ep->rep_attr.port_num = ~0;

+ if (cdata->padding) {
+ ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
+ GFP_KERNEL);
+ if (IS_ERR(ep->rep_padbuf))
+ return PTR_ERR(ep->rep_padbuf);
+ } else
+ ep->rep_padbuf = NULL;
+
dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
"iovs: send %d recv %d\n",
__func__,
@@ -876,6 +884,7 @@ out2:
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, err);
out1:
+ rpcrdma_free_regbuf(ia, ep->rep_padbuf);
return rc;
}

@@ -902,11 +911,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}

- /* padding - could be done in rpcrdma_buffer_destroy... */
- if (ep->rep_pad_mr) {
- rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
- ep->rep_pad_mr = NULL;
- }
+ rpcrdma_free_regbuf(ia, ep->rep_padbuf);

rpcrdma_clean_cq(ep->rep_attr.recv_cq);
rc = ib_destroy_cq(ep->rep_attr.recv_cq);
@@ -1220,12 +1225,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
* 1. arrays for send and recv pointers
* 2. arrays of struct rpcrdma_req to fill in pointers
* 3. array of struct rpcrdma_rep for replies
- * 4. padding, if any
* Send/recv buffers in req/rep need to be registered
*/
len = buf->rb_max_requests *
(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
- len += cdata->padding;

p = kzalloc(len, GFP_KERNEL);
if (p == NULL) {
@@ -1241,18 +1244,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

- /*
- * Register the zeroed pad buffer, if any.
- */
- if (cdata->padding) {
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- rc = rpcrdma_register_internal(ia, p, cdata->padding,
- &ep->rep_pad_mr, &ep->rep_pad);
- if (rc)
- goto out;
- }
- p += cdata->padding;
-
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
switch (ia->ri_memreg_strategy) {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2b69316..5630353 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -88,8 +88,7 @@ struct rpcrdma_ep {
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
- struct ib_sge rep_pad; /* holds zeroed pad */
- struct ib_mr *rep_pad_mr; /* holds zeroed pad */
+ struct rpcrdma_regbuf *rep_padbuf;
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;


2015-01-07 23:14:31

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 20/20] xprtrdma: Clean up after adding regbuf management

rpcrdma_{de}register_internal() are used only in verbs.c now.

MAX_RPCRDMAHDR is no longer used and can be removed.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 4 ++--
net/sunrpc/xprtrdma/xprt_rdma.h | 9 ---------
2 files changed, 2 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 669f68a..f7c3d10 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1729,7 +1729,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* Wrappers for internal-use kmalloc memory registration, used by buffer code.
*/

-int
+static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
struct ib_mr **mrp, struct ib_sge *iov)
{
@@ -1780,7 +1780,7 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
return rc;
}

-int
+static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
struct ib_mr *mr, struct ib_sge *iov)
{
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5630353..c9d2a02 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -171,10 +171,6 @@ enum rpcrdma_chunktype {
/* temporary static scatter/gather max */
#define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */
#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
-#define MAX_RPCRDMAHDR (\
- /* max supported RPC/RDMA header */ \
- sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
- (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))

struct rpcrdma_buffer;

@@ -401,11 +397,6 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);

-int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
- struct ib_mr **, struct ib_sge *);
-int rpcrdma_deregister_internal(struct rpcrdma_ia *,
- struct ib_mr *, struct ib_sge *);
-
int rpcrdma_register_external(struct rpcrdma_mr_seg *,
int, int, struct rpcrdma_xprt *);
int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,