2016-08-23 17:52:05

by Chuck Lever III

Subject: [PATCH v2 00/22] client-side NFS/RDMA patches proposed for v4.9

The following patch series makes these changes:

- Correct use of DMA API
- Delay DMA mapping to permit device driver unload
- Introduce simple RDMA-CM private message exchange
- Support Remote Invalidation
- Support s/g list when sending RPC calls


Available in the "nfs-rdma-for-4.9" topic branch of this git repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git


Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.9


Changes since v1:
- Rebased on v4.8-rc3
- Addressed Trond's comments in 02/22 and 03/22
- Addressed kbuild robot warnings
- Gather Send patch (18/22) rewritten

---

Chuck Lever (22):
xprtrdma: Eliminate INLINE_THRESHOLD macros
SUNRPC: Refactor rpc_xdr_buf_init()
SUNRPC: Generalize the RPC buffer allocation API
SUNRPC: Generalize the RPC buffer release API
SUNRPC: Separate buffer pointers for RPC Call and Reply messages
SUNRPC: Add a transport-specific private field in rpc_rqst
xprtrdma: Initialize separate RPC call and reply buffers
xprtrdma: Use smaller buffers for RPC-over-RDMA headers
xprtrdma: Replace DMA_BIDIRECTIONAL
xprtrdma: Delay DMA mapping Send and Receive buffers
xprtrdma: Eliminate "ia" argument in rpcrdma_{alloc,free}_regbuf
xprtrdma: Simplify rpcrdma_ep_post_recv()
xprtrdma: Move send_wr to struct rpcrdma_req
xprtrdma: Move recv_wr to struct rpcrdma_rep
rpcrdma: RDMA/CM private message data structure
xprtrdma: Client-side support for rpcrdma_connect_private
xprtrdma: Basic support for Remote Invalidation
xprtrdma: Use gathered Send for large inline messages
xprtrdma: Support larger inline thresholds
xprtrdma: Report address of frmr, not mw
xprtrdma: Rename rpcrdma_receive_wc()
xprtrdma: Eliminate rpcrdma_receive_worker()


include/linux/sunrpc/rpc_rdma.h | 39 +++
include/linux/sunrpc/sched.h | 4
include/linux/sunrpc/xdr.h | 12 +
include/linux/sunrpc/xprt.h | 12 +
include/linux/sunrpc/xprtrdma.h | 4
net/sunrpc/backchannel_rqst.c | 8 -
net/sunrpc/clnt.c | 36 +--
net/sunrpc/sched.c | 35 ++-
net/sunrpc/xprt.c | 2
net/sunrpc/xprtrdma/backchannel.c | 53 +----
net/sunrpc/xprtrdma/fmr_ops.c | 7 -
net/sunrpc/xprtrdma/frwr_ops.c | 27 ++
net/sunrpc/xprtrdma/rpc_rdma.c | 323 +++++++++++++++++-----------
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 19 +-
net/sunrpc/xprtrdma/transport.c | 202 ++++++++++--------
net/sunrpc/xprtrdma/verbs.c | 237 ++++++++++++---------
net/sunrpc/xprtrdma/xprt_rdma.h | 108 ++++++---
net/sunrpc/xprtsock.c | 23 +-
18 files changed, 677 insertions(+), 474 deletions(-)

--
Chuck Lever


2016-08-23 17:52:13

by Chuck Lever III

Subject: [PATCH v2 01/22] xprtrdma: Eliminate INLINE_THRESHOLD macros

Clean up: r_xprt is already available everywhere these macros are
invoked, so just dereference that directly.

RPCRDMA_INLINE_PAD_VALUE is no longer used, so it can simply be
removed.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 ++--
net/sunrpc/xprtrdma/rpc_rdma.c | 2 +-
net/sunrpc/xprtrdma/transport.c | 6 +++---
net/sunrpc/xprtrdma/xprt_rdma.h | 9 ---------
4 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 87762d9..5f60ab2 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -46,13 +46,13 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

- size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ size += r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a47f170..845586f 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -673,7 +673,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

- if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
goto out_overflow;

dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 81f0e87..be95ece 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -518,7 +518,7 @@ out:
return req->rl_sendbuf->rg_base;

out_rdmabuf:
- min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
if (IS_ERR(rb))
goto out_fail;
@@ -541,8 +541,8 @@ out_sendbuf:
* reply will be large, but slush is provided here to allow
* flexibility when marshaling.
*/
- min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
- min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_rsize;
+ min_size += r_xprt->rx_data.inline_wsize;
if (size < min_size)
size = min_size;

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 670fad5..6d466ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -355,15 +355,6 @@ struct rpcrdma_create_data_internal {
unsigned int padding; /* non-rdma write header padding */
};

-#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
- (rpcx_to_rdmad(rq->rq_xprt).inline_rsize)
-
-#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
- (rpcx_to_rdmad(rq->rq_xprt).inline_wsize)
-
-#define RPCRDMA_INLINE_PAD_VALUE(rq)\
- rpcx_to_rdmad(rq->rq_xprt).padding
-
/*
* Statistics for RPCRDMA
*/


2016-08-23 17:52:21

by Chuck Lever III

Subject: [PATCH v2 02/22] SUNRPC: Refactor rpc_xdr_buf_init()

Clean up: there is some XDR initialization logic that is common
to the forward channel and backchannel. Move it to an XDR header
so it can be shared.

rpc_rqst::rq_buffer points to a buffer containing big-endian data.
Update its annotation as part of the clean up.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xdr.h | 12 ++++++++++++
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/backchannel_rqst.c | 8 +-------
net/sunrpc/clnt.c | 24 ++++++------------------
net/sunrpc/xprtrdma/backchannel.c | 12 +-----------
5 files changed, 21 insertions(+), 37 deletions(-)

diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index 70c6b92..56c48c8 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -67,6 +67,18 @@ struct xdr_buf {
len; /* Length of XDR encoded message */
};

+static inline void
+xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
+{
+ buf->head[0].iov_base = start;
+ buf->head[0].iov_len = len;
+ buf->tail[0].iov_len = 0;
+ buf->page_len = 0;
+ buf->flags = 0;
+ buf->len = 0;
+ buf->buflen = len;
+}
+
/*
* pre-xdr'ed macros.
*/
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a16070d..6f1d41b 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -83,7 +83,7 @@ struct rpc_rqst {
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list;

- __u32 * rq_buffer; /* XDR encode buffer */
+ void *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize,
rq_rcvsize;
size_t rq_xmit_bytes_sent; /* total bytes sent */
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 229956b..ac701c2 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
page = alloc_page(gfp_flags);
if (page == NULL)
return -ENOMEM;
- buf->head[0].iov_base = page_address(page);
- buf->head[0].iov_len = PAGE_SIZE;
- buf->tail[0].iov_base = NULL;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->len = 0;
- buf->buflen = PAGE_SIZE;
+ xdr_buf_init(buf, page_address(page), PAGE_SIZE);
return 0;
}

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7f79fb7..236f9ff 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1748,18 +1748,6 @@ rpc_task_force_reencode(struct rpc_task *task)
task->tk_rqstp->rq_bytes_sent = 0;
}

-static inline void
-rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
-{
- buf->head[0].iov_base = start;
- buf->head[0].iov_len = len;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->flags = 0;
- buf->len = 0;
- buf->buflen = len;
-}
-
/*
* 3. Encode arguments of an RPC call
*/
@@ -1772,12 +1760,12 @@ rpc_xdr_encode(struct rpc_task *task)

dprint_status(task);

- rpc_xdr_buf_init(&req->rq_snd_buf,
- req->rq_buffer,
- req->rq_callsize);
- rpc_xdr_buf_init(&req->rq_rcv_buf,
- (char *)req->rq_buffer + req->rq_callsize,
- req->rq_rcvsize);
+ xdr_buf_init(&req->rq_snd_buf,
+ req->rq_buffer,
+ req->rq_callsize);
+ xdr_buf_init(&req->rq_rcv_buf,
+ (char *)req->rq_buffer + req->rq_callsize,
+ req->rq_rcvsize);

p = rpc_encode_header(task);
if (p == NULL) {
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 5f60ab2..d3cfaf2 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -38,7 +38,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- struct xdr_buf *buf;
size_t size;

req = rpcrdma_create_req(r_xprt);
@@ -60,16 +59,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
req->rl_sendbuf = rb;
/* so that rpcr_to_rdmar works when receiving a request */
rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
-
- buf = &rqst->rq_snd_buf;
- buf->head[0].iov_base = rqst->rq_buffer;
- buf->head[0].iov_len = 0;
- buf->tail[0].iov_base = NULL;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->len = 0;
- buf->buflen = size;
-
+ xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
return 0;

out_fail:


2016-08-23 17:52:30

by Chuck Lever III

Subject: [PATCH v2 03/22] SUNRPC: Generalize the RPC buffer allocation API

xprtrdma needs to allocate the Call and Reply buffers separately.
TBH, the reliance on using a single buffer for the pair of XDR
buffers is transport implementation-specific.

Transports that want to allocate separate Call and Reply buffers
will ignore the "size" argument anyway. Don't bother passing it.

The buf_alloc method can't return two pointers. Instead, make the
method's return value an error code, and set the rq_buffer pointer
in the method itself.

This gives call_allocate an opportunity to terminate an RPC instead
of looping forever when a permanent problem occurs. If a request is
just bogus, or the transport is in a state where it can't allocate
resources for any request, there needs to be a way to kill the RPC
right there and not loop.
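
To illustrate the new contract, here is a minimal sketch (not part of
this patch; the helper name and the size limit are made up) of a
transport buf_alloc method under the new API:

static int example_buf_alloc(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
	void *buf;

	/* A permanent problem: any errno other than -ENOMEM makes
	 * call_allocate terminate the RPC via rpc_exit().
	 */
	if (size > EXAMPLE_MAX_BUFSIZE)	/* hypothetical limit */
		return -EINVAL;

	/* A transient problem: -ENOMEM makes call_allocate delay
	 * the task and retry the allocation later.
	 */
	buf = kmalloc(size, GFP_NOIO | __GFP_NOWARN);
	if (!buf)
		return -ENOMEM;

	rqst->rq_buffer = buf;
	return 0;
}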

This immediately fixes a rare problem in the backchannel send path,
which loops if the server happens to send a CB request whose
call+reply size is larger than a page (which it shouldn't do yet).

One more issue: looks like xprt_inject_disconnect was incorrectly
placed in the failure path in call_allocate. It needs to be in the
success path, as it is for other call-sites.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 2 +-
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/clnt.c | 12 ++++++++----
net/sunrpc/sched.c | 24 +++++++++++++++---------
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 17 +++++++++--------
net/sunrpc/xprtrdma/transport.c | 26 ++++++++++++++++++--------
net/sunrpc/xprtsock.c | 17 +++++++++++------
7 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 817af0b..38d4c1b 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
void *);
void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long);
-void * rpc_malloc(struct rpc_task *, size_t);
+int rpc_malloc(struct rpc_task *);
void rpc_free(void *);
int rpciod_up(void);
void rpciod_down(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 6f1d41b..c01f468 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -127,7 +127,7 @@ struct rpc_xprt_ops {
void (*rpcbind)(struct rpc_task *task);
void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
- void * (*buf_alloc)(struct rpc_task *task, size_t size);
+ int (*buf_alloc)(struct rpc_task *task);
void (*buf_free)(void *buffer);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 236f9ff..ab467c0 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1693,6 +1693,7 @@ call_allocate(struct rpc_task *task)
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+ int status;

dprint_status(task);

@@ -1718,11 +1719,14 @@ call_allocate(struct rpc_task *task)
req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
req->rq_rcvsize <<= 2;

- req->rq_buffer = xprt->ops->buf_alloc(task,
- req->rq_callsize + req->rq_rcvsize);
- if (req->rq_buffer != NULL)
- return;
+ status = xprt->ops->buf_alloc(task);
xprt_inject_disconnect(xprt);
+ if (status == 0)
+ return;
+ if (status != -ENOMEM) {
+ rpc_exit(task, status);
+ return;
+ }

dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 9ae5885..b964d40 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
}

/**
- * rpc_malloc - allocate an RPC buffer
- * @task: RPC task that will use this buffer
- * @size: requested byte size
+ * rpc_malloc - allocate RPC buffer resources
+ * @task: RPC task
+ *
+ * A single memory region is allocated, which is split between the
+ * RPC call and RPC reply that this task is being used for. When
+ * this RPC is retired, the memory is released by calling rpc_free.
*
* To prevent rpciod from hanging, this allocator never sleeps,
- * returning NULL and suppressing warning if the request cannot be serviced
- * immediately.
- * The caller can arrange to sleep in a way that is safe for rpciod.
+ * returning -ENOMEM and suppressing warning if the request cannot
+ * be serviced immediately. The caller can arrange to sleep in a
+ * way that is safe for rpciod.
*
* Most requests are 'small' (under 2KiB) and can be serviced from a
* mempool, ensuring that NFS reads and writes can always proceed,
@@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
* In order to avoid memory starvation triggering more writebacks of
* NFS requests, we avoid using GFP_KERNEL.
*/
-void *rpc_malloc(struct rpc_task *task, size_t size)
+int rpc_malloc(struct rpc_task *task)
{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
struct rpc_buffer *buf;
gfp_t gfp = GFP_NOIO | __GFP_NOWARN;

@@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
buf = kmalloc(size, gfp);

if (!buf)
- return NULL;
+ return -ENOMEM;

buf->len = size;
dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
task->tk_pid, size, buf);
- return &buf->data;
+ rqst->rq_buffer = buf->data;
+ return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index a2a7519..124688b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -159,29 +159,30 @@ out_unmap:
/* Server-side transport endpoint wants a whole page for its send
* buffer. The client RPC code constructs the RPC header in this
* buffer before it invokes ->send_request.
- *
- * Returns NULL if there was a temporary allocation failure.
*/
-static void *
-xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_bc_allocate(struct rpc_task *task)
{
struct rpc_rqst *rqst = task->tk_rqstp;
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+ size_t size = rqst->rq_callsize;
struct svcxprt_rdma *rdma;
struct page *page;

rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

- /* Prevent an infinite loop: try to make this case work */
- if (size > PAGE_SIZE)
+ if (size > PAGE_SIZE) {
WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
size);
+ return -EINVAL;
+ }

page = alloc_page(RPCRDMA_DEF_GFP);
if (!page)
- return NULL;
+ return -ENOMEM;

- return page_address(page);
+ rqst->rq_buffer = page_address(page);
+ return 0;
}

static void
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index be95ece..daa7d4d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
}
}

-/*
+/**
+ * xprt_rdma_allocate - allocate transport resources for an RPC
+ * @task: RPC task
+ *
+ * Return values:
+ * 0: Success; rq_buffer points to RPC buffer to use
+ * ENOMEM: Out of memory, call again later
+ * EIO: A permanent error occurred, do not retry
+ *
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
* sequence.
@@ -486,11 +494,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
* We may register rq_rcv_buf when using reply chunks.
*/
-static void *
-xprt_rdma_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_allocate(struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
size_t min_size;
@@ -498,7 +507,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)

req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
- return NULL;
+ return -ENOMEM;

flags = RPCRDMA_DEF_GFP;
if (RPC_IS_SWAPPER(task))
@@ -515,7 +524,8 @@ out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
req->rl_task = task;
- return req->rl_sendbuf->rg_base;
+ rqst->rq_buffer = req->rl_sendbuf->rg_base;
+ return 0;

out_rdmabuf:
min_size = r_xprt->rx_data.inline_wsize;
@@ -558,7 +568,7 @@ out_sendbuf:

out_fail:
rpcrdma_buffer_put(req);
- return NULL;
+ return -ENOMEM;
}

/*
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 8ede3bc..54a7b4c 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
* we allocate pages instead doing a kmalloc like rpc_malloc is because we want
* to use the server side send routines.
*/
-static void *bc_malloc(struct rpc_task *task, size_t size)
+static int bc_malloc(struct rpc_task *task)
{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize;
struct page *page;
struct rpc_buffer *buf;

- WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
- if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
- return NULL;
+ if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
+ WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
+ size);
+ return -EINVAL;
+ }

page = alloc_page(GFP_KERNEL);
if (!page)
- return NULL;
+ return -ENOMEM;

buf = page_address(page);
buf->len = PAGE_SIZE;

- return buf->data;
+ rqst->rq_buffer = buf->data;
+ return 0;
}

/*


2016-08-23 17:52:37

by Chuck Lever III

Subject: [PATCH v2 04/22] SUNRPC: Generalize the RPC buffer release API

xprtrdma needs to allocate the Call and Reply buffers separately.
TBH, the reliance on using a single buffer for the pair of XDR
buffers is transport implementation-specific.

Instead of passing just the rq_buffer into the buf_free method, pass
the task structure and let buf_free take care of freeing both
XDR buffers at once.

There's a micro-optimization here. In the common case, both
xprt_release and the transport's buf_free method were checking if
rq_buffer was NULL. Now the check is done only once per RPC.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 2 +-
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/sched.c | 10 ++++------
net/sunrpc/xprt.c | 2 +-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 2 +-
net/sunrpc/xprtrdma/transport.c | 26 ++++++++++----------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
net/sunrpc/xprtsock.c | 6 ++----
8 files changed, 20 insertions(+), 31 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 38d4c1b..7ba040c 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -240,7 +240,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long);
int rpc_malloc(struct rpc_task *);
-void rpc_free(void *);
+void rpc_free(struct rpc_task *);
int rpciod_up(void);
void rpciod_down(void);
int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index c01f468..72c2aeb 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -128,7 +128,7 @@ struct rpc_xprt_ops {
void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
int (*buf_alloc)(struct rpc_task *task);
- void (*buf_free)(void *buffer);
+ void (*buf_free)(struct rpc_task *task);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b964d40..6690ebc 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -896,18 +896,16 @@ int rpc_malloc(struct rpc_task *task)
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
- * rpc_free - free buffer allocated via rpc_malloc
- * @buffer: buffer to free
+ * rpc_free - free RPC buffer resources allocated via rpc_malloc
+ * @task: RPC task
*
*/
-void rpc_free(void *buffer)
+void rpc_free(struct rpc_task *task)
{
+ void *buffer = task->tk_rqstp->rq_buffer;
size_t size;
struct rpc_buffer *buf;

- if (!buffer)
- return;
-
buf = container_of(buffer, struct rpc_buffer, data);
size = buf->len;

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ea244b2..685e6d2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1295,7 +1295,7 @@ void xprt_release(struct rpc_task *task)
xprt_schedule_autodisconnect(xprt);
spin_unlock_bh(&xprt->transport_lock);
if (req->rq_buffer)
- xprt->ops->buf_free(req->rq_buffer);
+ xprt->ops->buf_free(task);
xprt_inject_disconnect(xprt);
if (req->rq_cred != NULL)
put_rpccred(req->rq_cred);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 124688b..fa89350 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -186,7 +186,7 @@ xprt_rdma_bc_allocate(struct rpc_task *task)
}

static void
-xprt_rdma_bc_free(void *buffer)
+xprt_rdma_bc_free(struct rpc_task *task)
{
/* No-op: ctxt and page have already been freed. */
}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index daa7d4d..ebf14ba 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -523,7 +523,6 @@ xprt_rdma_allocate(struct rpc_task *task)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- req->rl_task = task;
rqst->rq_buffer = req->rl_sendbuf->rg_base;
return 0;

@@ -571,31 +570,26 @@ out_fail:
return -ENOMEM;
}

-/*
- * This function returns all RDMA resources to the pool.
+/**
+ * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
+ * @task: RPC task
+ *
+ * Caller guarantees rqst->rq_buffer is non-NULL.
*/
static void
-xprt_rdma_free(void *buffer)
+xprt_rdma_free(struct rpc_task *task)
{
- struct rpcrdma_req *req;
- struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_regbuf *rb;
-
- if (buffer == NULL)
- return;
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

- rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
- req = rb->rg_owner;
if (req->rl_backchannel)
return;

- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
- !RPC_IS_ASYNC(req->rl_task));
-
+ !RPC_IS_ASYNC(task));
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 6d466ab..296d9ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -283,7 +283,6 @@ struct rpcrdma_req {
struct list_head rl_free;
unsigned int rl_niovs;
unsigned int rl_connect_cookie;
- struct rpc_task *rl_task;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 54a7b4c..41fae68 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2560,13 +2560,11 @@ static int bc_malloc(struct rpc_task *task)
/*
* Free the space allocated in the bc_alloc routine
*/
-static void bc_free(void *buffer)
+static void bc_free(struct rpc_task *task)
{
+ void *buffer = task->tk_rqstp->rq_buffer;
struct rpc_buffer *buf;

- if (!buffer)
- return;
-
buf = container_of(buffer, struct rpc_buffer, data);
free_page((unsigned long)buf);
}


2016-08-23 17:52:47

by Chuck Lever III

Subject: [PATCH v2 05/22] SUNRPC: Separate buffer pointers for RPC Call and Reply messages

For xprtrdma, the RPC Call and Reply buffers are involved in real
I/O operations.

To start with, the DMA direction of the I/O for a Call is opposite
that of a Reply.

In the current arrangement, the Reply buffer address is on a
four-byte alignment just past the Call buffer. It would be friendlier
on some platforms if it were at a DMA cache alignment instead.

Because the current arrangement allocates a single memory region
which contains both buffers, the RPC Reply buffer often contains a
page boundary in it when the Call buffer is large enough (which is
frequent).

It would be a little nicer for setting up DMA operations (and
possible registration of the Reply buffer) if the two buffers were
separated, well-aligned, and contained as few page boundaries as
possible.

Now, I could just pad out the single memory region used for the pair
of buffers. But frequently that would mean a lot of unused space to
ensure the Reply buffer did not have a page boundary.

Add a separate pointer to rpc_rqst that points right to the RPC
Reply buffer. This makes no difference to xprtsock, but it will help
xprtrdma in subsequent patches.
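
As an illustration of where this leads (a sketch only; rl_recvbuf and
the helper that looks up the rpcrdma_req are not part of this patch),
a transport that allocates the two buffers separately can point each
field at its own well-aligned region:

static int example_rdma_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_req *req = example_get_req(rqst);	/* hypothetical */

	/* Separate Call and Reply regbufs, each in its own region */
	rqst->rq_buffer = req->rl_sendbuf->rg_base;
	rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
	return 0;
}

For socket transports, rpc_malloc simply keeps the old layout:
rq_rbuffer points just past the Call buffer in the same allocation.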

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 5 +++--
net/sunrpc/clnt.c | 2 +-
net/sunrpc/sched.c | 1 +
net/sunrpc/xprtrdma/transport.c | 1 +
4 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 72c2aeb..46f069e 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -84,8 +84,9 @@ struct rpc_rqst {
struct list_head rq_list;

void *rq_buffer; /* Call XDR encode buffer */
- size_t rq_callsize,
- rq_rcvsize;
+ size_t rq_callsize;
+ void *rq_rbuffer; /* Reply XDR decode buffer */
+ size_t rq_rcvsize;
size_t rq_xmit_bytes_sent; /* total bytes sent */
size_t rq_reply_bytes_recvd; /* total reply bytes */
/* received */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index ab467c0..fd389c0 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1768,7 +1768,7 @@ rpc_xdr_encode(struct rpc_task *task)
req->rq_buffer,
req->rq_callsize);
xdr_buf_init(&req->rq_rcv_buf,
- (char *)req->rq_buffer + req->rq_callsize,
+ req->rq_rbuffer,
req->rq_rcvsize);

p = rpc_encode_header(task);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 6690ebc..5db68b3 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -891,6 +891,7 @@ int rpc_malloc(struct rpc_task *task)
dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
task->tk_pid, size, buf);
rqst->rq_buffer = buf->data;
+ rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ebf14ba..136caf3 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -524,6 +524,7 @@ out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
rqst->rq_buffer = req->rl_sendbuf->rg_base;
+ rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
return 0;

out_rdmabuf:


2016-08-23 17:54:33

by Chuck Lever III

Subject: [PATCH v2 18/22] xprtrdma: Use gathered Send for large inline messages

An RPC Call message that is sent inline but that has a data payload
(ie, one or more items in rq_snd_buf's page list) must be "pulled
up:"

- call_allocate has to reserve enough RPC Call buffer space to
accommodate the data payload

- call_transmit has to memcpy the rq_snd_buf's page list and tail
into its head iovec before it is sent

As the inline threshold is increased beyond its current 1KB default,
however, data payloads of more than a few KB end up being copied
by the host CPU. For example, if the inline threshold is increased
just to 4KB, then NFS WRITE requests up to 4KB would involve a
memcpy of the NFS WRITE's payload data into the RPC Call buffer.
This is an undesirable amount of participation by the host CPU.

The inline threshold may be much larger than 4KB in the future,
after negotiation with a peer server.

Instead of copying the components of rq_snd_buf into its head iovec,
construct a gather list of these components, and send them all in
place. The same approach is already used in the Linux server's
RPC-over-RDMA reply path.

This mechanism also eliminates the need for rpcrdma_tail_pullup,
which is used to manage the XDR pad and trailing inline content when
a Read list is present.

This requires that the pages in rq_snd_buf's page list be DMA-mapped
during marshaling, and unmapped when a data-bearing RPC is
completed. This is slightly less efficient for very small I/O
payloads, but significantly more efficient as data payload size and
inline threshold increase past a kilobyte.
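
The gathered Send that results looks roughly like this (an
illustrative layout, not code from the patch):

	sge[0]       RPC-over-RDMA header   (rl_rdmabuf)
	sge[1]       rq_snd_buf.head[0]     (rl_sendbuf)
	sge[2..N-1]  rq_snd_buf page list   (one SGE per page, mapped here)
	sge[N]       rq_snd_buf.tail[0]     (mapped here, if present)

The page and tail SGEs are DMA-mapped while marshaling and unmapped
by rpcrdma_unmap_sges() when the RPC completes. When a Read chunk
conveys the page list, only the non-pad portion of the tail iovec is
added to the gather list.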

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 33 ----
net/sunrpc/xprtrdma/rpc_rdma.c | 301 +++++++++++++++++++++----------------
net/sunrpc/xprtrdma/transport.c | 18 +-
net/sunrpc/xprtrdma/verbs.c | 13 --
net/sunrpc/xprtrdma/xprt_rdma.h | 27 +++
5 files changed, 207 insertions(+), 185 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 61a58f5..2c472e1 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -206,7 +206,6 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_msg *headerp;
- size_t rpclen;

headerp = rdmab_to_msg(req->rl_rdmabuf);
headerp->rm_xid = rqst->rq_xid;
@@ -218,36 +217,10 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
headerp->rm_body.rm_chunks[1] = xdr_zero;
headerp->rm_body.rm_chunks[2] = xdr_zero;

- rpclen = rqst->rq_svec[0].iov_len;
-
-#ifdef RPCRDMA_BACKCHANNEL_DEBUG
- pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
- __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
- pr_info("RPC: %s: RPC/RDMA: %*ph\n",
- __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
- pr_info("RPC: %s: RPC: %*ph\n",
- __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
-#endif
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
- goto out_map;
- req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
- req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
- req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
- goto out_map;
- req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
- req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_send_wr.num_sge = 2;
-
+ if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
+ &rqst->rq_snd_buf, rpcrdma_noch))
+ return -EIO;
return 0;
-
-out_map:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
- return -EIO;
}

/**
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 31a434d..20115ad 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -53,14 +53,6 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

-enum rpcrdma_chunktype {
- rpcrdma_noch = 0,
- rpcrdma_readch,
- rpcrdma_areadch,
- rpcrdma_writech,
- rpcrdma_replych
-};
-
static const char transfertypes[][12] = {
"inline", /* no chunks */
"read list", /* some argument via rdma read */
@@ -157,42 +149,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

-static int
-rpcrdma_tail_pullup(struct xdr_buf *buf)
-{
- size_t tlen = buf->tail[0].iov_len;
- size_t skip = tlen & 3;
-
- /* Do not include the tail if it is only an XDR pad */
- if (tlen < 4)
- return 0;
-
- /* xdr_write_pages() adds a pad at the beginning of the tail
- * if the content in "buf->pages" is unaligned. Force the
- * tail's actual content to land at the next XDR position
- * after the head instead.
- */
- if (skip) {
- unsigned char *src, *dst;
- unsigned int count;
-
- src = buf->tail[0].iov_base;
- dst = buf->head[0].iov_base;
- dst += buf->head[0].iov_len;
-
- src += skip;
- tlen -= skip;
-
- dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
- __func__, skip, dst, src, tlen);
-
- for (count = tlen; count; count--)
- *dst++ = *src++;
- }
-
- return tlen;
-}
-
/* Split "vec" on page boundaries into segments. FMR registers pages,
* not a byte range. Other modes coalesce these segments into a single
* MR when they can.
@@ -503,74 +459,184 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
return iptr;
}

-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+/* Prepare the RPC-over-RDMA header SGE.
*/
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+static bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ u32 len)
{
- int i, npages, curlen;
- int copy_len;
- unsigned char *srcp, *destp;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- int page_base;
- struct page **ppages;
+ struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+ struct ib_sge *sge = &req->rl_send_sge[0];
+
+ if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
+ if (!__rpcrdma_dma_map_regbuf(ia, rb))
+ return false;
+ sge->addr = rdmab_addr(rb);
+ sge->lkey = rdmab_lkey(rb);
+ }
+ sge->length = len;

- destp = rqst->rq_svec[0].iov_base;
- curlen = rqst->rq_svec[0].iov_len;
- destp += curlen;
+ ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+ sge->length, DMA_TO_DEVICE);
+ req->rl_send_wr.num_sge++;
+ return true;
+}

- dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
- __func__, destp, rqst->rq_slen, curlen);
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+static bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+{
+ unsigned int sge_no, page_base, len, remaining;
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+ struct ib_device *device = ia->ri_device;
+ struct ib_sge *sge = req->rl_send_sge;
+ u32 lkey = ia->ri_pd->local_dma_lkey;
+ struct page *page, **ppages;
+
+ /* The head iovec is straightforward, as it is already
+ * DMA-mapped. Sync the content that has changed.
+ */
+ if (!rpcrdma_dma_map_regbuf(ia, rb))
+ return false;
+ sge_no = 1;
+ sge[sge_no].addr = rdmab_addr(rb);
+ sge[sge_no].length = xdr->head[0].iov_len;
+ sge[sge_no].lkey = rdmab_lkey(rb);
+ ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+ sge[sge_no].length, DMA_TO_DEVICE);
+
+ /* If there is a Read chunk, the page list is being handled
+ * via explicit RDMA, and thus is skipped here. However, the
+ * tail iovec may include an XDR pad for the page list, as
+ * well as additional content, and may not reside in the
+ * same page as the head iovec.
+ */
+ if (rtype == rpcrdma_readch) {
+ len = xdr->tail[0].iov_len;

- copy_len = rqst->rq_snd_buf.page_len;
+ /* Do not include the tail if it is only an XDR pad */
+ if (len < 4)
+ goto out;

- if (rqst->rq_snd_buf.tail[0].iov_len) {
- curlen = rqst->rq_snd_buf.tail[0].iov_len;
- if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
- memmove(destp + copy_len,
- rqst->rq_snd_buf.tail[0].iov_base, curlen);
- r_xprt->rx_stats.pullup_copy_count += curlen;
+ page = virt_to_page(xdr->tail[0].iov_base);
+ page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+
+ /* If the content in the page list is an odd length,
+ * xdr_write_pages() has added a pad at the beginning
+ * of the tail iovec. Force the tail's non-pad content
+ * to land at the next XDR position in the Send message.
+ */
+ page_base += len & 3;
+ len -= len & 3;
+ goto map_tail;
+ }
+
+ /* If there is a page list present, temporarily DMA map
+ * and prepare an SGE for each page to be sent.
+ */
+ if (xdr->page_len) {
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ page_base = xdr->page_base & ~PAGE_MASK;
+ remaining = xdr->page_len;
+ while (remaining) {
+ sge_no++;
+ if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
+ goto out_mapping_overflow;
+
+ len = min_t(u32, PAGE_SIZE - page_base, remaining);
+ sge[sge_no].addr = ib_dma_map_page(device, *ppages,
+ page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ goto out_mapping_err;
+ sge[sge_no].length = len;
+ sge[sge_no].lkey = lkey;
+
+ req->rl_mapped_sges++;
+ ppages++;
+ remaining -= len;
+ page_base = 0;
}
- dprintk("RPC: %s: tail destp 0x%p len %d\n",
- __func__, destp + copy_len, curlen);
- rqst->rq_svec[0].iov_len += curlen;
}
- r_xprt->rx_stats.pullup_copy_count += copy_len;

- page_base = rqst->rq_snd_buf.page_base;
- ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
- page_base &= ~PAGE_MASK;
- npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
- for (i = 0; copy_len && i < npages; i++) {
- curlen = PAGE_SIZE - page_base;
- if (curlen > copy_len)
- curlen = copy_len;
- dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
- __func__, i, destp, copy_len, curlen);
- srcp = kmap_atomic(ppages[i]);
- memcpy(destp, srcp+page_base, curlen);
- kunmap_atomic(srcp);
- rqst->rq_svec[0].iov_len += curlen;
- destp += curlen;
- copy_len -= curlen;
- page_base = 0;
+ /* The tail iovec is not always constructed in the same
+ * page where the head iovec resides (see, for example,
+ * gss_wrap_req_priv). To neatly accommodate that case,
+ * DMA map it separately.
+ */
+ if (xdr->tail[0].iov_len) {
+ page = virt_to_page(xdr->tail[0].iov_base);
+ page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ len = xdr->tail[0].iov_len;
+
+map_tail:
+ sge_no++;
+ sge[sge_no].addr = ib_dma_map_page(device, page,
+ page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ goto out_mapping_err;
+ sge[sge_no].length = len;
+ sge[sge_no].lkey = lkey;
+ req->rl_mapped_sges++;
}
- /* header now contains entire send message */
+
+out:
+ req->rl_send_wr.num_sge = sge_no + 1;
+ return true;
+
+out_mapping_overflow:
+ pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
+ return false;
+
+out_mapping_err:
+ pr_err("rpcrdma: Send mapping error\n");
+ return false;
+}
+
+bool
+rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ u32 hdrlen, struct xdr_buf *xdr,
+ enum rpcrdma_chunktype rtype)
+{
+ req->rl_send_wr.num_sge = 0;
+ req->rl_mapped_sges = 0;
+
+ if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
+ goto out_map;
+
+ if (rtype != rpcrdma_areadch)
+ if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
+ goto out_map;
+
+ return true;
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return false;
+}
+
+void
+rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ struct ib_device *device = ia->ri_device;
+ struct ib_sge *sge;
+ int count;
+
+ sge = &req->rl_send_sge[2];
+ for (count = req->rl_mapped_sges; count--; sge++)
+ ib_dma_unmap_page(device, sge->addr, sge->length,
+ DMA_TO_DEVICE);
+ req->rl_mapped_sges = 0;
}

/*
* Marshal a request: the primary job of this routine is to choose
* the transfer modes. See comments below.
*
- * Prepares up to two IOVs per Call message:
- *
- * [0] -- RPC RDMA header
- * [1] -- the RPC header/data
- *
* Returns zero on success, otherwise a negative errno.
*/

@@ -638,12 +704,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
rtype = rpcrdma_noch;
- rpcrdma_inline_pullup(rqst);
- rpclen = rqst->rq_svec[0].iov_len;
+ rpclen = rqst->rq_snd_buf.len;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
- rpclen = rqst->rq_svec[0].iov_len;
- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+ rpclen = rqst->rq_snd_buf.head[0].iov_len +
+ rqst->rq_snd_buf.tail[0].iov_len;
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
@@ -685,47 +750,21 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

- if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
- goto out_overflow;
-
dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
rqst->rq_task->tk_pid, __func__,
transfertypes[rtype], transfertypes[wtype],
hdrlen, rpclen);

- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
- goto out_map;
- req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
- req->rl_send_iov[0].length = hdrlen;
- req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
- req->rl_send_wr.num_sge = 1;
- if (rtype == rpcrdma_areadch)
- return 0;
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
- goto out_map;
- req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
- req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_send_wr.num_sge = 2;
-
+ if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+ &rqst->rq_snd_buf, rtype)) {
+ iptr = ERR_PTR(-EIO);
+ goto out_unmap;
+ }
return 0;

-out_overflow:
- pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
- hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
- iptr = ERR_PTR(-EIO);
-
out_unmap:
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
return PTR_ERR(iptr);
-
-out_map:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
- iptr = ERR_PTR(-EIO);
- goto out_unmap;
}

/*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 7e11d71..6a358ab 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -499,30 +499,21 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;
}

-/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
- * if the resulting Call message is smaller than the inline threshold.
- * The value of the "rq_callsize" argument accounts for RPC header
- * requirements, but not for the data payload in these cases.
- *
- * See rpcrdma_inline_pullup.
- */
static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
size_t size, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- size_t min_size;

if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
return true;

- min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(min_size, DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

rpcrdma_free_regbuf(req->rl_sendbuf);
- r_xprt->rx_stats.hardway_register_count += min_size;
+ r_xprt->rx_stats.hardway_register_count += size;
req->rl_sendbuf = rb;
return true;
}
@@ -623,14 +614,15 @@ xprt_rdma_free(struct rpc_task *task)
struct rpc_rqst *rqst = task->tk_rqstp;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;

if (req->rl_backchannel)
return;

dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
- !RPC_IS_ASYNC(task));
+ ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
+ rpcrdma_unmap_sges(ia, req);
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 03ba529..4b09ccf 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -492,7 +492,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
unsigned int max_qp_wr;
int rc;

- if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
+ if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) {
dprintk("RPC: %s: insufficient sge's available\n",
__func__);
return -ENOMEM;
@@ -521,7 +521,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */
- ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
+ ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -890,7 +890,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&req->rl_registered);
req->rl_send_wr.next = NULL;
req->rl_send_wr.wr_cqe = &req->rl_cqe;
- req->rl_send_wr.sg_list = req->rl_send_iov;
+ req->rl_send_wr.sg_list = req->rl_send_sge;
req->rl_send_wr.opcode = IB_WR_SEND;
return req;
}
@@ -1287,11 +1287,9 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
- struct ib_device *device = ia->ri_device;
struct ib_send_wr *send_wr = &req->rl_send_wr;
struct ib_send_wr *send_wr_fail;
- struct ib_sge *sge = req->rl_send_iov;
- int i, rc;
+ int rc;

if (req->rl_reply) {
rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
@@ -1300,9 +1298,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
req->rl_reply = NULL;
}

- for (i = 0; i < send_wr->num_sge; i++)
- ib_dma_sync_single_for_device(device, sge[i].addr,
- sge[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 84cbbe9..0278f38 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -285,16 +285,27 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
char *mr_offset; /* kva if no page, else offset */
};

-#define RPCRDMA_MAX_IOVS (2)
+/* Reserve enough Send SGEs to send a maximum size inline request:
+ * - RPC-over-RDMA header
+ * - xdr_buf head iovec
+ * - RPCRDMA_MAX_INLINE bytes, possibly unaligned, in pages
+ * - xdr_buf tail iovec
+ */
+enum {
+ RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1,
+ RPCRDMA_MAX_PAGE_SGES = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1,
+ RPCRDMA_MAX_SEND_SGES = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1,
+};

struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_free;
+ unsigned int rl_mapped_sges;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct ib_send_wr rl_send_wr;
- struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
+ struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES];
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
@@ -528,6 +539,18 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
+
+enum rpcrdma_chunktype {
+ rpcrdma_noch = 0,
+ rpcrdma_readch,
+ rpcrdma_areadch,
+ rpcrdma_writech,
+ rpcrdma_replych
+};
+
+bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
+ u32, struct xdr_buf *, enum rpcrdma_chunktype);
+void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
int rpcrdma_marshal_req(struct rpc_rqst *);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);



2016-08-23 17:54:46

by Chuck Lever III

Subject: [PATCH v2 19/22] xprtrdma: Support larger inline thresholds

RPC-over-RDMA Version Two will likely require at least a 4KB inline
threshold by default. The Version One inline threshold is still 1KB,
and it's automatically negotiated down to in
rpcrdma_update_connect_private.

The maximum is somewhat arbitrary. There's no fundamental
architectural limit I'm aware of, but it's good to keep the size of
Receive buffers reasonable. Now that Send can use an s/g list, a
Send buffer is only as large as each RPC requires. Receive buffers
are always the size of the inline threshold, however.
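
The switch to proc_dointvec_minmax means the bounds already wired into
the sysctl table (extra1/extra2) now actually clamp writes. A sketch of
the assumed definitions in transport.c that those fields point to:

static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;	/* 1KB */
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;	/* now 64KB */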

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprtrdma.h | 4 ++--
net/sunrpc/xprtrdma/transport.c | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index 39267dc..221b7a2 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -53,8 +53,8 @@
#define RPCRDMA_MAX_SLOT_TABLE (256U)

#define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */
-#define RPCRDMA_DEF_INLINE (1024) /* default inline thresh */
-#define RPCRDMA_MAX_INLINE (3068) /* max inline thresh */
+#define RPCRDMA_DEF_INLINE (4096) /* default inline thresh */
+#define RPCRDMA_MAX_INLINE (65536) /* max inline thresh */

/* Memory registration strategies, by number.
* This is part of a kernel / user space API. Do not remove. */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 6a358ab..ed5e285 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = {
.data = &xprt_rdma_max_inline_read,
.maxlen = sizeof(unsigned int),
.mode = 0644,
- .proc_handler = proc_dointvec,
+ .proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},
@@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = {
.data = &xprt_rdma_max_inline_write,
.maxlen = sizeof(unsigned int),
.mode = 0644,
- .proc_handler = proc_dointvec,
+ .proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},


2016-08-23 17:54:48

by Chuck Lever III

Subject: [PATCH v2 20/22] xprtrdma: Report address of frmr, not mw

Tie frwr debugging messages together by always reporting the address
of the frwr.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e82d5cf..2e51ef5 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -163,7 +163,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
return PTR_ERR(f->fr_mr);
}

- dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
+ dprintk("RPC: %s: recovered FRMR %p\n", __func__, f);
f->fr_state = FRMR_IS_INVALID;
return 0;
}
@@ -397,7 +397,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
goto out_mapmr_err;

dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
- __func__, mw, mw->mw_nents, mr->length);
+ __func__, frmr, mw->mw_nents, mr->length);

key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
@@ -450,6 +450,9 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;

+ dprintk("RPC: %s: invalidating frmr %p\n",
+ __func__, f);
+
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;

@@ -532,6 +535,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
unmap:
list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+ dprintk("RPC: %s: unmapping frmr %p\n",
+ __func__, &mw->frmr);
list_del_init(&mw->mw_list);
ib_dma_unmap_sg(ia->ri_device,
mw->mw_sg, mw->mw_nents, mw->mw_dir);


2016-08-23 17:54:56

by Chuck Lever III

Subject: [PATCH v2 21/22] xprtrdma: Rename rpcrdma_receive_wc()

Clean up: When converting xprtrdma to use the new CQ API, I missed a
spot. The naming convention elsewhere is:

{svc_rdma,rpcrdma}_wc_{operation}

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4b09ccf..83f7886 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -160,13 +160,13 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
}

/**
- * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
*/
static void
-rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
+rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
@@ -916,7 +916,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}

rep->rr_device = ia->ri_device;
- rep->rr_cqe.done = rpcrdma_receive_wc;
+ rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
rep->rr_recv_wr.next = NULL;


2016-08-23 17:54:29

by Chuck Lever III

Subject: [PATCH v2 17/22] xprtrdma: Basic support for Remote Invalidation

Have frwr's ro_unmap_sync recognize an invalidated rkey that appears
as part of a Receive completion. Local invalidation can be skipped
for that rkey.

Use an out-of-band signaling mechanism to indicate to the server
that the client is prepared to receive RDMA Send With Invalidate.
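
As context for the frwr_ops change, a minimal sketch (an assumption,
not the patch's actual verbs.c hunk) of how the new rr_wc_flags and
rr_inv_rkey fields would be filled in by the Receive completion
handler:

static void example_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rpcrdma_rep *rep = container_of(wc->wr_cqe,
					       struct rpcrdma_rep, rr_cqe);

	/* Remember whether the server used Send With Invalidate, and
	 * which rkey it invalidated, so frwr_op_unmap_sync can skip
	 * local invalidation for that MR.
	 */
	rep->rr_wc_flags = wc->wc_flags;
	if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
		rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	/* ... remainder of Receive completion processing ... */
}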

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 2 ++
net/sunrpc/xprtrdma/frwr_ops.c | 13 +++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 18 ++++++++++++++----
net/sunrpc/xprtrdma/transport.c | 5 +++--
net/sunrpc/xprtrdma/verbs.c | 8 +++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 5 +++++
6 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 16690a1..1ebb09e 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -273,6 +273,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
list_for_each_entry(mw, &req->rl_registered, mw_list)
list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+ r_xprt->rx_stats.local_inv_needed++;
rc = ib_unmap_fmr(&unmap_list);
if (rc)
goto out_reset;
@@ -330,4 +331,5 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_init_mr = fmr_op_init_mr,
.ro_release_mr = fmr_op_release_mr,
.ro_displayname = "fmr",
+ .ro_send_w_inv_ok = 0,
};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index fcfcf3a..e82d5cf 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -67,6 +67,8 @@
* pending send queue WRs before the transport is reconnected.
*/

+#include <linux/sunrpc/rpc_rdma.h>
+
#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -471,6 +473,7 @@ static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+ struct rpcrdma_rep *rep = req->rl_reply;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw, *tmp;
struct rpcrdma_frmr *f;
@@ -486,6 +489,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
f = NULL;
invalidate_wrs = pos = prev = NULL;
list_for_each_entry(mw, &req->rl_registered, mw_list) {
+ if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
+ (mw->mw_handle == rep->rr_inv_rkey)) {
+ mw->frmr.fr_state = FRMR_IS_INVALID;
+ continue;
+ }
+
pos = __frwr_prepare_linv_wr(mw);

if (!invalidate_wrs)
@@ -495,6 +504,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
prev = pos;
f = &mw->frmr;
}
+ if (!f)
+ goto unmap;

/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
@@ -509,6 +520,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
*/
+ r_xprt->rx_stats.local_inv_needed++;
rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
if (rc)
goto reset_mrs;
@@ -575,4 +587,5 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_init_mr = frwr_op_init_mr,
.ro_release_mr = frwr_op_release_mr,
.ro_displayname = "frwr",
+ .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK,
};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ea734c2..31a434d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -231,7 +231,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
- enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
+ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
+ bool reminv_expected)
{
int len, n, p, page_base;
struct page **ppages;
@@ -273,6 +274,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
if (type == rpcrdma_readch)
return n;

+ /* When encoding the Write list, some servers need to see an extra
+ * segment for odd-length Write chunks. The upper layer provides
+ * space in the tail iovec for this purpose.
+ */
+ if (type == rpcrdma_writech && reminv_expected)
+ return n;
+
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
* xdr pad bytes, saving the server an RDMA operation. */
@@ -329,7 +337,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
if (rtype == rpcrdma_areadch)
pos = 0;
seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
if (nsegs < 0)
return ERR_PTR(nsegs);

@@ -393,7 +401,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
- wtype, seg);
+ wtype, seg,
+ r_xprt->rx_ia.ri_reminv_expected);
if (nsegs < 0)
return ERR_PTR(nsegs);

@@ -458,7 +467,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
}

seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+ r_xprt->rx_ia.ri_reminv_expected);
if (nsegs < 0)
return ERR_PTR(nsegs);

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 5adaa1d..7e11d71 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -730,10 +730,11 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
- seq_printf(seq, "%lu %lu %lu\n",
+ seq_printf(seq, "%lu %lu %lu %lu\n",
r_xprt->rx_stats.mrs_recovered,
r_xprt->rx_stats.mrs_orphaned,
- r_xprt->rx_stats.mrs_allocated);
+ r_xprt->rx_stats.mrs_allocated,
+ r_xprt->rx_stats.local_inv_needed);
}

static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7cdfa2a..03ba529 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -184,6 +184,9 @@ rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
__func__, rep, wc->byte_len);

rep->rr_len = wc->byte_len;
+ rep->rr_wc_flags = wc->wc_flags;
+ rep->rr_inv_rkey = wc->ex.invalidate_rkey;
+
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
@@ -211,12 +214,15 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
const struct rpcrdma_connect_private *pmsg = param->private_data;
unsigned int rsize, wsize;

+ /* Default settings for RPC-over-RDMA Version One */
+ r_xprt->rx_ia.ri_reminv_expected = false;
rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

if (pmsg &&
pmsg->cp_magic == rpcrdma_cmp_magic &&
pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+ r_xprt->rx_ia.ri_reminv_expected = true;
rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
@@ -567,7 +573,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Prepare RDMA-CM private message */
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
- pmsg->cp_flags = 0;
+ pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
ep->rep_remote_cma.private_data = pmsg;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f108518..84cbbe9 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -74,6 +74,7 @@ struct rpcrdma_ia {
unsigned int ri_max_frmr_depth;
unsigned int ri_max_inline_write;
unsigned int ri_max_inline_read;
+ bool ri_reminv_expected;
struct ib_qp_attr ri_qp_attr;
struct ib_qp_init_attr ri_qp_init_attr;
};
@@ -187,6 +188,8 @@ enum {
struct rpcrdma_rep {
struct ib_cqe rr_cqe;
unsigned int rr_len;
+ int rr_wc_flags;
+ u32 rr_inv_rkey;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
struct work_struct rr_work;
@@ -384,6 +387,7 @@ struct rpcrdma_stats {
unsigned long mrs_recovered;
unsigned long mrs_orphaned;
unsigned long mrs_allocated;
+ unsigned long local_inv_needed;
};

/*
@@ -407,6 +411,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mw *);
void (*ro_release_mr)(struct rpcrdma_mw *);
const char *ro_displayname;
+ const int ro_send_w_inv_ok;
};

extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 14/22] xprtrdma: Move recv_wr to struct rpcrdma_rep

Clean up: The fields in the recv_wr do not vary. There is no need to
initialize them before each ib_post_recv(). This removes a large-ish
data structure from the stack.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 13 ++++++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index d5e88ee..edc81ac 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -879,6 +879,10 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+ rep->rr_recv_wr.next = NULL;
+ rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
+ rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
+ rep->rr_recv_wr.num_sge = 1;
return rep;

out_free:
@@ -1283,17 +1287,12 @@ int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
struct rpcrdma_rep *rep)
{
- struct ib_recv_wr recv_wr, *recv_wr_fail;
+ struct ib_recv_wr *recv_wr_fail;
int rc;

- recv_wr.next = NULL;
- recv_wr.wr_cqe = &rep->rr_cqe;
- recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
- recv_wr.num_sge = 1;
-
if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
goto out_map;
- rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+ rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
return 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a6a0336..4ca9cf5 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -189,6 +189,7 @@ struct rpcrdma_rep {
struct rpcrdma_xprt *rr_rxprt;
struct work_struct rr_work;
struct list_head rr_list;
+ struct ib_recv_wr rr_recv_wr;
struct rpcrdma_regbuf *rr_rdmabuf;
};



2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 16/22] xprtrdma: Client-side support for rpcrdma_connect_private

Send an RDMA-CM private message on connect, and look for one during
a connection-established event.

Both sides can communicate their various implementation limits.
Implementations that don't support this sideband protocol ignore it.

Once the client knows the server's inline threshold maxima, it can
adjust the use of Reply chunks, and eliminate most use of Position
Zero Read chunks. Moderately-sized I/O can be done using a pure
inline RDMA Send instead of RDMA operations that require memory
registration.
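
A condensed view of what the connection-established path does with the
received message, using only names introduced in this patch; this is a
sketch of the logic in rpcrdma_update_connect_private, not additional
code:

	/* Start from the RPC-over-RDMA Version One defaults (1024 bytes),
	 * accept the peer's advertised sizes only if the private data is
	 * recognizable, and never grow the inline thresholds, only shrink.
	 */
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	if (pmsg && pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}
	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;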

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 4 ++++
net/sunrpc/xprtrdma/fmr_ops.c | 5 ++---
net/sunrpc/xprtrdma/frwr_ops.c | 5 ++---
net/sunrpc/xprtrdma/rpc_rdma.c | 8 +++++---
net/sunrpc/xprtrdma/verbs.c | 40 ++++++++++++++++++++++++++++++++++++---
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +++---
6 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index a7da6bf..cfda6ad 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -46,6 +46,10 @@
#define RPCRDMA_VERSION 1
#define rpcrdma_version cpu_to_be32(RPCRDMA_VERSION)

+enum {
+ RPCRDMA_V1_DEF_INLINE_SIZE = 1024,
+};
+
struct rpcrdma_segment {
__be32 rs_handle; /* Registered memory handle */
__be32 rs_length; /* Length of the chunk in bytes */
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 21cb3b1..16690a1 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -160,9 +160,8 @@ static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
- RPCRDMA_MAX_DATA_SEGS /
- RPCRDMA_MAX_FMR_SGES));
+ ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+ RPCRDMA_MAX_FMR_SGES);
return 0;
}

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 892b5e1..fcfcf3a 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -242,9 +242,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
depth;
}

- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
- RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frmr_depth));
+ ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+ ia->ri_max_frmr_depth);
return 0;
}

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c2906e3..ea734c2 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -118,10 +118,12 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
return size;
}

-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
- struct rpcrdma_create_data_internal *cdata,
- unsigned int maxsegs)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ unsigned int maxsegs = ia->ri_max_segs;
+
ia->ri_max_inline_write = cdata->inline_wsize -
rpcrdma_max_call_header_size(maxsegs);
ia->ri_max_inline_read = cdata->inline_rsize -
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index edc81ac..7cdfa2a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -203,6 +203,33 @@ out_fail:
goto out_schedule;
}

+static void
+rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
+ struct rdma_conn_param *param)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ const struct rpcrdma_connect_private *pmsg = param->private_data;
+ unsigned int rsize, wsize;
+
+ rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+ wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+
+ if (pmsg &&
+ pmsg->cp_magic == rpcrdma_cmp_magic &&
+ pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+ rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
+ wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
+ }
+
+ if (rsize < cdata->inline_rsize)
+ cdata->inline_rsize = rsize;
+ if (wsize < cdata->inline_wsize)
+ cdata->inline_wsize = wsize;
+ pr_info("rpcrdma: max send %u, max recv %u\n",
+ cdata->inline_wsize, cdata->inline_rsize);
+ rpcrdma_set_max_header_sizes(r_xprt);
+}
+
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
@@ -243,6 +270,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
" (%d initiator)\n",
__func__, attr->max_dest_rd_atomic,
attr->max_rd_atomic);
+ rpcrdma_update_connect_private(xprt, &event->param.conn);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
connstate = -ENOTCONN;
@@ -453,6 +481,7 @@ int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
+ struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
struct ib_cq *sendcq, *recvcq;
unsigned int max_qp_wr;
int rc;
@@ -535,9 +564,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Initialize cma parameters */
memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

- /* RPC/RDMA does not use private data */
- ep->rep_remote_cma.private_data = NULL;
- ep->rep_remote_cma.private_data_len = 0;
+ /* Prepare RDMA-CM private message */
+ pmsg->cp_magic = rpcrdma_cmp_magic;
+ pmsg->cp_version = RPCRDMA_CMP_VERSION;
+ pmsg->cp_flags = 0;
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+ ep->rep_remote_cma.private_data = pmsg;
+ ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 4ca9cf5..f108518 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -70,6 +70,7 @@ struct rpcrdma_ia {
struct ib_pd *ri_pd;
struct completion ri_done;
int ri_async_rc;
+ unsigned int ri_max_segs;
unsigned int ri_max_frmr_depth;
unsigned int ri_max_inline_write;
unsigned int ri_max_inline_read;
@@ -87,6 +88,7 @@ struct rpcrdma_ep {
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
+ struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
@@ -522,9 +524,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
- struct rpcrdma_create_data_internal *,
- unsigned int);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);

/* RPC/RDMA module init - xprtrdma/transport.c
*/


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 15/22] rpcrdma: RDMA/CM private message data structure

Introduce a data structure used by both the client and the server to
exchange implementation details during RDMA/CM connection establishment.

This is an experimental out-of-band exchange between Linux
RPC-over-RDMA Version One implementations, replacing the deprecated
CCP (see RFC 5666bis). The purpose of this extension is to enable
prototyping of features that might be introduced in a subsequent
version of RPC-over-RDMA.

Suggested by Christoph Hellwig and Devesh Sharma.
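
The one-octet size fields use a simple encoding: the wire value is
(size in kilobytes) - 1, so a single byte covers 1KB through 256KB in
1KB steps. A few worked values of the helpers defined in this patch
(note that sizes should be 1KB-aligned and at least 1KB, or the round
trip is lossy):

	/* Worked examples for the encode/decode helpers below: */
	rpcrdma_encode_buffer_size(1024);	/* == 0 */
	rpcrdma_encode_buffer_size(4096);	/* == 3 */
	rpcrdma_decode_buffer_size(3);		/* == 4096 */
	rpcrdma_decode_buffer_size(255);	/* == 262144, the 256KB max */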

Signed-off-by: Chuck Lever <[email protected]>
Reviewed-by: Sagi Grimberg <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 35 +++++++++++++++++++++++++++++++++++
1 file changed, 35 insertions(+)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 3b1ff38..a7da6bf 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -41,6 +41,7 @@
#define _LINUX_SUNRPC_RPC_RDMA_H

#include <linux/types.h>
+#include <linux/bitops.h>

#define RPCRDMA_VERSION 1
#define rpcrdma_version cpu_to_be32(RPCRDMA_VERSION)
@@ -129,4 +130,38 @@ enum rpcrdma_proc {
#define rdma_done cpu_to_be32(RDMA_DONE)
#define rdma_error cpu_to_be32(RDMA_ERROR)

+/*
+ * Private extension to RPC-over-RDMA Version One.
+ * Message passed during RDMA-CM connection set-up.
+ *
+ * Add new fields at the end, and don't permute existing
+ * fields.
+ */
+struct rpcrdma_connect_private {
+ __be32 cp_magic;
+ u8 cp_version;
+ u8 cp_flags;
+ u8 cp_send_size;
+ u8 cp_recv_size;
+} __packed;
+
+#define rpcrdma_cmp_magic __cpu_to_be32(0xf6ab0e18)
+
+enum {
+ RPCRDMA_CMP_VERSION = 1,
+ RPCRDMA_CMP_F_SND_W_INV_OK = BIT(0),
+};
+
+static inline u8
+rpcrdma_encode_buffer_size(unsigned int size)
+{
+ return (size >> 10) - 1;
+}
+
+static inline unsigned int
+rpcrdma_decode_buffer_size(u8 val)
+{
+ return ((unsigned int)val + 1) << 10;
+}
+
#endif /* _LINUX_SUNRPC_RPC_RDMA_H */


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 10/22] xprtrdma: Delay DMA mapping Send and Receive buffers

Currently, each regbuf is allocated and DMA mapped at the same time.
This is done during transport creation.

When a device driver is unloaded, every DMA-mapped buffer in use by
a transport has to be unmapped, and then remapped to the new
device if the driver is loaded again. Remapping will have to be done
_after_ the connect worker has set up the new device.

But there's an ordering problem:

call_allocate, which invokes xprt_rdma_allocate, which in turn calls
rpcrdma_alloc_regbuf to allocate Send buffers, happens _before_ the
connect worker can run to set up the new device.

Instead, at transport creation, allocate each buffer, but leave it
unmapped. Once the RPC carries these buffers into ->send_request, by
which time a transport connection should have been established,
check to see that the RPC's buffers have been DMA mapped. If not,
map them there.

When device driver unplug support is added, it will simply unmap all
the transport's regbufs, but it won't have to deallocate the
underlying memory.
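
The resulting idiom is small. Condensed from the hunks in this patch
(nothing beyond what the diffs below add): callers invoke the cheap
wrapper just before a buffer is used, and the expensive
ib_dma_map_single() happens at most once per regbuf.

	/* rb->rg_device doubles as the "already mapped" flag: */
	if (likely(rpcrdma_regbuf_is_mapped(rb)))
		return true;
	return __rpcrdma_dma_map_regbuf(ia, rb);

	/* A typical call site, e.g. while marshaling a Send: */
	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
		goto out_map;		/* fails the RPC with -EIO */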

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 8 ++++
net/sunrpc/xprtrdma/rpc_rdma.c | 9 +++++
net/sunrpc/xprtrdma/verbs.c | 71 +++++++++++++++++++++++--------------
net/sunrpc/xprtrdma/xprt_rdma.h | 16 ++++++++
4 files changed, 78 insertions(+), 26 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ceae872..8bc249e 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -230,16 +230,24 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
#endif

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
+ goto out_map;
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ goto out_map;
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

req->rl_niovs = 2;
return 0;
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return -EIO;
}

/**
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 845586f..68a39c0 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -681,6 +681,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
transfertypes[rtype], transfertypes[wtype],
hdrlen, rpclen);

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
+ goto out_map;
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -689,6 +691,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (rtype == rpcrdma_areadch)
return 0;

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ goto out_map;
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
@@ -704,6 +708,11 @@ out_overflow:
out_unmap:
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
return PTR_ERR(iptr);
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ iptr = ERR_PTR(-EIO);
+ goto out_unmap;
}

/*
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0bd2ace..b88bcf4 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1160,9 +1160,8 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* @direction: direction of data movement
* @flags: GFP flags
*
- * Returns an ERR_PTR, or a pointer to a regbuf, which is a
- * contiguous memory region that is DMA mapped persistently, and
- * is registered for local I/O.
+ * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
+ * can be persistently DMA-mapped for I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
@@ -1173,32 +1172,50 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
enum dma_data_direction direction, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- struct ib_sge *iov;

rb = kmalloc(sizeof(*rb) + size, flags);
if (rb == NULL)
- goto out;
+ return ERR_PTR(-ENOMEM);

+ rb->rg_device = NULL;
rb->rg_direction = direction;
- iov = &rb->rg_iov;
- iov->length = size;
- iov->lkey = ia->ri_pd->local_dma_lkey;
-
- if (direction != DMA_NONE) {
- iov->addr = ib_dma_map_single(ia->ri_device,
- (void *)rb->rg_base,
- rdmab_length(rb),
- rb->rg_direction);
- if (ib_dma_mapping_error(ia->ri_device, iov->addr))
- goto out_free;
- }
+ rb->rg_iov.length = size;

return rb;
+}

-out_free:
- kfree(rb);
-out:
- return ERR_PTR(-ENOMEM);
+/**
+ * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be mapped
+ */
+bool
+__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (rb->rg_direction == DMA_NONE)
+ return false;
+
+ rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
+ (void *)rb->rg_base,
+ rdmab_length(rb),
+ rb->rg_direction);
+ if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
+ return false;
+
+ rb->rg_device = ia->ri_device;
+ rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+ return true;
+}
+
+static void
+rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+{
+ if (!rpcrdma_regbuf_is_mapped(rb))
+ return;
+
+ ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
+ rdmab_length(rb), rb->rg_direction);
+ rb->rg_device = NULL;
}

/**
@@ -1212,11 +1229,7 @@ rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
if (!rb)
return;

- if (rb->rg_direction != DMA_NONE) {
- ib_dma_unmap_single(ia->ri_device, rdmab_addr(rb),
- rdmab_length(rb), rb->rg_direction);
- }
-
+ rpcrdma_dma_unmap_regbuf(rb);
kfree(rb);
}

@@ -1288,11 +1301,17 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

+ if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
+ goto out_map;
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
return 0;

+out_map:
+ pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
+ return -EIO;
+
out_postrecv:
pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
return -ENOTCONN;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index eea7d9d..66f47b2 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,6 +113,7 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
struct ib_sge rg_iov;
+ struct ib_device *rg_device;
enum dma_data_direction rg_direction;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -479,9 +480,24 @@ void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
size_t, enum dma_data_direction,
gfp_t);
+bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);

+static inline bool
+rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_device != NULL;
+}
+
+static inline bool
+rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (likely(rpcrdma_regbuf_is_mapped(rb)))
+ return true;
+ return __rpcrdma_dma_map_regbuf(ia, rb);
+}
+
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);

int rpcrdma_alloc_wq(void);


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 13/22] xprtrdma: Move send_wr to struct rpcrdma_req

Clean up: Most of the fields in each send_wr do not vary. There is
no need to initialize them before each ib_post_send(). This removes
a large-ish data structure from the stack.
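
The effect is that only the fields that genuinely change per
transmission are touched at post time; everything else is filled in
once when the rpcrdma_req is created. A rough sketch using the names
from this patch (the INIT_CQCOUNT bookkeeping is elided):

	/* Once, in rpcrdma_create_req(): */
	req->rl_send_wr.next = NULL;
	req->rl_send_wr.wr_cqe = &req->rl_cqe;
	req->rl_send_wr.sg_list = req->rl_send_iov;
	req->rl_send_wr.opcode = IB_WR_SEND;

	/* Per RPC: only the SGE count and signaling policy vary. */
	req->rl_send_wr.num_sge = 2;	/* RPC-over-RDMA header + RPC message */
	req->rl_send_wr.send_flags =
		DECR_CQCOUNT(ep) > 0 ? 0 : IB_SEND_SIGNALED;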

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 3 ++-
net/sunrpc/xprtrdma/rpc_rdma.c | 5 +++--
net/sunrpc/xprtrdma/verbs.c | 36 +++++++++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++--
4 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 887ef44..61a58f5 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -241,7 +241,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

- req->rl_niovs = 2;
+ req->rl_send_wr.num_sge = 2;
+
return 0;

out_map:
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 6187cee..c2906e3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -687,7 +687,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

- req->rl_niovs = 1;
+ req->rl_send_wr.num_sge = 1;
if (rtype == rpcrdma_areadch)
return 0;

@@ -697,7 +697,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

- req->rl_niovs = 2;
+ req->rl_send_wr.num_sge = 2;
+
return 0;

out_overflow:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 81cae65..d5e88ee 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -848,6 +848,10 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
INIT_LIST_HEAD(&req->rl_registered);
+ req->rl_send_wr.next = NULL;
+ req->rl_send_wr.wr_cqe = &req->rl_cqe;
+ req->rl_send_wr.sg_list = req->rl_send_iov;
+ req->rl_send_wr.opcode = IB_WR_SEND;
return req;
}

@@ -1112,7 +1116,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;

- req->rl_niovs = 0;
+ req->rl_send_wr.num_sge = 0;
req->rl_reply = NULL;

spin_lock(&buffers->rb_lock);
@@ -1240,38 +1244,32 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_req *req)
{
struct ib_device *device = ia->ri_device;
- struct ib_send_wr send_wr, *send_wr_fail;
- struct rpcrdma_rep *rep = req->rl_reply;
- struct ib_sge *iov = req->rl_send_iov;
+ struct ib_send_wr *send_wr = &req->rl_send_wr;
+ struct ib_send_wr *send_wr_fail;
+ struct ib_sge *sge = req->rl_send_iov;
int i, rc;

- if (rep) {
- rc = rpcrdma_ep_post_recv(ia, rep);
+ if (req->rl_reply) {
+ rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
if (rc)
return rc;
req->rl_reply = NULL;
}

- send_wr.next = NULL;
- send_wr.wr_cqe = &req->rl_cqe;
- send_wr.sg_list = iov;
- send_wr.num_sge = req->rl_niovs;
- send_wr.opcode = IB_WR_SEND;
-
- for (i = 0; i < send_wr.num_sge; i++)
- ib_dma_sync_single_for_device(device, iov[i].addr,
- iov[i].length, DMA_TO_DEVICE);
+ for (i = 0; i < send_wr->num_sge; i++)
+ ib_dma_sync_single_for_device(device, sge[i].addr,
+ sge[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
- __func__, send_wr.num_sge);
+ __func__, send_wr->num_sge);

if (DECR_CQCOUNT(ep) > 0)
- send_wr.send_flags = 0;
+ send_wr->send_flags = 0;
else { /* Provider must take a send completion every now and then */
INIT_CQCOUNT(ep);
- send_wr.send_flags = IB_SEND_SIGNALED;
+ send_wr->send_flags = IB_SEND_SIGNALED;
}

- rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+ rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
if (rc)
goto out_postsend_err;
return 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 44efff7..a6a0336 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -284,10 +284,10 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_free;
- unsigned int rl_niovs;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
- struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
+ struct rpcrdma_rep *rl_reply;
+ struct ib_send_wr rl_send_wr;
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 11/22] xprtrdma: Eliminate "ia" argument in rpcrdma_{alloc,free}_regbuf

Clean up. The "ia" argument is no longer used.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 7 +++----
net/sunrpc/xprtrdma/transport.c | 11 +++++------
net/sunrpc/xprtrdma/verbs.c | 28 ++++++++++++----------------
net/sunrpc/xprtrdma/xprt_rdma.h | 8 +++-----
4 files changed, 23 insertions(+), 31 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 8bc249e..a19530d 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -27,7 +27,7 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
list_del(&req->rl_all);
spin_unlock(&buf->rb_reqslock);

- rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+ rpcrdma_destroy_req(req);

kfree(rqst);
}
@@ -35,7 +35,6 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
size_t size;
@@ -45,14 +44,14 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE,
+ rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(ia, size, DMA_TO_DEVICE, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_sendbuf = rb;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 3424691..5adaa1d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -490,7 +490,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_rdmabuf)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -517,12 +517,11 @@ rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;

min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size,
- DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(min_size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ rpcrdma_free_regbuf(req->rl_sendbuf);
r_xprt->rx_stats.hardway_register_count += min_size;
req->rl_sendbuf = rb;
return true;
@@ -548,11 +547,11 @@ rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_NONE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
if (IS_ERR(rb))
return false;

- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_recvbuf);
+ rpcrdma_free_regbuf(req->rl_recvbuf);
r_xprt->rx_stats.hardway_register_count += size;
req->rl_recvbuf = rb;
return true;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b88bcf4..ba1ca88 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -864,7 +864,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
if (rep == NULL)
goto out;

- rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
+ rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
DMA_FROM_DEVICE, GFP_KERNEL);
if (IS_ERR(rep->rr_rdmabuf)) {
rc = PTR_ERR(rep->rr_rdmabuf);
@@ -965,18 +965,18 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
}

static void
-rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
- rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+ rpcrdma_free_regbuf(rep->rr_rdmabuf);
kfree(rep);
}

void
-rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_destroy_req(struct rpcrdma_req *req)
{
- rpcrdma_free_regbuf(ia, req->rl_recvbuf);
- rpcrdma_free_regbuf(ia, req->rl_sendbuf);
- rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
+ rpcrdma_free_regbuf(req->rl_recvbuf);
+ rpcrdma_free_regbuf(req->rl_sendbuf);
+ rpcrdma_free_regbuf(req->rl_rdmabuf);
kfree(req);
}

@@ -1009,15 +1009,13 @@ rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-
cancel_delayed_work_sync(&buf->rb_recovery_worker);

while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;

rep = rpcrdma_buffer_get_rep_locked(buf);
- rpcrdma_destroy_rep(ia, rep);
+ rpcrdma_destroy_rep(rep);
}

spin_lock(&buf->rb_reqslock);
@@ -1029,7 +1027,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
list_del(&req->rl_all);

spin_unlock(&buf->rb_reqslock);
- rpcrdma_destroy_req(ia, req);
+ rpcrdma_destroy_req(req);
spin_lock(&buf->rb_reqslock);
}
spin_unlock(&buf->rb_reqslock);
@@ -1155,7 +1153,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)

/**
* rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @ia: controlling rpcrdma_ia
* @size: size of buffer to be allocated, in bytes
* @direction: direction of data movement
* @flags: GFP flags
@@ -1168,8 +1165,8 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
- enum dma_data_direction direction, gfp_t flags)
+rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+ gfp_t flags)
{
struct rpcrdma_regbuf *rb;

@@ -1220,11 +1217,10 @@ rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)

/**
* rpcrdma_free_regbuf - deregister and free registered buffer
- * @ia: controlling rpcrdma_ia
* @rb: regbuf to be deregistered and freed
*/
void
-rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
if (!rb)
return;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 66f47b2..5447586 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -464,7 +464,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
*/
struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);

@@ -477,12 +477,10 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);

void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);

-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
- size_t, enum dma_data_direction,
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
gfp_t);
bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
-void rpcrdma_free_regbuf(struct rpcrdma_ia *,
- struct rpcrdma_regbuf *);
+void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);

static inline bool
rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)


2016-08-23 17:54:29

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 12/22] xprtrdma: Simplify rpcrdma_ep_post_recv()

Clean up.

Since commit fc66448549bb ("xprtrdma: Split the completion queue"),
rpcrdma_ep_post_recv() no longer uses the "ep" argument.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 2 +-
net/sunrpc/xprtrdma/rpc_rdma.c | 2 +-
net/sunrpc/xprtrdma/verbs.c | 9 ++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 3 +--
4 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index a19530d..887ef44 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -397,7 +397,7 @@ out_overflow:
out_short:
pr_warn("RPC/RDMA short backward direction call\n");

- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
xprt_disconnect_done(xprt);
else
pr_warn("RPC: %s: reposting rep %p\n",
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 68a39c0..6187cee 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1141,6 +1141,6 @@ out_duplicate:

repost:
r_xprt->rx_stats.bad_reply_count++;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ba1ca88..81cae65 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1246,7 +1246,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
int i, rc;

if (rep) {
- rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
return rc;
req->rl_reply = NULL;
@@ -1281,12 +1281,8 @@ out_postsend_err:
return -ENOTCONN;
}

-/*
- * (Re)post a receive buffer.
- */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
- struct rpcrdma_ep *ep,
struct rpcrdma_rep *rep)
{
struct ib_recv_wr recv_wr, *recv_wr_fail;
@@ -1325,7 +1321,6 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_rep *rep;
int rc;

@@ -1336,7 +1331,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
rep = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock(&buffers->rb_lock);

- rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
goto out_rc;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5447586..44efff7 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -456,8 +456,7 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);

int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
-int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
- struct rpcrdma_rep *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *);

/*
* Buffer calls - xprtrdma/verbs.c


2016-08-23 17:56:39

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 22/22] xprtrdma: Eliminate rpcrdma_receive_worker()

Clean up: the extra layer of indirection doesn't add value.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 4 +++-
net/sunrpc/xprtrdma/verbs.c | 11 +----------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 20115ad..e210fe6 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -977,8 +977,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
* allowed to timeout, to discover the errors at that time.
*/
void
-rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+rpcrdma_reply_handler(struct work_struct *work)
{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 83f7886..4a1b341 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -128,15 +128,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
wc->status, wc->vendor_err);
}

-static void
-rpcrdma_receive_worker(struct work_struct *work)
-{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
-
- rpcrdma_reply_handler(rep);
-}
-
/* Perform basic sanity checking to avoid using garbage
* to update the credit grant value.
*/
@@ -918,7 +909,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
- INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+ INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 0278f38..94c7ab5 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -534,7 +534,7 @@ rpcrdma_data_dir(bool writing)
*/
void rpcrdma_connect_worker(struct work_struct *);
void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct rpcrdma_rep *);
+void rpcrdma_reply_handler(struct work_struct *);

/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c


2016-08-23 18:05:56

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 06/22] SUNRPC: Add a transport-specific private field in rpc_rqst

Currently there's a hidden and indirect mechanism for finding the
rpcrdma_req that goes with an rpc_rqst. It depends on getting from
the rq_buffer pointer in struct rpc_rqst to the struct
rpcrdma_regbuf that controls that buffer, and then to the struct
rpcrdma_req it goes with.

This was done back in the day to avoid the need to add a per-rqst
pointer or to alter the buf_free API when support for RPC-over-RDMA
was introduced.

I'm about to change the way regbufs work to support larger inline
thresholds. Now is a good time to replace this indirect mechanism
with something that is more straightforward. I guess this should be
considered a clean up.
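
The before-and-after accessors, condensed here only to make the change
in mechanism explicit (both versions appear in the xprt_rdma.h hunk
below):

	/* Before: walk from rq_buffer back to its owning regbuf, then
	 * to the rpcrdma_req that owns the regbuf.
	 */
	rb = container_of(rqst->rq_buffer, struct rpcrdma_regbuf, rg_base);
	req = rb->rg_owner;

	/* After: the transport stores the pointer at allocate time and
	 * simply reads it back.
	 */
	rpcrdma_set_xprtdata(rqst, req);	/* in xprt_rdma_allocate */
	req = rpcr_to_rdmar(rqst);		/* returns rqst->rq_xprtdata */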

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/backchannel.c | 6 ++----
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtrdma/verbs.c | 1 -
net/sunrpc/xprtrdma/xprt_rdma.h | 13 +++++++------
5 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 46f069e..a5da60b 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -83,6 +83,7 @@ struct rpc_rqst {
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list;

+ void *rq_xprtdata; /* Per-xprt private data */
void *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize;
void *rq_rbuffer; /* Reply XDR decode buffer */
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index d3cfaf2..c4904f8 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -55,11 +55,9 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
- rb->rg_owner = req;
req->rl_sendbuf = rb;
- /* so that rpcr_to_rdmar works when receiving a request */
- rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
- xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
+ xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
+ rpcrdma_set_xprtdata(rqst, req);
return 0;

out_fail:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 136caf3..d83bffa 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -523,6 +523,7 @@ xprt_rdma_allocate(struct rpc_task *task)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
+ rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
return 0;
@@ -559,7 +560,6 @@ out_sendbuf:
rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
if (IS_ERR(rb))
goto out_fail;
- rb->rg_owner = req;

r_xprt->rx_stats.hardway_register_count += size;
rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 536d0be..baf7f43 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1191,7 +1191,6 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
rb->rg_size = size;
- rb->rg_owner = NULL;
return rb;

out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 296d9ab..99b6839 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,7 +113,6 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
size_t rg_size;
- struct rpcrdma_req *rg_owner;
struct ib_sge rg_iov;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -297,14 +296,16 @@ struct rpcrdma_req {
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};

+static inline void
+rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req)
+{
+ rqst->rq_xprtdata = req;
+}
+
static inline struct rpcrdma_req *
rpcr_to_rdmar(struct rpc_rqst *rqst)
{
- void *buffer = rqst->rq_buffer;
- struct rpcrdma_regbuf *rb;
-
- rb = container_of(buffer, struct rpcrdma_regbuf, rg_base);
- return rb->rg_owner;
+ return rqst->rq_xprtdata;
}

/*


2016-08-23 18:06:00

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 08/22] xprtrdma: Use smaller buffers for RPC-over-RDMA headers

Commit 949317464bc2 ("xprtrdma: Limit number of RDMA segments in
RPC-over-RDMA headers") capped the number of chunks that may appear
in RPC-over-RDMA headers. The maximum header size can be estimated
and fixed to avoid allocating buffer space that is never used.
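
A rough back-of-envelope, not the exact arithmetic the code performs in
rpcrdma_max_call_header_size(): the fixed portion of an RPC-over-RDMA
header is RPCRDMA_HDRLEN_MIN (28 bytes), and each Read-list entry
carries a discriminator, position, handle, length, and 64-bit offset
(24 bytes).

	/* Hedged estimate with the new RPCRDMA_MAX_HDR_SEGS = 8 cap:
	 *
	 *    28 bytes  fixed header fields and list markers
	 * + 192 bytes  8 segments * 24 bytes per Read-list entry
	 * -----------
	 *   220 bytes  <  RPCRDMA_HDRBUF_SIZE (256 bytes)
	 */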

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 5 ++---
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtrdma/xprt_rdma.h | 5 ++++-
3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index c4904f8..60fc991 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -45,13 +45,12 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- size = r_xprt->rx_data.inline_wsize;
- rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

- size += r_xprt->rx_data.inline_rsize;
+ size = r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ecdc3ad..94dbfd3 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -484,7 +484,7 @@ static bool
rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
gfp_t flags)
{
- size_t size = r_xprt->rx_data.inline_wsize;
+ size_t size = RPCRDMA_HDRBUF_SIZE;
struct rpcrdma_regbuf *rb;

if (req->rl_rdmabuf)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 73f915f..c5137f0 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -160,7 +160,10 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
* The smallest inline threshold is 1024 bytes, ensuring that
* at least 750 bytes are available for RPC messages.
*/
-#define RPCRDMA_MAX_HDR_SEGS (8)
+enum {
+ RPCRDMA_MAX_HDR_SEGS = 8,
+ RPCRDMA_HDRBUF_SIZE = 256,
+};

/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv


2016-08-23 18:05:56

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 07/22] xprtrdma: Initialize separate RPC call and reply buffers

RPC-over-RDMA needs to separate its RPC call and reply buffers.

o When an RPC Call is sent, rq_snd_buf is DMA mapped for an RDMA
Send operation using DMA_TO_DEVICE

o If the client expects a large RPC reply, it DMA maps rq_rcv_buf
as part of a Reply chunk using DMA_FROM_DEVICE

The two mappings are for data movement in opposite directions.

DMA-API.txt suggests that if these mappings share a DMA cacheline,
bad things can happen. This could occur in the final bytes of
rq_snd_buf and the first bytes of rq_rcv_buf if the two buffers
happen to share a DMA cacheline.

On x86_64 the DMA cacheline size is typically 8 bytes, and RPC Call
messages are usually much smaller than the send buffer, so this
hasn't been a noticeable problem. But the DMA cacheline size can be
larger on other platforms.

Also, rq_rcv_buf often starts most of the way into a page, so an
additional RDMA segment is needed to map and register the end of
that buffer. Try to avoid that scenario to reduce the cost of
registering and invalidating Reply chunks.

Instead of carrying a single regbuf that covers both rq_snd_buf and
rq_rcv_buf, each struct rpcrdma_req now carries one regbuf for
rq_snd_buf and one regbuf for rq_rcv_buf.
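
Pictorially (an informal sketch, not literal code), the change looks
like this:

	/* Before: one allocation backs both XDR buffers, so mappings in
	 * opposite directions can abut:
	 *
	 *   [ rq_snd_buf ............ | rq_rcv_buf ............ ]
	 *                             ^-- possible shared DMA cacheline
	 *
	 * After: rl_sendbuf and rl_recvbuf come from separate allocations,
	 * so the Call and Reply buffers can never share a cacheline:
	 *
	 *   [ rl_sendbuf: rq_snd_buf ]     [ rl_recvbuf: rq_rcv_buf ]
	 */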

Some incidental changes worth noting:

- To clear out some spaghetti, refactor xprt_rdma_allocate.
- The value stored in rg_size is the same as the value stored in
the iov.length field, so eliminate rg_size

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 150 +++++++++++++++++++++++++--------------
net/sunrpc/xprtrdma/verbs.c | 2 -
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +-
3 files changed, 99 insertions(+), 59 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index d83bffa..ecdc3ad 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -477,6 +477,86 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
}
}

+/* Allocate a fixed-size buffer in which to construct and send the
+ * RPC-over-RDMA header for this request.
+ */
+static bool
+rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ gfp_t flags)
+{
+ size_t size = r_xprt->rx_data.inline_wsize;
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_rdmabuf)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_rdmabuf = rb;
+ return true;
+}
+
+/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
+ * if the resulting Call message is smaller than the inline threshold.
+ * The value of the "rq_callsize" argument accounts for RPC header
+ * requirements, but not for the data payload in these cases.
+ *
+ * See rpcrdma_inline_pullup.
+ */
+static bool
+rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+ size_t min_size;
+
+ if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
+ return true;
+
+ min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ r_xprt->rx_stats.hardway_register_count += min_size;
+ req->rl_sendbuf = rb;
+ return true;
+}
+
+/* The rq_rcv_buf is used only if a Reply chunk is necessary.
+ * The decision to use a Reply chunk is made later in
+ * rpcrdma_marshal_req. This buffer is registered at that time.
+ *
+ * Otherwise, the associated RPC Reply arrives in a separate
+ * Receive buffer, arbitrarily chosen by the HCA. The buffer
+ * allocated here for the RPC Reply is not utilized in that
+ * case. See rpcrdma_inline_fixup.
+ *
+ * A regbuf is used here to remember the buffer size.
+ */
+static bool
+rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_recvbuf);
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_recvbuf = rb;
+ return true;
+}
+
/**
* xprt_rdma_allocate - allocate transport resources for an RPC
* @task: RPC task
@@ -487,22 +567,18 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* EIO: A permanent error occurred, do not retry
*
* The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence.
+ * to hide the struct rpcrdma_req, which is necessary for the actual
+ * send/recv sequence.
*
- * The RPC layer allocates both send and receive buffers in the same call
- * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
- * We may register rq_rcv_buf when using reply chunks.
+ * xprt_rdma_allocate provides buffers that are already mapped for
+ * DMA, and a local DMA lkey is provided for each.
*/
static int
xprt_rdma_allocate(struct rpc_task *task)
{
struct rpc_rqst *rqst = task->tk_rqstp;
- size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- size_t min_size;
gfp_t flags;

req = rpcrdma_buffer_get(&r_xprt->rx_buf);
@@ -513,59 +589,23 @@ xprt_rdma_allocate(struct rpc_task *task)
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

- if (req->rl_rdmabuf == NULL)
- goto out_rdmabuf;
- if (req->rl_sendbuf == NULL)
- goto out_sendbuf;
- if (size > req->rl_sendbuf->rg_size)
- goto out_sendbuf;
+ if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
+ goto out_fail;
+ if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+ goto out_fail;
+ if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+ goto out_fail;
+
+ dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
+ task->tk_pid, __func__, rqst->rq_callsize,
+ rqst->rq_rcvsize, req);

-out:
- dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
- rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
+ rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
return 0;

-out_rdmabuf:
- min_size = r_xprt->rx_data.inline_wsize;
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_rdmabuf = rb;
-
-out_sendbuf:
- /* XDR encoding and RPC/RDMA marshaling of this request has not
- * yet occurred. Thus a lower bound is needed to prevent buffer
- * overrun during marshaling.
- *
- * RPC/RDMA marshaling may choose to send payload bearing ops
- * inline, if the result is smaller than the inline threshold.
- * The value of the "size" argument accounts for header
- * requirements but not for the payload in these cases.
- *
- * Likewise, allocate enough space to receive a reply up to the
- * size of the inline threshold.
- *
- * It's unlikely that both the send header and the received
- * reply will be large, but slush is provided here to allow
- * flexibility when marshaling.
- */
- min_size = r_xprt->rx_data.inline_rsize;
- min_size += r_xprt->rx_data.inline_wsize;
- if (size < min_size)
- size = min_size;
-
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
- if (IS_ERR(rb))
- goto out_fail;
-
- r_xprt->rx_stats.hardway_register_count += size;
- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
- req->rl_sendbuf = rb;
- goto out;
-
out_fail:
rpcrdma_buffer_put(req);
return -ENOMEM;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index baf7f43..9c3e58e 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -974,6 +974,7 @@ rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
+ rpcrdma_free_regbuf(ia, req->rl_recvbuf);
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1190,7 +1191,6 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)

iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
- rb->rg_size = size;
return rb;

out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 99b6839..73f915f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -112,7 +112,6 @@ struct rpcrdma_ep {
*/

struct rpcrdma_regbuf {
- size_t rg_size;
struct ib_sge rg_iov;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -285,8 +284,9 @@ struct rpcrdma_req {
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
- struct rpcrdma_regbuf *rl_rdmabuf;
- struct rpcrdma_regbuf *rl_sendbuf;
+ struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
+ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
+ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */

struct ib_cqe rl_cqe;
struct list_head rl_all;
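
In short, the new xprt_rdma_allocate() acquires the header, Call, and
Reply buffers independently, and hands the RPC layer a separate pointer
for each message. Below is a condensed sketch of the resulting flow,
distilled from the hunks above; the function name and the GFP default
are illustrative only (the real flag selection is in the hunk above).

static int xprt_rdma_allocate_sketch(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req;
	gfp_t flags = GFP_KERNEL;	/* illustrative default only */

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		return -ENOMEM;
	if (RPC_IS_SWAPPER(task))
		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	/* Each regbuf is sized and acquired on its own */
	if (!rpcrdma_get_rdmabuf(r_xprt, req, flags) ||
	    !rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags) ||
	    !rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags)) {
		rpcrdma_buffer_put(req);
		return -ENOMEM;
	}

	req->rl_connect_cookie = 0;	/* our reserved value */
	rpcrdma_set_xprtdata(rqst, req);
	rqst->rq_buffer = req->rl_sendbuf->rg_base;	/* RPC Call */
	rqst->rq_rbuffer = req->rl_recvbuf->rg_base;	/* RPC Reply */
	return 0;
}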


2016-08-23 18:10:08

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v2 09/22] xprtrdma: Replace DMA_BIDIRECTIONAL

The use of DMA_BIDIRECTIONAL is discouraged by DMA-API.txt.
Fortunately, xprtrdma now knows which direction I/O is going as
soon as it allocates each regbuf.

The RPC Call and Reply buffers are no longer the same regbuf. They
can each be labeled correctly now. The RPC Reply buffer is never
part of either a Send or Receive WR, but it can be part of a Reply
chunk, which is mapped and registered via ->ro_map. So it is not
DMA mapped when it is allocated (DMA_NONE), to avoid a double
mapping.

Since Receive buffers are no longer DMA_BIDIRECTIONAL and their
contents are never modified by the host CPU, DMA-API-HOWTO.txt
suggests that a DMA sync before posting each buffer should be
unnecessary. (See the my_card_interrupt_handler() example in that document.)

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 5 ++-
net/sunrpc/xprtrdma/transport.c | 7 +++--
net/sunrpc/xprtrdma/verbs.c | 55 +++++++++++++++++--------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++-
4 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 60fc991..ceae872 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -45,13 +45,14 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE,
+ DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, size, DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_sendbuf = rb;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 94dbfd3..3424691 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -490,7 +490,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_rdmabuf)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -517,7 +517,8 @@ rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;

min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size,
+ DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -547,7 +548,7 @@ rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_NONE, flags);
if (IS_ERR(rb))
return false;

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 9c3e58e..0bd2ace 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -865,7 +865,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
goto out;

rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
- GFP_KERNEL);
+ DMA_FROM_DEVICE, GFP_KERNEL);
if (IS_ERR(rep->rr_rdmabuf)) {
rc = PTR_ERR(rep->rr_rdmabuf);
goto out_free;
@@ -1153,27 +1153,24 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
spin_unlock(&buffers->rb_lock);
}

-/*
- * Wrappers for internal-use kmalloc memory registration, used by buffer code.
- */
-
/**
- * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
* @ia: controlling rpcrdma_ia
* @size: size of buffer to be allocated, in bytes
+ * @direction: direction of data movement
* @flags: GFP flags
*
- * Returns pointer to private header of an area of internally
- * registered memory, or an ERR_PTR. The registered buffer follows
- * the end of the private header.
+ * Returns an ERR_PTR, or a pointer to a regbuf, which is a
+ * contiguous memory region that is DMA mapped persistently, and
+ * is registered for local I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
- * receiving the payload of RDMA RECV operations. regbufs are not
- * used for RDMA READ/WRITE operations, thus are registered only for
- * LOCAL access.
+ * receiving the payload of RDMA RECV operations. During Long Calls
+ * or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
+ enum dma_data_direction direction, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
struct ib_sge *iov;
@@ -1182,15 +1179,20 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
if (rb == NULL)
goto out;

+ rb->rg_direction = direction;
iov = &rb->rg_iov;
- iov->addr = ib_dma_map_single(ia->ri_device,
- (void *)rb->rg_base, size,
- DMA_BIDIRECTIONAL);
- if (ib_dma_mapping_error(ia->ri_device, iov->addr))
- goto out_free;
-
iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
+
+ if (direction != DMA_NONE) {
+ iov->addr = ib_dma_map_single(ia->ri_device,
+ (void *)rb->rg_base,
+ rdmab_length(rb),
+ rb->rg_direction);
+ if (ib_dma_mapping_error(ia->ri_device, iov->addr))
+ goto out_free;
+ }
+
return rb;

out_free:
@@ -1207,14 +1209,14 @@ out:
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
- struct ib_sge *iov;
-
if (!rb)
return;

- iov = &rb->rg_iov;
- ib_dma_unmap_single(ia->ri_device,
- iov->addr, iov->length, DMA_BIDIRECTIONAL);
+ if (rb->rg_direction != DMA_NONE) {
+ ib_dma_unmap_single(ia->ri_device, rdmab_addr(rb),
+ rdmab_length(rb), rb->rg_direction);
+ }
+
kfree(rb);
}

@@ -1286,11 +1288,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

- ib_dma_sync_single_for_cpu(ia->ri_device,
- rdmab_addr(rep->rr_rdmabuf),
- rdmab_length(rep->rr_rdmabuf),
- DMA_BIDIRECTIONAL);
-
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c5137f0..eea7d9d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,6 +113,7 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
struct ib_sge rg_iov;
+ enum dma_data_direction rg_direction;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};

@@ -476,7 +477,8 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);

struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
- size_t, gfp_t);
+ size_t, enum dma_data_direction,
+ gfp_t);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);
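
For quick reference, here is the direction each class of regbuf ends up
with after this patch, summarized as a small helper. The enum and the
function below are made up for this summary only (they are not part of
the diff); the directions come from <linux/dma-direction.h>.

enum rb_kind { RB_HDRBUF, RB_SENDBUF, RB_RECVWR_BUF, RB_REPLYBUF };

static enum dma_data_direction rb_kind_direction(enum rb_kind kind)
{
	switch (kind) {
	case RB_HDRBUF:		/* req->rl_rdmabuf: RPC-over-RDMA header (Send) */
	case RB_SENDBUF:	/* req->rl_sendbuf: RPC Call message (Send) */
		return DMA_TO_DEVICE;
	case RB_RECVWR_BUF:	/* rep->rr_rdmabuf: Receive buffer */
		return DMA_FROM_DEVICE;
	case RB_REPLYBUF:	/* req->rl_recvbuf: Reply chunk, mapped via ->ro_map */
	default:
		return DMA_NONE;	/* mapped later; avoids a double mapping */
	}
}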



2016-08-26 21:06:01

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 02/22] SUNRPC: Refactor rpc_xdr_buf_init()

On 08/23/2016 01:52 PM, Chuck Lever wrote:
> Clean up: there is some XDR initialization logic that is commoon
^^^^^^^
nit: "common" has an extra "o"

Looks okay to me otherwise,
Anna

> to the forward channel and backchannel. Move it to an XDR header
> so it can be shared.
>
> rpc_rqst::rq_buffer points to a buffer containing big-endian data.
> Update its annotation as part of the clean up.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/xdr.h | 12 ++++++++++++
> include/linux/sunrpc/xprt.h | 2 +-
> net/sunrpc/backchannel_rqst.c | 8 +-------
> net/sunrpc/clnt.c | 24 ++++++------------------
> net/sunrpc/xprtrdma/backchannel.c | 12 +-----------
> 5 files changed, 21 insertions(+), 37 deletions(-)
>
> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> index 70c6b92..56c48c8 100644
> --- a/include/linux/sunrpc/xdr.h
> +++ b/include/linux/sunrpc/xdr.h
> @@ -67,6 +67,18 @@ struct xdr_buf {
> len; /* Length of XDR encoded message */
> };
>
> +static inline void
> +xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
> +{
> + buf->head[0].iov_base = start;
> + buf->head[0].iov_len = len;
> + buf->tail[0].iov_len = 0;
> + buf->page_len = 0;
> + buf->flags = 0;
> + buf->len = 0;
> + buf->buflen = len;
> +}
> +
> /*
> * pre-xdr'ed macros.
> */
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index a16070d..6f1d41b 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -83,7 +83,7 @@ struct rpc_rqst {
> void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
> struct list_head rq_list;
>
> - __u32 * rq_buffer; /* XDR encode buffer */
> + void *rq_buffer; /* Call XDR encode buffer */
> size_t rq_callsize,
> rq_rcvsize;
> size_t rq_xmit_bytes_sent; /* total bytes sent */
> diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
> index 229956b..ac701c2 100644
> --- a/net/sunrpc/backchannel_rqst.c
> +++ b/net/sunrpc/backchannel_rqst.c
> @@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
> page = alloc_page(gfp_flags);
> if (page == NULL)
> return -ENOMEM;
> - buf->head[0].iov_base = page_address(page);
> - buf->head[0].iov_len = PAGE_SIZE;
> - buf->tail[0].iov_base = NULL;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->len = 0;
> - buf->buflen = PAGE_SIZE;
> + xdr_buf_init(buf, page_address(page), PAGE_SIZE);
> return 0;
> }
>
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 7f79fb7..236f9ff 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1748,18 +1748,6 @@ rpc_task_force_reencode(struct rpc_task *task)
> task->tk_rqstp->rq_bytes_sent = 0;
> }
>
> -static inline void
> -rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
> -{
> - buf->head[0].iov_base = start;
> - buf->head[0].iov_len = len;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->flags = 0;
> - buf->len = 0;
> - buf->buflen = len;
> -}
> -
> /*
> * 3. Encode arguments of an RPC call
> */
> @@ -1772,12 +1760,12 @@ rpc_xdr_encode(struct rpc_task *task)
>
> dprint_status(task);
>
> - rpc_xdr_buf_init(&req->rq_snd_buf,
> - req->rq_buffer,
> - req->rq_callsize);
> - rpc_xdr_buf_init(&req->rq_rcv_buf,
> - (char *)req->rq_buffer + req->rq_callsize,
> - req->rq_rcvsize);
> + xdr_buf_init(&req->rq_snd_buf,
> + req->rq_buffer,
> + req->rq_callsize);
> + xdr_buf_init(&req->rq_rcv_buf,
> + (char *)req->rq_buffer + req->rq_callsize,
> + req->rq_rcvsize);
>
> p = rpc_encode_header(task);
> if (p == NULL) {
> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
> index 5f60ab2..d3cfaf2 100644
> --- a/net/sunrpc/xprtrdma/backchannel.c
> +++ b/net/sunrpc/xprtrdma/backchannel.c
> @@ -38,7 +38,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> struct rpcrdma_regbuf *rb;
> struct rpcrdma_req *req;
> - struct xdr_buf *buf;
> size_t size;
>
> req = rpcrdma_create_req(r_xprt);
> @@ -60,16 +59,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
> req->rl_sendbuf = rb;
> /* so that rpcr_to_rdmar works when receiving a request */
> rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
> -
> - buf = &rqst->rq_snd_buf;
> - buf->head[0].iov_base = rqst->rq_buffer;
> - buf->head[0].iov_len = 0;
> - buf->tail[0].iov_base = NULL;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->len = 0;
> - buf->buflen = size;
> -
> + xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
> return 0;
>
> out_fail:
>


2016-08-29 14:24:09

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 05/22] SUNRPC: Separate buffer pointers for RPC Call and Reply messages

Hi Chuck,

On 08/23/2016 01:52 PM, Chuck Lever wrote:
> For xprtrdma, the RPC Call and Reply buffers are involved in real
> I/O operations.
>
> To start with, the DMA direction of the I/O for a Call is opposite
> that of a Reply.
>
> In the current arrangement, the Reply buffer address is on a
> four-byte alignment just past the call buffer. Would be friendlier
> on some platforms if that was at a DMA cache alignment instead.
>
> Because the current arrangement allocates a single memory region
> which contains both buffers, the RPC Reply buffer often contains a
> page boundary in it when the Call buffer is large enough (which is
> frequent).
>
> It would be a little nicer for setting up DMA operations (and
> possible registration of the Reply buffer) if the two buffers were
> separated, well-aligned, and contained as few page boundaries as
> possible.
>
> Now, I could just pad out the single memory region used for the pair
> of buffers. But frequently that would mean a lot of unused space to
> ensure the Reply buffer did not have a page boundary.
>
> Add a separate pointer to rpc_rqst that points right to the RPC
> Reply buffer. This makes no difference to xprtsock, but it will help
> xprtrdma in subsequent patches.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/xprt.h | 5 +++--
> net/sunrpc/clnt.c | 2 +-
> net/sunrpc/sched.c | 1 +
> net/sunrpc/xprtrdma/transport.c | 1 +
> 4 files changed, 6 insertions(+), 3 deletions(-)
>
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 72c2aeb..46f069e 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -84,8 +84,9 @@ struct rpc_rqst {
> struct list_head rq_list;
>
> void *rq_buffer; /* Call XDR encode buffer */
> - size_t rq_callsize,
> - rq_rcvsize;
> + size_t rq_callsize;
> + void *rq_rbuffer; /* Reply XDR decode buffer */
> + size_t rq_rcvsize;

Just a nit-picky question :) Is there any reason that you're adding the buffer between rq_callsize and rq_rcvsize? It seems like you could leave those alone and add the pointer either before or after them.

Thanks,
Anna

> size_t rq_xmit_bytes_sent; /* total bytes sent */
> size_t rq_reply_bytes_recvd; /* total reply bytes */
> /* received */
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index ab467c0..fd389c0 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1768,7 +1768,7 @@ rpc_xdr_encode(struct rpc_task *task)
> req->rq_buffer,
> req->rq_callsize);
> xdr_buf_init(&req->rq_rcv_buf,
> - (char *)req->rq_buffer + req->rq_callsize,
> + req->rq_rbuffer,
> req->rq_rcvsize);
>
> p = rpc_encode_header(task);
> diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
> index 6690ebc..5db68b3 100644
> --- a/net/sunrpc/sched.c
> +++ b/net/sunrpc/sched.c
> @@ -891,6 +891,7 @@ int rpc_malloc(struct rpc_task *task)
> dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
> task->tk_pid, size, buf);
> rqst->rq_buffer = buf->data;
> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
> return 0;
> }
> EXPORT_SYMBOL_GPL(rpc_malloc);
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index ebf14ba..136caf3 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -524,6 +524,7 @@ out:
> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
> req->rl_connect_cookie = 0; /* our reserved value */
> rqst->rq_buffer = req->rl_sendbuf->rg_base;
> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
> return 0;
>
> out_rdmabuf:
>


2016-08-29 15:34:10

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v2 05/22] SUNRPC: Separate buffer pointers for RPC Call and Reply messages


> On Aug 29, 2016, at 10:23 AM, Anna Schumaker <[email protected]> wrote:
>
> Hi Chuck,
>
> On 08/23/2016 01:52 PM, Chuck Lever wrote:
>> For xprtrdma, the RPC Call and Reply buffers are involved in real
>> I/O operations.
>>
>> To start with, the DMA direction of the I/O for a Call is opposite
>> that of a Reply.
>>
>> In the current arrangement, the Reply buffer address is on a
>> four-byte alignment just past the call buffer. Would be friendlier
>> on some platforms if that was at a DMA cache alignment instead.
>>
>> Because the current arrangement allocates a single memory region
>> which contains both buffers, the RPC Reply buffer often contains a
>> page boundary in it when the Call buffer is large enough (which is
>> frequent).
>>
>> It would be a little nicer for setting up DMA operations (and
>> possible registration of the Reply buffer) if the two buffers were
>> separated, well-aligned, and contained as few page boundaries as
>> possible.
>>
>> Now, I could just pad out the single memory region used for the pair
>> of buffers. But frequently that would mean a lot of unused space to
>> ensure the Reply buffer did not have a page boundary.
>>
>> Add a separate pointer to rpc_rqst that points right to the RPC
>> Reply buffer. This makes no difference to xprtsock, but it will help
>> xprtrdma in subsequent patches.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/xprt.h | 5 +++--
>> net/sunrpc/clnt.c | 2 +-
>> net/sunrpc/sched.c | 1 +
>> net/sunrpc/xprtrdma/transport.c | 1 +
>> 4 files changed, 6 insertions(+), 3 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
>> index 72c2aeb..46f069e 100644
>> --- a/include/linux/sunrpc/xprt.h
>> +++ b/include/linux/sunrpc/xprt.h
>> @@ -84,8 +84,9 @@ struct rpc_rqst {
>> struct list_head rq_list;
>>
>> void *rq_buffer; /* Call XDR encode buffer */
>> - size_t rq_callsize,
>> - rq_rcvsize;
>> + size_t rq_callsize;
>> + void *rq_rbuffer; /* Reply XDR decode buffer */
>> + size_t rq_rcvsize;
>
> Just a nit-picky question :) Is there any reason that you're adding the buffer between rq_callsize and rq_rcvsize? It seems like you could leave those alone and add the pointer either before or after them.

Hi Anna-

Keeping related fields together is usually more important than an extra
line or two in a commit. At the very least, the function of these fields
is more clear (to me, anyway) in this order.

Generally it's good practice to keep together structure fields that are
used at the same time. These four fields might appear in the same CPU
cacheline, though that can change as fields are introduced or removed
earlier in struct rpc_rqst.

An argument can be made that the code is just as easy to read this way:

void *rq_buffer, *rq_rbuffer;
size_t rq_callsize, rq_rcvsize;
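
For comparison, the order as posted in this patch keeps each pointer
next to the size it pairs with (copied from the hunk quoted above):

	void *rq_buffer;	/* Call XDR encode buffer */
	size_t rq_callsize;
	void *rq_rbuffer;	/* Reply XDR decode buffer */
	size_t rq_rcvsize;

So the question is only whether the pointers and sizes interleave.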

If that's your preference as maintainer, I will change it in the next
version of this series.


> Thanks,
> Anna
>
>> size_t rq_xmit_bytes_sent; /* total bytes sent */
>> size_t rq_reply_bytes_recvd; /* total reply bytes */
>> /* received */
>> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
>> index ab467c0..fd389c0 100644
>> --- a/net/sunrpc/clnt.c
>> +++ b/net/sunrpc/clnt.c
>> @@ -1768,7 +1768,7 @@ rpc_xdr_encode(struct rpc_task *task)
>> req->rq_buffer,
>> req->rq_callsize);
>> xdr_buf_init(&req->rq_rcv_buf,
>> - (char *)req->rq_buffer + req->rq_callsize,
>> + req->rq_rbuffer,
>> req->rq_rcvsize);
>>
>> p = rpc_encode_header(task);
>> diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
>> index 6690ebc..5db68b3 100644
>> --- a/net/sunrpc/sched.c
>> +++ b/net/sunrpc/sched.c
>> @@ -891,6 +891,7 @@ int rpc_malloc(struct rpc_task *task)
>> dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
>> task->tk_pid, size, buf);
>> rqst->rq_buffer = buf->data;
>> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
>> return 0;
>> }
>> EXPORT_SYMBOL_GPL(rpc_malloc);
>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>> index ebf14ba..136caf3 100644
>> --- a/net/sunrpc/xprtrdma/transport.c
>> +++ b/net/sunrpc/xprtrdma/transport.c
>> @@ -524,6 +524,7 @@ out:
>> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
>> req->rl_connect_cookie = 0; /* our reserved value */
>> rqst->rq_buffer = req->rl_sendbuf->rg_base;
>> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
>> return 0;
>>
>> out_rdmabuf:
>>

--
Chuck Lever




2016-08-29 15:48:14

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 05/22] SUNRPC: Separate buffer pointers for RPC Call and Reply messages

On 08/29/2016 11:33 AM, Chuck Lever wrote:
>
>> On Aug 29, 2016, at 10:23 AM, Anna Schumaker <[email protected]> wrote:
>>
>> Hi Chuck,
>>
>> On 08/23/2016 01:52 PM, Chuck Lever wrote:
>>> For xprtrdma, the RPC Call and Reply buffers are involved in real
>>> I/O operations.
>>>
>>> To start with, the DMA direction of the I/O for a Call is opposite
>>> that of a Reply.
>>>
>>> In the current arrangement, the Reply buffer address is on a
>>> four-byte alignment just past the call buffer. Would be friendlier
>>> on some platforms if that was at a DMA cache alignment instead.
>>>
>>> Because the current arrangement allocates a single memory region
>>> which contains both buffers, the RPC Reply buffer often contains a
>>> page boundary in it when the Call buffer is large enough (which is
>>> frequent).
>>>
>>> It would be a little nicer for setting up DMA operations (and
>>> possible registration of the Reply buffer) if the two buffers were
>>> separated, well-aligned, and contained as few page boundaries as
>>> possible.
>>>
>>> Now, I could just pad out the single memory region used for the pair
>>> of buffers. But frequently that would mean a lot of unused space to
>>> ensure the Reply buffer did not have a page boundary.
>>>
>>> Add a separate pointer to rpc_rqst that points right to the RPC
>>> Reply buffer. This makes no difference to xprtsock, but it will help
>>> xprtrdma in subsequent patches.
>>>
>>> Signed-off-by: Chuck Lever <[email protected]>
>>> ---
>>> include/linux/sunrpc/xprt.h | 5 +++--
>>> net/sunrpc/clnt.c | 2 +-
>>> net/sunrpc/sched.c | 1 +
>>> net/sunrpc/xprtrdma/transport.c | 1 +
>>> 4 files changed, 6 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
>>> index 72c2aeb..46f069e 100644
>>> --- a/include/linux/sunrpc/xprt.h
>>> +++ b/include/linux/sunrpc/xprt.h
>>> @@ -84,8 +84,9 @@ struct rpc_rqst {
>>> struct list_head rq_list;
>>>
>>> void *rq_buffer; /* Call XDR encode buffer */
>>> - size_t rq_callsize,
>>> - rq_rcvsize;
>>> + size_t rq_callsize;
>>> + void *rq_rbuffer; /* Reply XDR decode buffer */
>>> + size_t rq_rcvsize;
>>
>> Just a nit-picky question :) Is there any reason that you're adding the buffer between rq_callsize and rq_rcvsize? It seems like you could leave those alone and add the pointer either before or after them.
>
> Hi Anna-
>
> Keeping related fields together is usually more important than an extra
> line or two in a commit. At the very least, the function of these fields
> is more clear (to me, anyway) in this order.
>
> Generally it's good practice to keep together structure fields that are
> used at the same time. These four fields might appear in the same CPU
> cacheline, though that can change as fields are introduced or removed
> earlier in struct rpc_rqst.
>
> An argument can be made that the code is just as easy to read this way:
>
> void *rq_buffer, *rq_rbuffer;
> size_t rq_callsize, rq_rcvsize;
>
> If that's your preference as maintainer, I will change it in the next
> version of this series.

Got it. The cacheline reason is good enough for me, so you don't need to change the patch.

Thanks,
Anna

>
>
>> Thanks,
>> Anna
>>
>>> size_t rq_xmit_bytes_sent; /* total bytes sent */
>>> size_t rq_reply_bytes_recvd; /* total reply bytes */
>>> /* received */
>>> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
>>> index ab467c0..fd389c0 100644
>>> --- a/net/sunrpc/clnt.c
>>> +++ b/net/sunrpc/clnt.c
>>> @@ -1768,7 +1768,7 @@ rpc_xdr_encode(struct rpc_task *task)
>>> req->rq_buffer,
>>> req->rq_callsize);
>>> xdr_buf_init(&req->rq_rcv_buf,
>>> - (char *)req->rq_buffer + req->rq_callsize,
>>> + req->rq_rbuffer,
>>> req->rq_rcvsize);
>>>
>>> p = rpc_encode_header(task);
>>> diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
>>> index 6690ebc..5db68b3 100644
>>> --- a/net/sunrpc/sched.c
>>> +++ b/net/sunrpc/sched.c
>>> @@ -891,6 +891,7 @@ int rpc_malloc(struct rpc_task *task)
>>> dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
>>> task->tk_pid, size, buf);
>>> rqst->rq_buffer = buf->data;
>>> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
>>> return 0;
>>> }
>>> EXPORT_SYMBOL_GPL(rpc_malloc);
>>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>>> index ebf14ba..136caf3 100644
>>> --- a/net/sunrpc/xprtrdma/transport.c
>>> +++ b/net/sunrpc/xprtrdma/transport.c
>>> @@ -524,6 +524,7 @@ out:
>>> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
>>> req->rl_connect_cookie = 0; /* our reserved value */
>>> rqst->rq_buffer = req->rl_sendbuf->rg_base;
>>> + rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
>>> return 0;
>>>
>>> out_rdmabuf:
>>>
>
> --
> Chuck Lever
>
>
>


2016-08-29 19:52:30

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 19/22] xprtrdma: Support larger inline thresholds

Hi Chuck,

On 08/23/2016 01:54 PM, Chuck Lever wrote:
> RPC-over-RDMA Version Two will likely require at least a 4KB inline
> threshold by default. The Version One inline threshold is still 1KB,
> and connections are automatically negotiated down to it in
> rpcrdma_update_connect_private.

I'm curious, what is the status of RPCoRDMA v2?

Thanks,
Anna

>
> The maximum is somewhat arbitrary. There's no fundamental
> architectural limit I'm aware of, but it's good to keep the size of
> Receive buffers reasonable. Now that Send can use a s/g list, a
> Send buffer is only as large as each RPC requires. Receive buffers
> are always the size of the inline threshold, however.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/xprtrdma.h | 4 ++--
> net/sunrpc/xprtrdma/transport.c | 4 ++--
> 2 files changed, 4 insertions(+), 4 deletions(-)
>
> diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
> index 39267dc..221b7a2 100644
> --- a/include/linux/sunrpc/xprtrdma.h
> +++ b/include/linux/sunrpc/xprtrdma.h
> @@ -53,8 +53,8 @@
> #define RPCRDMA_MAX_SLOT_TABLE (256U)
>
> #define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */
> -#define RPCRDMA_DEF_INLINE (1024) /* default inline thresh */
> -#define RPCRDMA_MAX_INLINE (3068) /* max inline thresh */
> +#define RPCRDMA_DEF_INLINE (4096) /* default inline thresh */
> +#define RPCRDMA_MAX_INLINE (65536) /* max inline thresh */
>
> /* Memory registration strategies, by number.
> * This is part of a kernel / user space API. Do not remove. */
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 6a358ab..ed5e285 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = {
> .data = &xprt_rdma_max_inline_read,
> .maxlen = sizeof(unsigned int),
> .mode = 0644,
> - .proc_handler = proc_dointvec,
> + .proc_handler = proc_dointvec_minmax,
> .extra1 = &min_inline_size,
> .extra2 = &max_inline_size,
> },
> @@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = {
> .data = &xprt_rdma_max_inline_write,
> .maxlen = sizeof(unsigned int),
> .mode = 0644,
> - .proc_handler = proc_dointvec,
> + .proc_handler = proc_dointvec_minmax,
> .extra1 = &min_inline_size,
> .extra2 = &max_inline_size,
> },
>


2016-08-29 19:54:25

by Anna Schumaker

[permalink] [raw]
Subject: Re: [PATCH v2 20/22] xprtrmda: Report address of frmr, not mw

Hi Chuck,

On 08/23/2016 01:54 PM, Chuck Lever wrote:
> Tie frwr debugging messages together by always reporting the address
> of the frwr.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/frwr_ops.c | 9 +++++++--
> 1 file changed, 7 insertions(+), 2 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
> index e82d5cf..2e51ef5 100644
> --- a/net/sunrpc/xprtrdma/frwr_ops.c
> +++ b/net/sunrpc/xprtrdma/frwr_ops.c
> @@ -163,7 +163,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
> return PTR_ERR(f->fr_mr);
> }
>
> - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
> + dprintk("RPC: %s: recovered FRMR %p\n", __func__, f);
> f->fr_state = FRMR_IS_INVALID;
> return 0;
> }
> @@ -397,7 +397,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
> goto out_mapmr_err;
>
> dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
> - __func__, mw, mw->mw_nents, mr->length);
> + __func__, frmr, mw->mw_nents, mr->length);
>
> key = (u8)(mr->rkey & 0x000000FF);
> ib_update_fast_reg_key(mr, ++key);
> @@ -450,6 +450,9 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
> struct rpcrdma_frmr *f = &mw->frmr;
> struct ib_send_wr *invalidate_wr;
>
> + dprintk("RPC: %s: invalidating frmr %p\n",
> + __func__, f);
> +

Any reason this is broken into two lines?

> f->fr_state = FRMR_IS_INVALID;
> invalidate_wr = &f->fr_invwr;
>
> @@ -532,6 +535,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
> */
> unmap:
> list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
> + dprintk("RPC: %s: unmapping frmr %p\n",
> + __func__, &mw->frmr);

Same question here.

Thanks,
Anna
> list_del_init(&mw->mw_list);
> ib_dma_unmap_sg(ia->ri_device,
> mw->mw_sg, mw->mw_nents, mw->mw_dir);
>


2016-08-29 20:02:36

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v2 19/22] xprtrdma: Support larger inline thresholds


> On Aug 29, 2016, at 3:52 PM, Anna Schumaker <[email protected]> wrote:
>
> Hi Chuck,
>
> On 08/23/2016 01:54 PM, Chuck Lever wrote:
>> RPC-over-RDMA Version Two will likely require at least a 4KB inline
>> threshold by default. The Version One inline threshold is still 1KB,
>> and connections are automatically negotiated down to it in
>> rpcrdma_update_connect_private.
>
> I'm curious, what is the status of RPCoRDMA v2?

There is a personal draft published:

https://datatracker.ietf.org/doc/draft-cel-nfsv4-rpcrdma-version-two/

And being discussed actively in the nfsv4 Working Group.

The features added in this series are usable with the Linux V1 implementation
too; the point being to experiment with these new facilities before we finish
RPC-over-RDMA Version Two.
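
On the sizing point: since every posted Receive buffer is the size of
the inline threshold, the threshold bounds the Receive-side memory
footprint. A back-of-the-envelope sketch, assuming roughly one posted
Receive buffer per slot (that per-slot assumption is mine, not a claim
made in the patch):

static size_t recv_buffer_footprint(unsigned int slots, size_t inline_thresh)
{
	/* e.g. 256 slots * 65536 bytes = 16 MiB at the new maximum,
	 *      256 slots *  4096 bytes =  1 MiB at the new default.
	 */
	return (size_t)slots * inline_thresh;
}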


> Thanks,
> Anna
>
>>
>> The maximum is somewhat arbitrary. There's no fundamental
>> architectural limit I'm aware of, but it's good to keep the size of
>> Receive buffers reasonable. Now that Send can use a s/g list, a
>> Send buffer is only as large as each RPC requires. Receive buffers
>> are always the size of the inline threshold, however.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/xprtrdma.h | 4 ++--
>> net/sunrpc/xprtrdma/transport.c | 4 ++--
>> 2 files changed, 4 insertions(+), 4 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
>> index 39267dc..221b7a2 100644
>> --- a/include/linux/sunrpc/xprtrdma.h
>> +++ b/include/linux/sunrpc/xprtrdma.h
>> @@ -53,8 +53,8 @@
>> #define RPCRDMA_MAX_SLOT_TABLE (256U)
>>
>> #define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */
>> -#define RPCRDMA_DEF_INLINE (1024) /* default inline thresh */
>> -#define RPCRDMA_MAX_INLINE (3068) /* max inline thresh */
>> +#define RPCRDMA_DEF_INLINE (4096) /* default inline thresh */
>> +#define RPCRDMA_MAX_INLINE (65536) /* max inline thresh */
>>
>> /* Memory registration strategies, by number.
>> * This is part of a kernel / user space API. Do not remove. */
>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>> index 6a358ab..ed5e285 100644
>> --- a/net/sunrpc/xprtrdma/transport.c
>> +++ b/net/sunrpc/xprtrdma/transport.c
>> @@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = {
>> .data = &xprt_rdma_max_inline_read,
>> .maxlen = sizeof(unsigned int),
>> .mode = 0644,
>> - .proc_handler = proc_dointvec,
>> + .proc_handler = proc_dointvec_minmax,
>> .extra1 = &min_inline_size,
>> .extra2 = &max_inline_size,
>> },
>> @@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = {
>> .data = &xprt_rdma_max_inline_write,
>> .maxlen = sizeof(unsigned int),
>> .mode = 0644,
>> - .proc_handler = proc_dointvec,
>> + .proc_handler = proc_dointvec_minmax,
>> .extra1 = &min_inline_size,
>> .extra2 = &max_inline_size,
>> },
>>

--
Chuck Lever




2016-08-29 20:13:19

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v2 20/22] xprtrmda: Report address of frmr, not mw


> On Aug 29, 2016, at 3:54 PM, Anna Schumaker <[email protected]> wrote:
>
> Hi Chuck,
>
> On 08/23/2016 01:54 PM, Chuck Lever wrote:
>> Tie frwr debugging messages together by always reporting the address
>> of the frwr.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/frwr_ops.c | 9 +++++++--
>> 1 file changed, 7 insertions(+), 2 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
>> index e82d5cf..2e51ef5 100644
>> --- a/net/sunrpc/xprtrdma/frwr_ops.c
>> +++ b/net/sunrpc/xprtrdma/frwr_ops.c
>> @@ -163,7 +163,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
>> return PTR_ERR(f->fr_mr);
>> }
>>
>> - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
>> + dprintk("RPC: %s: recovered FRMR %p\n", __func__, f);
>> f->fr_state = FRMR_IS_INVALID;
>> return 0;
>> }
>> @@ -397,7 +397,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
>> goto out_mapmr_err;
>>
>> dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
>> - __func__, mw, mw->mw_nents, mr->length);
>> + __func__, frmr, mw->mw_nents, mr->length);
>>
>> key = (u8)(mr->rkey & 0x000000FF);
>> ib_update_fast_reg_key(mr, ++key);
>> @@ -450,6 +450,9 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
>> struct rpcrdma_frmr *f = &mw->frmr;
>> struct ib_send_wr *invalidate_wr;
>>
>> + dprintk("RPC: %s: invalidating frmr %p\n",
>> + __func__, f);
>> +
>
> Any reason this is broken into two lines?

No. It can be re-arranged.


>> f->fr_state = FRMR_IS_INVALID;
>> invalidate_wr = &f->fr_invwr;
>>
>> @@ -532,6 +535,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
>> */
>> unmap:
>> list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
>> + dprintk("RPC: %s: unmapping frmr %p\n",
>> + __func__, &mw->frmr);
>
> Same question here.

On one line, the dprintk invocation is longer than 80 characters.
Just avoiding a checkpatch.pl complaint.


> Thanks,
> Anna
>> list_del_init(&mw->mw_list);
>> ib_dma_unmap_sg(ia->ri_device,
>> mw->mw_sg, mw->mw_nents, mw->mw_dir);
>>

--
Chuck Lever