2016-08-15 20:50:16

by Chuck Lever III

Subject: [PATCH v1 00/22] client-side NFS/RDMA patches proposed for v4.9

Posted for review, the following patch series makes these changes:

- Correct use of DMA API
- Delay DMA mapping to permit device driver unload
- Introduce simple RDMA-CM private message exchange
- Support Remote Invalidation
- Support s/g list when sending RPC calls


Available in the "nfs-rdma-for-4.9" topic branch of this git repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git


Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.9


== Performance results ==

This is NFSv3 / RDMA, CX-3 Pro (FRWR) on a 12-core dual-socket
client and an 8-core single-socket server. The exported fs is a
tmpfs. Note that iozone reports latency for a system call, not
RPC round-trip.

Test #1: The inline threshold is set to 1KB, and Remote Invalidation
is disabled (RPC-over-RDMA Version One baseline).

O_DIRECT feature enabled
Microseconds/op Mode. Output is in microseconds per operation.
Command line used: /home/cel/bin/iozone -i0 -i1 -s128m -y1k -az -I -N

KB reclen write rewrite read reread
131072 1 61 62 51 51
131072 2 63 62 51 51
131072 4 64 63 52 51
131072 8 67 66 54 52
131072 16 71 70 56 56
131072 32 83 80 63 63
131072 64 104 100 83 82

O_DIRECT feature enabled
OPS Mode. Output is in operations per second.
Command line used: /home/cel/bin/iozone -i0 -i1 -s16m -r4k -t12 -I -O
Throughput test with 12 processes
Each process writes a 16384 Kbyte file in 4 Kbyte records

Children see throughput for 12 readers = 84198.24 ops/sec
Parent sees throughput for 12 readers = 84065.36 ops/sec
Min throughput per process = 5925.38 ops/sec
Max throughput per process = 7346.19 ops/sec
Avg throughput per process = 7016.52 ops/sec
Min xfer = 3300.00 ops

Test #2: The inline threshold is set to 4KB, and Remote Invalidation
is enabled. This means I/O payloads smaller than about 3.9KB (the
rest of the 4KB threshold is consumed by the RPC-over-RDMA and RPC
headers) do not use explicit RDMA at all, and no LOCAL_INV WR is
needed for operations that do use RDMA.

O_DIRECT feature enabled
Microseconds/op Mode. Output is in microseconds per operation.
Command line used: /home/cel/bin/iozone -i0 -i1 -s128m -y1k -az -I -N

KB reclen write rewrite read reread
131072 1 41 43 37 37
131072 2 44 44 37 37
131072 4 61 59 41 41
131072 8 63 62 43 43
131072 16 68 66 47 47
131072 32 76 72 53 53
131072 64 100 95 70 70

O_DIRECT feature enabled
OPS Mode. Output is in operations per second.
Command line used: /home/cel/bin/iozone -i0 -i1 -s16m -r4k -t12 -I -O
Throughput test with 12 processes
Each process writes a 16384 Kbyte file in 4 Kbyte records

Children see throughput for 12 readers = 111520.52 ops/sec
Parent sees throughput for 12 readers = 111250.80 ops/sec
Min throughput per process = 8463.72 ops/sec
Max throughput per process = 9658.81 ops/sec
Avg throughput per process = 9293.38 ops/sec
Min xfer = 3596.00 ops

== Analysis ==

To understand these results, note that:

Typical round-trip latency in this configuration for LOOKUP, ACCESS
and GETATTR (which bear no data payload) is 30-35us.

- An NFS READ call is a pure inline RDMA Send
- A small NFS READ reply is a pure inline RDMA Send
- A large NFS READ reply is an RDMA Write followed by an RDMA Send

- A small NFS WRITE call is a pure inline RDMA Send
- A large NFS WRITE call is an RDMA Send followed by the
server doing an RDMA Read
- An NFS WRITE reply is a pure inline RDMA Send

In Test #2, the 1KB and 2KB I/Os are all pure inline. No explicit
RDMA operation is involved. At 4KB and above, explicit RDMA is used
with a single STag. The server invalidates each RPC's STag, so no
LOCAL_INV WR is needed on the client for Test #2.

The key take-aways are that:

- For small payloads, NFS READ using RDMA Write with Remote
Invalidation is nearly as fast as pure inline; both modes take
about 40usec per RPC

- The NFS READ improvement with Remote Invalidation enabled is
effective even at 8KB payloads and above, but 10us is relatively
small compared to other transmission costs

- For small payloads, the RDMA Read round-trip still adds
significant per-WRITE latency
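
To put numbers on these take-aways, pulling the READ column out of
the two microseconds/op tables above:

   1KB: 51us -> 37us (pure inline at the larger threshold)
   4KB: 52us -> 41us (RDMA Write; STag invalidated by the server)
   8KB: 54us -> 43us

Remote Invalidation plus the larger inline threshold saves roughly
10-14us per READ, and a READ that uses RDMA Write (41us at 4KB)
lands within a few microseconds of a pure inline one (37us at 1-2KB).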

---

Chuck Lever (22):
xprtrdma: Eliminate INLINE_THRESHOLD macros
SUNRPC: Refactor rpc_xdr_buf_init()
SUNRPC: Generalize the RPC buffer allocation API
SUNRPC: Generalize the RPC buffer release API
SUNRPC: Separate buffer pointers for RPC Call and Reply messages
SUNRPC: Add a transport-specific private field in rpc_rqst
xprtrdma: Initialize separate RPC call and reply buffers
xprtrdma: Use smaller buffers for RPC-over-RDMA headers
xprtrdma: Replace DMA_BIDIRECTIONAL
xprtrdma: Delay DMA mapping Send and Receive buffers
xprtrdma: Eliminate "ia" argument in rpcrdma_{alloc,free}_regbuf
xprtrdma: Simplify rpcrdma_ep_post_recv()
xprtrdma: Move send_wr to struct rpcrdma_req
xprtrdma: Move recv_wr to struct rpcrdma_rep
xprtrmda: Report address of frmr, not mw
rpcrdma: RDMA/CM private message data structure
xprtrdma: Client-side support for rpcrdma_connect_private
xprtrdma: Basic support for Remote Invalidation
xprtrdma: Use gathered Send for large inline messages
xprtrdma: Support larger inline thresholds
xprtrdma: Rename rpcrdma_receive_wc()
xprtrdma: Eliminate rpcrdma_receive_worker()


include/linux/sunrpc/rpc_rdma.h | 39 ++++
include/linux/sunrpc/sched.h | 4
include/linux/sunrpc/xdr.h | 10 +
include/linux/sunrpc/xprt.h | 12 +
include/linux/sunrpc/xprtrdma.h | 4
net/sunrpc/backchannel_rqst.c | 8 -
net/sunrpc/clnt.c | 36 +--
net/sunrpc/sched.c | 36 ++-
net/sunrpc/sunrpc.h | 2
net/sunrpc/xprt.c | 2
net/sunrpc/xprtrdma/backchannel.c | 48 ++--
net/sunrpc/xprtrdma/fmr_ops.c | 7 -
net/sunrpc/xprtrdma/frwr_ops.c | 27 ++-
net/sunrpc/xprtrdma/rpc_rdma.c | 299 ++++++++++++++++++++--------
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 19 +-
net/sunrpc/xprtrdma/transport.c | 201 +++++++++++--------
net/sunrpc/xprtrdma/verbs.c | 238 +++++++++++++---------
net/sunrpc/xprtrdma/xprt_rdma.h | 102 ++++++----
net/sunrpc/xprtsock.c | 23 +-
19 files changed, 700 insertions(+), 417 deletions(-)

--
Chuck Lever


2016-08-15 20:50:24

by Chuck Lever III

Subject: [PATCH v1 01/22] xprtrdma: Eliminate INLINE_THRESHOLD macros

Clean up: r_xprt is already available everywhere these macros are
invoked, so just dereference that directly.

RPCRDMA_INLINE_PAD_VALUE is no longer used, so it can simply be
removed.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 ++--
net/sunrpc/xprtrdma/rpc_rdma.c | 2 +-
net/sunrpc/xprtrdma/transport.c | 6 +++---
net/sunrpc/xprtrdma/xprt_rdma.h | 9 ---------
4 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 87762d9..5f60ab2 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -46,13 +46,13 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

- size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ size += r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a47f170..845586f 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -673,7 +673,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

- if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
goto out_overflow;

dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 81f0e87..be95ece 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -518,7 +518,7 @@ out:
return req->rl_sendbuf->rg_base;

out_rdmabuf:
- min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
if (IS_ERR(rb))
goto out_fail;
@@ -541,8 +541,8 @@ out_sendbuf:
* reply will be large, but slush is provided here to allow
* flexibility when marshaling.
*/
- min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
- min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_rsize;
+ min_size += r_xprt->rx_data.inline_wsize;
if (size < min_size)
size = min_size;

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 670fad5..6d466ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -355,15 +355,6 @@ struct rpcrdma_create_data_internal {
unsigned int padding; /* non-rdma write header padding */
};

-#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
- (rpcx_to_rdmad(rq->rq_xprt).inline_rsize)
-
-#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
- (rpcx_to_rdmad(rq->rq_xprt).inline_wsize)
-
-#define RPCRDMA_INLINE_PAD_VALUE(rq)\
- rpcx_to_rdmad(rq->rq_xprt).padding
-
/*
* Statistics for RPCRDMA
*/


2016-08-15 20:50:32

by Chuck Lever III

Subject: [PATCH v1 02/22] SUNRPC: Refactor rpc_xdr_buf_init()

Clean up: there is some XDR initialization logic that is common
to the forward channel and backchannel. Move it to an XDR header
so it can be shared.

rpc_rqst::rq_buffer points to a buffer containing big-endian data.
Update its annotation as part of the clean up.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xdr.h | 10 ++++++++++
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/backchannel_rqst.c | 8 +-------
net/sunrpc/clnt.c | 24 ++++++------------------
net/sunrpc/xprtrdma/backchannel.c | 12 +-----------
5 files changed, 19 insertions(+), 37 deletions(-)

diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index 70c6b92..551569c 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -67,6 +67,16 @@ struct xdr_buf {
len; /* Length of XDR encoded message */
};

+static inline void
+xdr_buf_init(struct xdr_buf *buf, __be32 *start, size_t len)
+{
+ memset(buf, 0, sizeof(*buf));
+
+ buf->head[0].iov_base = start;
+ buf->head[0].iov_len = len;
+ buf->buflen = len;
+}
+
/*
* pre-xdr'ed macros.
*/
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a16070d..559dad3 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -83,7 +83,7 @@ struct rpc_rqst {
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list;

- __u32 * rq_buffer; /* XDR encode buffer */
+ __be32 *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize,
rq_rcvsize;
size_t rq_xmit_bytes_sent; /* total bytes sent */
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 229956b..ac701c2 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
page = alloc_page(gfp_flags);
if (page == NULL)
return -ENOMEM;
- buf->head[0].iov_base = page_address(page);
- buf->head[0].iov_len = PAGE_SIZE;
- buf->tail[0].iov_base = NULL;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->len = 0;
- buf->buflen = PAGE_SIZE;
+ xdr_buf_init(buf, page_address(page), PAGE_SIZE);
return 0;
}

diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7f79fb7..0e75369 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1748,18 +1748,6 @@ rpc_task_force_reencode(struct rpc_task *task)
task->tk_rqstp->rq_bytes_sent = 0;
}

-static inline void
-rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
-{
- buf->head[0].iov_base = start;
- buf->head[0].iov_len = len;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->flags = 0;
- buf->len = 0;
- buf->buflen = len;
-}
-
/*
* 3. Encode arguments of an RPC call
*/
@@ -1772,12 +1760,12 @@ rpc_xdr_encode(struct rpc_task *task)

dprint_status(task);

- rpc_xdr_buf_init(&req->rq_snd_buf,
- req->rq_buffer,
- req->rq_callsize);
- rpc_xdr_buf_init(&req->rq_rcv_buf,
- (char *)req->rq_buffer + req->rq_callsize,
- req->rq_rcvsize);
+ xdr_buf_init(&req->rq_snd_buf,
+ req->rq_buffer,
+ req->rq_callsize);
+ xdr_buf_init(&req->rq_rcv_buf,
+ (__be32 *)((char *)req->rq_buffer + req->rq_callsize),
+ req->rq_rcvsize);

p = rpc_encode_header(task);
if (p == NULL) {
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 5f60ab2..d3cfaf2 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -38,7 +38,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- struct xdr_buf *buf;
size_t size;

req = rpcrdma_create_req(r_xprt);
@@ -60,16 +59,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
req->rl_sendbuf = rb;
/* so that rpcr_to_rdmar works when receiving a request */
rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
-
- buf = &rqst->rq_snd_buf;
- buf->head[0].iov_base = rqst->rq_buffer;
- buf->head[0].iov_len = 0;
- buf->tail[0].iov_base = NULL;
- buf->tail[0].iov_len = 0;
- buf->page_len = 0;
- buf->len = 0;
- buf->buflen = size;
-
+ xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
return 0;

out_fail:


2016-08-15 20:50:41

by Chuck Lever III

Subject: [PATCH v1 03/22] SUNRPC: Generalize the RPC buffer allocation API

xprtrdma needs to allocate the Call and Reply buffers separately.
TBH, the reliance on using a single buffer for the pair of XDR
buffers is transport implementation-specific.

Transports that want to allocate separate Call and Reply buffers
will ignore the "size" argument anyway. Don't bother passing it.

The buf_alloc method can't return two pointers. Instead, make the
method's return value an error code, and set the rq_buffer pointer
in the method itself.

This gives call_allocate an opportunity to terminate an RPC instead
of looping forever when a permanent problem occurs. If a request is
just bogus, or the transport is in a state where it can't allocate
resources for any request, there needs to be a way to kill the RPC
right there and not loop.

This immediately fixes a rare problem in the backchannel send path,
which loops if the server happens to send a CB request whose
call+reply size is larger than a page (which it shouldn't do yet).

One more issue: looks like xprt_inject_disconnect was incorrectly
placed in the failure path in call_allocate. It needs to be in the
success path, as it is for other call-sites.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 2 +-
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/clnt.c | 12 ++++++++----
net/sunrpc/sched.c | 24 +++++++++++++++---------
net/sunrpc/sunrpc.h | 2 +-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 17 +++++++++--------
net/sunrpc/xprtrdma/transport.c | 26 ++++++++++++++++++--------
net/sunrpc/xprtsock.c | 17 +++++++++++------
8 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 817af0b..38d4c1b 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
void *);
void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long);
-void * rpc_malloc(struct rpc_task *, size_t);
+int rpc_malloc(struct rpc_task *);
void rpc_free(void *);
int rpciod_up(void);
void rpciod_down(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 559dad3..6f0f796 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -127,7 +127,7 @@ struct rpc_xprt_ops {
void (*rpcbind)(struct rpc_task *task);
void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
- void * (*buf_alloc)(struct rpc_task *task, size_t size);
+ int (*buf_alloc)(struct rpc_task *task);
void (*buf_free)(void *buffer);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 0e75369..601b3ca 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1693,6 +1693,7 @@ call_allocate(struct rpc_task *task)
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+ int status;

dprint_status(task);

@@ -1718,11 +1719,14 @@ call_allocate(struct rpc_task *task)
req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
req->rq_rcvsize <<= 2;

- req->rq_buffer = xprt->ops->buf_alloc(task,
- req->rq_callsize + req->rq_rcvsize);
- if (req->rq_buffer != NULL)
- return;
+ status = xprt->ops->buf_alloc(task);
xprt_inject_disconnect(xprt);
+ if (status == 0)
+ return;
+ if (status == -EIO) {
+ rpc_exit(task, -EIO);
+ return;
+ }

dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 9ae5885..b964d40 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
}

/**
- * rpc_malloc - allocate an RPC buffer
- * @task: RPC task that will use this buffer
- * @size: requested byte size
+ * rpc_malloc - allocate RPC buffer resources
+ * @task: RPC task
+ *
+ * A single memory region is allocated, which is split between the
+ * RPC call and RPC reply that this task is being used for. When
+ * this RPC is retired, the memory is released by calling rpc_free.
*
* To prevent rpciod from hanging, this allocator never sleeps,
- * returning NULL and suppressing warning if the request cannot be serviced
- * immediately.
- * The caller can arrange to sleep in a way that is safe for rpciod.
+ * returning -ENOMEM and suppressing warning if the request cannot
+ * be serviced immediately. The caller can arrange to sleep in a
+ * way that is safe for rpciod.
*
* Most requests are 'small' (under 2KiB) and can be serviced from a
* mempool, ensuring that NFS reads and writes can always proceed,
@@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
* In order to avoid memory starvation triggering more writebacks of
* NFS requests, we avoid using GFP_KERNEL.
*/
-void *rpc_malloc(struct rpc_task *task, size_t size)
+int rpc_malloc(struct rpc_task *task)
{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
struct rpc_buffer *buf;
gfp_t gfp = GFP_NOIO | __GFP_NOWARN;

@@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
buf = kmalloc(size, gfp);

if (!buf)
- return NULL;
+ return -ENOMEM;

buf->len = size;
dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
task->tk_pid, size, buf);
- return &buf->data;
+ rqst->rq_buffer = buf->data;
+ return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index f2b7cb5..a4f44ca 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -34,7 +34,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
struct rpc_buffer {
size_t len;
- char data[];
+ __be32 data[];
};

static inline int rpc_reply_expected(struct rpc_task *task)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index a2a7519..b7c698f7f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -159,29 +159,30 @@ out_unmap:
/* Server-side transport endpoint wants a whole page for its send
* buffer. The client RPC code constructs the RPC header in this
* buffer before it invokes ->send_request.
- *
- * Returns NULL if there was a temporary allocation failure.
*/
-static void *
-xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_bc_allocate(struct rpc_task *task)
{
struct rpc_rqst *rqst = task->tk_rqstp;
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+ size_t size = rqst->rq_callsize;
struct svcxprt_rdma *rdma;
struct page *page;

rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

- /* Prevent an infinite loop: try to make this case work */
- if (size > PAGE_SIZE)
+ if (size > PAGE_SIZE) {
WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
size);
+ return -EIO;
+ }

page = alloc_page(RPCRDMA_DEF_GFP);
if (!page)
- return NULL;
+ return -ENOMEM;

- return page_address(page);
+ rqst->rq_buffer = page_address(page);
+ return 0;
}

static void
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index be95ece..daa7d4d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
}
}

-/*
+/**
+ * xprt_rdma_allocate - allocate transport resources for an RPC
+ * @task: RPC task
+ *
+ * Return values:
+ * 0: Success; rq_buffer points to RPC buffer to use
+ * ENOMEM: Out of memory, call again later
+ * EIO: A permanent error occurred, do not retry
+ *
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
* sequence.
@@ -486,11 +494,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
* We may register rq_rcv_buf when using reply chunks.
*/
-static void *
-xprt_rdma_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_allocate(struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
size_t min_size;
@@ -498,7 +507,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)

req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
- return NULL;
+ return -ENOMEM;

flags = RPCRDMA_DEF_GFP;
if (RPC_IS_SWAPPER(task))
@@ -515,7 +524,8 @@ out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
req->rl_task = task;
- return req->rl_sendbuf->rg_base;
+ rqst->rq_buffer = req->rl_sendbuf->rg_base;
+ return 0;

out_rdmabuf:
min_size = r_xprt->rx_data.inline_wsize;
@@ -558,7 +568,7 @@ out_sendbuf:

out_fail:
rpcrdma_buffer_put(req);
- return NULL;
+ return -ENOMEM;
}

/*
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 8ede3bc..d6db5cf 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
* we allocate pages instead doing a kmalloc like rpc_malloc is because we want
* to use the server side send routines.
*/
-static void *bc_malloc(struct rpc_task *task, size_t size)
+static int bc_malloc(struct rpc_task *task)
{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ size_t size = rqst->rq_callsize;
struct page *page;
struct rpc_buffer *buf;

- WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
- if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
- return NULL;
+ if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
+ WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
+ size);
+ return -EIO;
+ }

page = alloc_page(GFP_KERNEL);
if (!page)
- return NULL;
+ return -ENOMEM;

buf = page_address(page);
buf->len = PAGE_SIZE;

- return buf->data;
+ rqst->rq_buffer = buf->data;
+ return 0;
}

/*


2016-08-15 20:50:49

by Chuck Lever III

Subject: [PATCH v1 04/22] SUNRPC: Generalize the RPC buffer release API

xprtrdma needs to allocate the Call and Reply buffers separately.
TBH, the reliance on using a single buffer for the pair of XDR
buffers is transport implementation-specific.

Instead of passing just the rq_buffer into the buf_free method, pass
the task structure and let buf_free take care of freeing both
XDR buffers at once.

There's a micro-optimization here. In the common case, both
xprt_release and the transport's buf_free method were checking if
rq_buffer was NULL. Now the check is done only once per RPC.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/sched.h | 2 +-
include/linux/sunrpc/xprt.h | 2 +-
net/sunrpc/sched.c | 10 ++++------
net/sunrpc/xprt.c | 2 +-
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 2 +-
net/sunrpc/xprtrdma/transport.c | 27 ++++++++++-----------------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 -
net/sunrpc/xprtsock.c | 6 ++----
8 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 38d4c1b..7ba040c 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -240,7 +240,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long);
int rpc_malloc(struct rpc_task *);
-void rpc_free(void *);
+void rpc_free(struct rpc_task *);
int rpciod_up(void);
void rpciod_down(void);
int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 6f0f796..272c108 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -128,7 +128,7 @@ struct rpc_xprt_ops {
void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
int (*buf_alloc)(struct rpc_task *task);
- void (*buf_free)(void *buffer);
+ void (*buf_free)(struct rpc_task *task);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b964d40..6690ebc 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -896,18 +896,16 @@ int rpc_malloc(struct rpc_task *task)
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
- * rpc_free - free buffer allocated via rpc_malloc
- * @buffer: buffer to free
+ * rpc_free - free RPC buffer resources allocated via rpc_malloc
+ * @task: RPC task
*
*/
-void rpc_free(void *buffer)
+void rpc_free(struct rpc_task *task)
{
+ void *buffer = task->tk_rqstp->rq_buffer;
size_t size;
struct rpc_buffer *buf;

- if (!buffer)
- return;
-
buf = container_of(buffer, struct rpc_buffer, data);
size = buf->len;

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ea244b2..685e6d2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1295,7 +1295,7 @@ void xprt_release(struct rpc_task *task)
xprt_schedule_autodisconnect(xprt);
spin_unlock_bh(&xprt->transport_lock);
if (req->rq_buffer)
- xprt->ops->buf_free(req->rq_buffer);
+ xprt->ops->buf_free(task);
xprt_inject_disconnect(xprt);
if (req->rq_cred != NULL)
put_rpccred(req->rq_cred);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index b7c698f7f..78de515 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -186,7 +186,7 @@ xprt_rdma_bc_allocate(struct rpc_task *task)
}

static void
-xprt_rdma_bc_free(void *buffer)
+xprt_rdma_bc_free(struct rpc_task *task)
{
/* No-op: ctxt and page have already been freed. */
}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index daa7d4d..e7f222f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -523,7 +523,6 @@ xprt_rdma_allocate(struct rpc_task *task)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- req->rl_task = task;
rqst->rq_buffer = req->rl_sendbuf->rg_base;
return 0;

@@ -571,31 +570,25 @@ out_fail:
return -ENOMEM;
}

-/*
- * This function returns all RDMA resources to the pool.
+/**
+ * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
+ * @task: RPC task
+ *
+ * Caller guarantees rqst->rq_buffer is non-NULL.
*/
static void
-xprt_rdma_free(void *buffer)
+xprt_rdma_free(struct rpc_task *task)
{
- struct rpcrdma_req *req;
- struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_regbuf *rb;
-
- if (buffer == NULL)
- return;
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

- rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
- req = rb->rg_owner;
if (req->rl_backchannel)
return;

- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
- !RPC_IS_ASYNC(req->rl_task));
-
+ r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 6d466ab..296d9ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -283,7 +283,6 @@ struct rpcrdma_req {
struct list_head rl_free;
unsigned int rl_niovs;
unsigned int rl_connect_cookie;
- struct rpc_task *rl_task;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index d6db5cf..512cf6a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2560,13 +2560,11 @@ static int bc_malloc(struct rpc_task *task)
/*
* Free the space allocated in the bc_alloc routine
*/
-static void bc_free(void *buffer)
+static void bc_free(struct rpc_task *task)
{
+ void *buffer = task->tk_rqstp->rq_buffer;
struct rpc_buffer *buf;

- if (!buffer)
- return;
-
buf = container_of(buffer, struct rpc_buffer, data);
free_page((unsigned long)buf);
}


2016-08-15 20:50:57

by Chuck Lever III

Subject: [PATCH v1 05/22] SUNRPC: Separate buffer pointers for RPC Call and Reply messages

For xprtrdma, the RPC Call and Reply buffers are involved in real
I/O operations.

To start with, the DMA direction of the I/O for a Call is opposite
that of a Reply.

In the current arrangement, the Reply buffer address is on a
four-byte alignment just past the call buffer. It would be friendlier
on some platforms if that were at a DMA cacheline alignment instead.

Because the current arrangement allocates a single memory region
which contains both buffers, the RPC Reply buffer often straddles a
page boundary when the Call buffer is large enough (which happens
frequently).

It would be a little nicer for setting up DMA operations (and
possible registration of the Reply buffer) if the two buffers were
separated, well-aligned, and contained as few page boundaries as
possible.

Now, I could just pad out the single memory region used for the pair
of buffers. But frequently that would mean a lot of unused space to
ensure the Reply buffer did not have a page boundary.

Add a separate pointer to rpc_rqst that points right to the RPC
Reply buffer. This makes no difference to xprtsock, but it will help
xprtrdma in subsequent patches.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 5 +++--
net/sunrpc/clnt.c | 2 +-
net/sunrpc/sched.c | 2 ++
net/sunrpc/xprtrdma/transport.c | 2 ++
4 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 272c108..8e3f292 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -84,8 +84,9 @@ struct rpc_rqst {
struct list_head rq_list;

__be32 *rq_buffer; /* Call XDR encode buffer */
- size_t rq_callsize,
- rq_rcvsize;
+ size_t rq_callsize;
+ __be32 *rq_rbuffer; /* Reply XDR decode buffer */
+ size_t rq_rcvsize;
size_t rq_xmit_bytes_sent; /* total bytes sent */
size_t rq_reply_bytes_recvd; /* total reply bytes */
/* received */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 601b3ca..4c933a8 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1768,7 +1768,7 @@ rpc_xdr_encode(struct rpc_task *task)
req->rq_buffer,
req->rq_callsize);
xdr_buf_init(&req->rq_rcv_buf,
- (__be32 *)((char *)req->rq_buffer + req->rq_callsize),
+ req->rq_rbuffer,
req->rq_rcvsize);

p = rpc_encode_header(task);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 6690ebc..8ed0c87 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -891,6 +891,8 @@ int rpc_malloc(struct rpc_task *task)
dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
task->tk_pid, size, buf);
rqst->rq_buffer = buf->data;
+ rqst->rq_rbuffer = (__be32 *)((char *)rqst->rq_buffer +
+ rqst->rq_callsize);
return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e7f222f..1f5dc19 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -524,6 +524,8 @@ out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
rqst->rq_buffer = req->rl_sendbuf->rg_base;
+ rqst->rq_rbuffer = (__be32 *)((char *)rqst->rq_buffer +
+ rqst->rq_rcvsize);
return 0;

out_rdmabuf:


2016-08-15 20:51:05

by Chuck Lever III

Subject: [PATCH v1 06/22] SUNRPC: Add a transport-specific private field in rpc_rqst

Currently there's a hidden and indirect mechanism for finding the
rpcrdma_req that goes with an rpc_rqst. It depends on getting from
the rq_buffer pointer in struct rpc_rqst to the struct
rpcrdma_regbuf that controls that buffer, and then to the struct
rpcrdma_req it goes with.

This was done back in the day to avoid the need to add a per-rqst
pointer or to alter the buf_free API when support for RPC-over-RDMA
was introduced.

I'm about to change the way regbufs work to support larger inline
thresholds. Now is a good time to replace this indirect mechanism
with something that is more straightforward. I guess this should be
considered a clean up.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/backchannel.c | 6 ++----
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtrdma/verbs.c | 1 -
net/sunrpc/xprtrdma/xprt_rdma.h | 13 +++++++------
5 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 8e3f292..6080482 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -83,6 +83,7 @@ struct rpc_rqst {
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list;

+ void *rq_xprtdata; /* Per-xprt private data */
__be32 *rq_buffer; /* Call XDR encode buffer */
size_t rq_callsize;
__be32 *rq_rbuffer; /* Reply XDR decode buffer */
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index d3cfaf2..c4904f8 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -55,11 +55,9 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
- rb->rg_owner = req;
req->rl_sendbuf = rb;
- /* so that rpcr_to_rdmar works when receiving a request */
- rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
- xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
+ xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
+ rpcrdma_set_xprtdata(rqst, req);
return 0;

out_fail:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1f5dc19..343bb31 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -523,6 +523,7 @@ xprt_rdma_allocate(struct rpc_task *task)
out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
+ rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
rqst->rq_rbuffer = (__be32 *)((char *)rqst->rq_buffer +
rqst->rq_rcvsize);
@@ -560,7 +561,6 @@ out_sendbuf:
rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
if (IS_ERR(rb))
goto out_fail;
- rb->rg_owner = req;

r_xprt->rx_stats.hardway_register_count += size;
rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 536d0be..baf7f43 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1191,7 +1191,6 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
rb->rg_size = size;
- rb->rg_owner = NULL;
return rb;

out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 296d9ab..d8ba118 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,7 +113,6 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
size_t rg_size;
- struct rpcrdma_req *rg_owner;
struct ib_sge rg_iov;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -297,14 +296,16 @@ struct rpcrdma_req {
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};

+static inline void
+rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req)
+{
+ rqst->rq_xprtdata = (void *)req;
+}
+
static inline struct rpcrdma_req *
rpcr_to_rdmar(struct rpc_rqst *rqst)
{
- void *buffer = rqst->rq_buffer;
- struct rpcrdma_regbuf *rb;
-
- rb = container_of(buffer, struct rpcrdma_regbuf, rg_base);
- return rb->rg_owner;
+ return (struct rpcrdma_req *)rqst->rq_xprtdata;
}

/*


2016-08-15 20:51:13

by Chuck Lever III

Subject: [PATCH v1 07/22] xprtrdma: Initialize separate RPC call and reply buffers

RPC-over-RDMA needs to separate its RPC call and reply buffers.

o When an RPC Call is sent, rq_snd_buf is DMA mapped for an RDMA
Send operation using DMA_TO_DEVICE

o If the client expects a large RPC reply, it DMA maps rq_rcv_buf
as part of a Reply chunk using DMA_FROM_DEVICE

The two mappings are for data movement in opposite directions.

DMA-API.txt suggests that if these mappings share a DMA cacheline,
bad things can happen. This could occur in the final bytes of
rq_snd_buf and the first bytes of rq_rcv_buf if the two buffers
happen to share a DMA cacheline.

On x86_64 the cacheline size is typically 8 bytes, and RPC call
messages are usually much smaller than the send buffer, so this
hasn't been a noticeable problem. But the DMA cacheline size can be
larger on other platforms.

Also, often rq_rcv_buf starts most of the way into a page, thus
an additional RDMA segment is needed to map and register the end of
that buffer. Try to avoid that scenario to reduce the cost of
registering and invalidating Reply chunks.

Instead of carrying a single regbuf that covers both rq_snd_buf and
rq_rcv_buf, each struct rpcrdma_req now carries one regbuf for
rq_snd_buf and one regbuf for rq_rcv_buf.

Some incidental changes worth noting:

- To clear out some spaghetti, refactor xprt_rdma_allocate.
- The value stored in rg_size is the same as the value stored in
the iov.length field, so eliminate rg_size

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 151 +++++++++++++++++++++++++--------------
net/sunrpc/xprtrdma/verbs.c | 2 -
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +-
3 files changed, 99 insertions(+), 60 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 343bb31..4d2d068 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -477,6 +477,86 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
}
}

+/* Allocate a fixed-size buffer in which to construct and send the
+ * RPC-over-RDMA header for this request.
+ */
+static bool
+rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ gfp_t flags)
+{
+ size_t size = r_xprt->rx_data.inline_wsize;
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_rdmabuf)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_rdmabuf = rb;
+ return true;
+}
+
+/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
+ * if the resulting Call message is smaller than the inline threshold.
+ * The value of the "rq_callsize" argument accounts for RPC header
+ * requirements, but not for the data payload in these cases.
+ *
+ * See rpcrdma_inline_pullup.
+ */
+static bool
+rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+ size_t min_size;
+
+ if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
+ return true;
+
+ min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ r_xprt->rx_stats.hardway_register_count += min_size;
+ req->rl_sendbuf = rb;
+ return true;
+}
+
+/* The rq_rcv_buf is used only if a Reply chunk is necessary.
+ * The decision to use a Reply chunk is made later in
+ * rpcrdma_marshal_req. This buffer is registered at that time.
+ *
+ * Otherwise, the associated RPC Reply arrives in a separate
+ * Receive buffer, arbitrarily chosen by the HCA. The buffer
+ * allocated here for the RPC Reply is not utilized in that
+ * case. See rpcrdma_inline_fixup.
+ *
+ * A regbuf is used here to remember the buffer size.
+ */
+static bool
+rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_recvbuf);
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_recvbuf = rb;
+ return true;
+}
+
/**
* xprt_rdma_allocate - allocate transport resources for an RPC
* @task: RPC task
@@ -487,22 +567,18 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* EIO: A permanent error occurred, do not retry
*
* The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence.
+ * to hide the struct rpcrdma_req, which is necessary for the actual
+ * send/recv sequence.
*
- * The RPC layer allocates both send and receive buffers in the same call
- * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
- * We may register rq_rcv_buf when using reply chunks.
+ * xprt_rdma_allocate provides buffers that are already mapped for
+ * DMA, and a local DMA lkey is provided for each.
*/
static int
xprt_rdma_allocate(struct rpc_task *task)
{
struct rpc_rqst *rqst = task->tk_rqstp;
- size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- size_t min_size;
gfp_t flags;

req = rpcrdma_buffer_get(&r_xprt->rx_buf);
@@ -513,60 +589,23 @@ xprt_rdma_allocate(struct rpc_task *task)
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

- if (req->rl_rdmabuf == NULL)
- goto out_rdmabuf;
- if (req->rl_sendbuf == NULL)
- goto out_sendbuf;
- if (size > req->rl_sendbuf->rg_size)
- goto out_sendbuf;
+ if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
+ goto out_fail;
+ if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+ goto out_fail;
+ if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+ goto out_fail;
+
+ dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
+ task->tk_pid, __func__, rqst->rq_callsize,
+ rqst->rq_rcvsize, req);

-out:
- dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
- rqst->rq_rbuffer = (__be32 *)((char *)rqst->rq_buffer +
- rqst->rq_rcvsize);
+ rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
return 0;

-out_rdmabuf:
- min_size = r_xprt->rx_data.inline_wsize;
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_rdmabuf = rb;
-
-out_sendbuf:
- /* XDR encoding and RPC/RDMA marshaling of this request has not
- * yet occurred. Thus a lower bound is needed to prevent buffer
- * overrun during marshaling.
- *
- * RPC/RDMA marshaling may choose to send payload bearing ops
- * inline, if the result is smaller than the inline threshold.
- * The value of the "size" argument accounts for header
- * requirements but not for the payload in these cases.
- *
- * Likewise, allocate enough space to receive a reply up to the
- * size of the inline threshold.
- *
- * It's unlikely that both the send header and the received
- * reply will be large, but slush is provided here to allow
- * flexibility when marshaling.
- */
- min_size = r_xprt->rx_data.inline_rsize;
- min_size += r_xprt->rx_data.inline_wsize;
- if (size < min_size)
- size = min_size;
-
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
- if (IS_ERR(rb))
- goto out_fail;
-
- r_xprt->rx_stats.hardway_register_count += size;
- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
- req->rl_sendbuf = rb;
- goto out;
-
out_fail:
rpcrdma_buffer_put(req);
return -ENOMEM;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index baf7f43..9c3e58e 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -974,6 +974,7 @@ rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
+ rpcrdma_free_regbuf(ia, req->rl_recvbuf);
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1190,7 +1191,6 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)

iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
- rb->rg_size = size;
return rb;

out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d8ba118..3f707a6 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -112,7 +112,6 @@ struct rpcrdma_ep {
*/

struct rpcrdma_regbuf {
- size_t rg_size;
struct ib_sge rg_iov;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -285,8 +284,9 @@ struct rpcrdma_req {
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
- struct rpcrdma_regbuf *rl_rdmabuf;
- struct rpcrdma_regbuf *rl_sendbuf;
+ struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
+ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
+ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */

struct ib_cqe rl_cqe;
struct list_head rl_all;


2016-08-15 20:51:21

by Chuck Lever III

Subject: [PATCH v1 08/22] xprtrdma: Use smaller buffers for RPC-over-RDMA headers

Commit 949317464bc2 ("xprtrdma: Limit number of RDMA segments in
RPC-over-RDMA headers") capped the number of chunks that may appear
in RPC-over-RDMA headers. The maximum header size can be estimated
and fixed to avoid allocating buffer space that is never used.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 5 ++---
net/sunrpc/xprtrdma/transport.c | 2 +-
net/sunrpc/xprtrdma/xprt_rdma.h | 5 ++++-
3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index c4904f8..60fc991 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -45,13 +45,12 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- size = r_xprt->rx_data.inline_wsize;
- rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

- size += r_xprt->rx_data.inline_rsize;
+ size = r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 4d2d068..9c90590 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -484,7 +484,7 @@ static bool
rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
gfp_t flags)
{
- size_t size = r_xprt->rx_data.inline_wsize;
+ size_t size = RPCRDMA_HDRBUF_SIZE;
struct rpcrdma_regbuf *rb;

if (req->rl_rdmabuf)
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3f707a6..4c77886 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -160,7 +160,10 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
* The smallest inline threshold is 1024 bytes, ensuring that
* at least 750 bytes are available for RPC messages.
*/
-#define RPCRDMA_MAX_HDR_SEGS (8)
+enum {
+ RPCRDMA_MAX_HDR_SEGS = 8,
+ RPCRDMA_HDRBUF_SIZE = 256,
+};

/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv


2016-08-15 20:51:29

by Chuck Lever III

Subject: [PATCH v1 09/22] xprtrdma: Replace DMA_BIDIRECTIONAL

The use of DMA_BIDIRECTIONAL is discouraged by DMA-API.txt.
Fortunately, xprtrdma now knows which direction I/O is going as
soon as it allocates each regbuf.

The RPC Call and Reply buffers are no longer the same regbuf. They
can each be labeled correctly now. The RPC Reply buffer is never
part of either a Send or Receive WR, but it can be part of a Reply
chunk, which is mapped and registered via ->ro_map. So it is not
DMA mapped here (DMA_NONE), avoiding a double mapping.

Since Receive buffers are no longer DMA_BIDIRECTIONAL and their
contents are never modified by the host CPU, DMA-API-HOWTO.txt
suggests that a DMA sync before posting each buffer should be
unnecessary. (See my_card_interrupt_handler).

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 5 ++-
net/sunrpc/xprtrdma/transport.c | 7 +++--
net/sunrpc/xprtrdma/verbs.c | 55 +++++++++++++++++--------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++-
4 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 60fc991..ceae872 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -45,13 +45,14 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE,
+ DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(ia, size, DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_sendbuf = rb;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 9c90590..37f5b22 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -490,7 +490,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_rdmabuf)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -517,7 +517,8 @@ rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;

min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size,
+ DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -547,7 +548,7 @@ rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_NONE, flags);
if (IS_ERR(rb))
return false;

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 9c3e58e..0bd2ace 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -865,7 +865,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
goto out;

rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
- GFP_KERNEL);
+ DMA_FROM_DEVICE, GFP_KERNEL);
if (IS_ERR(rep->rr_rdmabuf)) {
rc = PTR_ERR(rep->rr_rdmabuf);
goto out_free;
@@ -1153,27 +1153,24 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
spin_unlock(&buffers->rb_lock);
}

-/*
- * Wrappers for internal-use kmalloc memory registration, used by buffer code.
- */
-
/**
- * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
* @ia: controlling rpcrdma_ia
* @size: size of buffer to be allocated, in bytes
+ * @direction: direction of data movement
* @flags: GFP flags
*
- * Returns pointer to private header of an area of internally
- * registered memory, or an ERR_PTR. The registered buffer follows
- * the end of the private header.
+ * Returns an ERR_PTR, or a pointer to a regbuf, which is a
+ * contiguous memory region that is DMA mapped persistently, and
+ * is registered for local I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
- * receiving the payload of RDMA RECV operations. regbufs are not
- * used for RDMA READ/WRITE operations, thus are registered only for
- * LOCAL access.
+ * receiving the payload of RDMA RECV operations. During Long Calls
+ * or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
+ enum dma_data_direction direction, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
struct ib_sge *iov;
@@ -1182,15 +1179,20 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
if (rb == NULL)
goto out;

+ rb->rg_direction = direction;
iov = &rb->rg_iov;
- iov->addr = ib_dma_map_single(ia->ri_device,
- (void *)rb->rg_base, size,
- DMA_BIDIRECTIONAL);
- if (ib_dma_mapping_error(ia->ri_device, iov->addr))
- goto out_free;
-
iov->length = size;
iov->lkey = ia->ri_pd->local_dma_lkey;
+
+ if (direction != DMA_NONE) {
+ iov->addr = ib_dma_map_single(ia->ri_device,
+ (void *)rb->rg_base,
+ rdmab_length(rb),
+ rb->rg_direction);
+ if (ib_dma_mapping_error(ia->ri_device, iov->addr))
+ goto out_free;
+ }
+
return rb;

out_free:
@@ -1207,14 +1209,14 @@ out:
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
- struct ib_sge *iov;
-
if (!rb)
return;

- iov = &rb->rg_iov;
- ib_dma_unmap_single(ia->ri_device,
- iov->addr, iov->length, DMA_BIDIRECTIONAL);
+ if (rb->rg_direction != DMA_NONE) {
+ ib_dma_unmap_single(ia->ri_device, rdmab_addr(rb),
+ rdmab_length(rb), rb->rg_direction);
+ }
+
kfree(rb);
}

@@ -1286,11 +1288,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

- ib_dma_sync_single_for_cpu(ia->ri_device,
- rdmab_addr(rep->rr_rdmabuf),
- rdmab_length(rep->rr_rdmabuf),
- DMA_BIDIRECTIONAL);
-
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 4c77886..c6861ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,6 +113,7 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
struct ib_sge rg_iov;
+ enum dma_data_direction rg_direction;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};

@@ -476,7 +477,8 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);

struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
- size_t, gfp_t);
+ size_t, enum dma_data_direction,
+ gfp_t);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);



2016-08-15 20:51:37

by Chuck Lever III

Subject: [PATCH v1 10/22] xprtrdma: Delay DMA mapping Send and Receive buffers

Currently, each regbuf is allocated and DMA mapped at the same time.
This is done during transport creation.

When a device driver is unloaded, every DMA-mapped buffer in use by
a transport has to be unmapped, and then remapped to the new
device if the driver is loaded again. Remapping will have to be done
_after_ the connect worker has set up the new device.

But there's an ordering problem:

call_allocate, which invokes xprt_rdma_allocate, which in turn calls
rpcrdma_alloc_regbuf to allocate Send buffers, happens _before_
the connect worker can run to set up the new device.

Instead, at transport creation, allocate each buffer but leave it
unmapped. By the time an RPC carries these buffers into
->send_request, a transport connection should have been established.
At that point, check whether the RPC's buffers have been DMA mapped;
if not, map them there.

When device driver unplug support is added later, it will simply
unmap all the transport's regbufs; it will not have to deallocate
the underlying memory.
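
The check-and-map-on-first-use pattern this introduces looks like the
following sketch (the helper names and the -EIO return match the code
added below; the call site shown is condensed for illustration):

	/* Called on every send path; maps each regbuf only once, the
	 * first time an RPC carries it into ->send_request.
	 */
	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
		return -EIO;	/* no usable device, or mapping failed */
	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
		return -EIO;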

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 8 ++++
net/sunrpc/xprtrdma/rpc_rdma.c | 9 +++++
net/sunrpc/xprtrdma/verbs.c | 71 +++++++++++++++++++++++--------------
net/sunrpc/xprtrdma/xprt_rdma.h | 16 ++++++++
4 files changed, 78 insertions(+), 26 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ceae872..8bc249e 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -230,16 +230,24 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
#endif

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
+ goto out_map;
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ goto out_map;
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

req->rl_niovs = 2;
return 0;
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return -EIO;
}

/**
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 845586f..68a39c0 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -681,6 +681,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
transfertypes[rtype], transfertypes[wtype],
hdrlen, rpclen);

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
+ goto out_map;
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -689,6 +691,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (rtype == rpcrdma_areadch)
return 0;

+ if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ goto out_map;
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
@@ -704,6 +708,11 @@ out_overflow:
out_unmap:
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
return PTR_ERR(iptr);
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ iptr = ERR_PTR(-EIO);
+ goto out_unmap;
}

/*
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0bd2ace..b88bcf4 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1160,9 +1160,8 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* @direction: direction of data movement
* @flags: GFP flags
*
- * Returns an ERR_PTR, or a pointer to a regbuf, which is a
- * contiguous memory region that is DMA mapped persistently, and
- * is registered for local I/O.
+ * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
+ * can be persistently DMA-mapped for I/O.
*
* xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
* receiving the payload of RDMA RECV operations. During Long Calls
@@ -1173,32 +1172,50 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
enum dma_data_direction direction, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- struct ib_sge *iov;

rb = kmalloc(sizeof(*rb) + size, flags);
if (rb == NULL)
- goto out;
+ return ERR_PTR(-ENOMEM);

+ rb->rg_device = NULL;
rb->rg_direction = direction;
- iov = &rb->rg_iov;
- iov->length = size;
- iov->lkey = ia->ri_pd->local_dma_lkey;
-
- if (direction != DMA_NONE) {
- iov->addr = ib_dma_map_single(ia->ri_device,
- (void *)rb->rg_base,
- rdmab_length(rb),
- rb->rg_direction);
- if (ib_dma_mapping_error(ia->ri_device, iov->addr))
- goto out_free;
- }
+ rb->rg_iov.length = size;

return rb;
+}

-out_free:
- kfree(rb);
-out:
- return ERR_PTR(-ENOMEM);
+/**
+ * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be mapped
+ */
+bool
+__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (rb->rg_direction == DMA_NONE)
+ return false;
+
+ rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
+ (void *)rb->rg_base,
+ rdmab_length(rb),
+ rb->rg_direction);
+ if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
+ return false;
+
+ rb->rg_device = ia->ri_device;
+ rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+ return true;
+}
+
+static void
+rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+{
+ if (!rpcrdma_regbuf_is_mapped(rb))
+ return;
+
+ ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
+ rdmab_length(rb), rb->rg_direction);
+ rb->rg_device = NULL;
}

/**
@@ -1212,11 +1229,7 @@ rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
if (!rb)
return;

- if (rb->rg_direction != DMA_NONE) {
- ib_dma_unmap_single(ia->ri_device, rdmab_addr(rb),
- rdmab_length(rb), rb->rg_direction);
- }
-
+ rpcrdma_dma_unmap_regbuf(rb);
kfree(rb);
}

@@ -1288,11 +1301,17 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;

+ if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
+ goto out_map;
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
return 0;

+out_map:
+ pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
+ return -EIO;
+
out_postrecv:
pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
return -ENOTCONN;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c6861ab..a8eb270 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -113,6 +113,7 @@ struct rpcrdma_ep {

struct rpcrdma_regbuf {
struct ib_sge rg_iov;
+ struct ib_device *rg_device;
enum dma_data_direction rg_direction;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
@@ -479,9 +480,24 @@ void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
size_t, enum dma_data_direction,
gfp_t);
+bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);

+static inline bool
+rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_device != NULL;
+}
+
+static inline bool
+rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (!rpcrdma_regbuf_is_mapped(rb))
+ return __rpcrdma_dma_map_regbuf(ia, rb);
+ return true;
+}
+
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);

int rpcrdma_alloc_wq(void);


2016-08-15 20:51:46

by Chuck Lever III

Subject: [PATCH v1 11/22] xprtrdma: Eliminate "ia" argument in rpcrdma_{alloc, free}_regbuf

Clean up. The "ia" argument is no longer used.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 7 +++----
net/sunrpc/xprtrdma/transport.c | 11 +++++------
net/sunrpc/xprtrdma/verbs.c | 28 ++++++++++++----------------
net/sunrpc/xprtrdma/xprt_rdma.h | 8 +++-----
4 files changed, 23 insertions(+), 31 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 8bc249e..a19530d 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -27,7 +27,7 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
list_del(&req->rl_all);
spin_unlock(&buf->rb_reqslock);

- rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+ rpcrdma_destroy_req(req);

kfree(rqst);
}
@@ -35,7 +35,6 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
size_t size;
@@ -45,14 +44,14 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;

- rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE,
+ rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

size = r_xprt->rx_data.inline_rsize;
- rb = rpcrdma_alloc_regbuf(ia, size, DMA_TO_DEVICE, GFP_KERNEL);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_sendbuf = rb;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 37f5b22..3f50c83 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -490,7 +490,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_rdmabuf)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

@@ -517,12 +517,11 @@ rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;

min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size,
- DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(min_size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ rpcrdma_free_regbuf(req->rl_sendbuf);
r_xprt->rx_stats.hardway_register_count += min_size;
req->rl_sendbuf = rb;
return true;
@@ -548,11 +547,11 @@ rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
return true;

- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, DMA_NONE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
if (IS_ERR(rb))
return false;

- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_recvbuf);
+ rpcrdma_free_regbuf(req->rl_recvbuf);
r_xprt->rx_stats.hardway_register_count += size;
req->rl_recvbuf = rb;
return true;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b88bcf4..ba1ca88 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -864,7 +864,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
if (rep == NULL)
goto out;

- rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
+ rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
DMA_FROM_DEVICE, GFP_KERNEL);
if (IS_ERR(rep->rr_rdmabuf)) {
rc = PTR_ERR(rep->rr_rdmabuf);
@@ -965,18 +965,18 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
}

static void
-rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
- rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+ rpcrdma_free_regbuf(rep->rr_rdmabuf);
kfree(rep);
}

void
-rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_destroy_req(struct rpcrdma_req *req)
{
- rpcrdma_free_regbuf(ia, req->rl_recvbuf);
- rpcrdma_free_regbuf(ia, req->rl_sendbuf);
- rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
+ rpcrdma_free_regbuf(req->rl_recvbuf);
+ rpcrdma_free_regbuf(req->rl_sendbuf);
+ rpcrdma_free_regbuf(req->rl_rdmabuf);
kfree(req);
}

@@ -1009,15 +1009,13 @@ rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-
cancel_delayed_work_sync(&buf->rb_recovery_worker);

while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;

rep = rpcrdma_buffer_get_rep_locked(buf);
- rpcrdma_destroy_rep(ia, rep);
+ rpcrdma_destroy_rep(rep);
}

spin_lock(&buf->rb_reqslock);
@@ -1029,7 +1027,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
list_del(&req->rl_all);

spin_unlock(&buf->rb_reqslock);
- rpcrdma_destroy_req(ia, req);
+ rpcrdma_destroy_req(req);
spin_lock(&buf->rb_reqslock);
}
spin_unlock(&buf->rb_reqslock);
@@ -1155,7 +1153,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)

/**
* rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @ia: controlling rpcrdma_ia
* @size: size of buffer to be allocated, in bytes
* @direction: direction of data movement
* @flags: GFP flags
@@ -1168,8 +1165,8 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* or Replies they may be registered externally via ro_map.
*/
struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size,
- enum dma_data_direction direction, gfp_t flags)
+rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+ gfp_t flags)
{
struct rpcrdma_regbuf *rb;

@@ -1220,11 +1217,10 @@ rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)

/**
* rpcrdma_free_regbuf - deregister and free registered buffer
- * @ia: controlling rpcrdma_ia
* @rb: regbuf to be deregistered and freed
*/
void
-rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
if (!rb)
return;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a8eb270..f6e973d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -464,7 +464,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
*/
struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);

@@ -477,12 +477,10 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);

void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);

-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
- size_t, enum dma_data_direction,
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
gfp_t);
bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
-void rpcrdma_free_regbuf(struct rpcrdma_ia *,
- struct rpcrdma_regbuf *);
+void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);

static inline bool
rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)


2016-08-15 20:51:54

by Chuck Lever III

Subject: [PATCH v1 12/22] xprtrdma: Simplify rpcrdma_ep_post_recv()

Clean up.

Since commit fc66448549bb ("xprtrdma: Split the completion queue"),
rpcrdma_ep_post_recv() no longer uses the "ep" argument.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 2 +-
net/sunrpc/xprtrdma/rpc_rdma.c | 2 +-
net/sunrpc/xprtrdma/verbs.c | 9 ++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 3 +--
4 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index a19530d..887ef44 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -397,7 +397,7 @@ out_overflow:
out_short:
pr_warn("RPC/RDMA short backward direction call\n");

- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
xprt_disconnect_done(xprt);
else
pr_warn("RPC: %s: reposting rep %p\n",
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 68a39c0..6187cee 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1141,6 +1141,6 @@ out_duplicate:

repost:
r_xprt->rx_stats.bad_reply_count++;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ba1ca88..81cae65 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1246,7 +1246,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
int i, rc;

if (rep) {
- rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
return rc;
req->rl_reply = NULL;
@@ -1281,12 +1281,8 @@ out_postsend_err:
return -ENOTCONN;
}

-/*
- * (Re)post a receive buffer.
- */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
- struct rpcrdma_ep *ep,
struct rpcrdma_rep *rep)
{
struct ib_recv_wr recv_wr, *recv_wr_fail;
@@ -1325,7 +1321,6 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_rep *rep;
int rc;

@@ -1336,7 +1331,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
rep = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock(&buffers->rb_lock);

- rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ rc = rpcrdma_ep_post_recv(ia, rep);
if (rc)
goto out_rc;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f6e973d..19da45b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -456,8 +456,7 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);

int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
-int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
- struct rpcrdma_rep *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *);

/*
* Buffer calls - xprtrdma/verbs.c


2016-08-15 20:52:02

by Chuck Lever III

Subject: [PATCH v1 13/22] xprtrdma: Move send_wr to struct rpcrdma_req

Clean up: Most of the fields in each send_wr do not vary. There is
no need to initialize them before each ib_post_send(). This removes
a large-ish data structure from the stack.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 3 ++-
net/sunrpc/xprtrdma/rpc_rdma.c | 5 +++--
net/sunrpc/xprtrdma/verbs.c | 36 +++++++++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++--
4 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 887ef44..61a58f5 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -241,7 +241,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

- req->rl_niovs = 2;
+ req->rl_send_wr.num_sge = 2;
+
return 0;

out_map:
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 6187cee..c2906e3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -687,7 +687,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

- req->rl_niovs = 1;
+ req->rl_send_wr.num_sge = 1;
if (rtype == rpcrdma_areadch)
return 0;

@@ -697,7 +697,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

- req->rl_niovs = 2;
+ req->rl_send_wr.num_sge = 2;
+
return 0;

out_overflow:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 81cae65..d5e88ee 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -848,6 +848,10 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
INIT_LIST_HEAD(&req->rl_registered);
+ req->rl_send_wr.next = NULL;
+ req->rl_send_wr.wr_cqe = &req->rl_cqe;
+ req->rl_send_wr.sg_list = req->rl_send_iov;
+ req->rl_send_wr.opcode = IB_WR_SEND;
return req;
}

@@ -1112,7 +1116,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;

- req->rl_niovs = 0;
+ req->rl_send_wr.num_sge = 0;
req->rl_reply = NULL;

spin_lock(&buffers->rb_lock);
@@ -1240,38 +1244,32 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_req *req)
{
struct ib_device *device = ia->ri_device;
- struct ib_send_wr send_wr, *send_wr_fail;
- struct rpcrdma_rep *rep = req->rl_reply;
- struct ib_sge *iov = req->rl_send_iov;
+ struct ib_send_wr *send_wr = &req->rl_send_wr;
+ struct ib_send_wr *send_wr_fail;
+ struct ib_sge *sge = req->rl_send_iov;
int i, rc;

- if (rep) {
- rc = rpcrdma_ep_post_recv(ia, rep);
+ if (req->rl_reply) {
+ rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
if (rc)
return rc;
req->rl_reply = NULL;
}

- send_wr.next = NULL;
- send_wr.wr_cqe = &req->rl_cqe;
- send_wr.sg_list = iov;
- send_wr.num_sge = req->rl_niovs;
- send_wr.opcode = IB_WR_SEND;
-
- for (i = 0; i < send_wr.num_sge; i++)
- ib_dma_sync_single_for_device(device, iov[i].addr,
- iov[i].length, DMA_TO_DEVICE);
+ for (i = 0; i < send_wr->num_sge; i++)
+ ib_dma_sync_single_for_device(device, sge[i].addr,
+ sge[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
- __func__, send_wr.num_sge);
+ __func__, send_wr->num_sge);

if (DECR_CQCOUNT(ep) > 0)
- send_wr.send_flags = 0;
+ send_wr->send_flags = 0;
else { /* Provider must take a send completion every now and then */
INIT_CQCOUNT(ep);
- send_wr.send_flags = IB_SEND_SIGNALED;
+ send_wr->send_flags = IB_SEND_SIGNALED;
}

- rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+ rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
if (rc)
goto out_postsend_err;
return 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 19da45b..73e3345 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -284,10 +284,10 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_free;
- unsigned int rl_niovs;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
- struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
+ struct rpcrdma_rep *rl_reply;
+ struct ib_send_wr rl_send_wr;
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */


2016-08-15 20:52:10

by Chuck Lever III

Subject: [PATCH v1 14/22] xprtrdma: Move recv_wr to struct rpcrdma_rep

Clean up: The fields in the recv_wr do not vary. There is no need to
initialize them before each ib_post_recv(). This removes a large-ish
data structure from the stack.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 13 ++++++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index d5e88ee..edc81ac 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -879,6 +879,10 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+ rep->rr_recv_wr.next = NULL;
+ rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
+ rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
+ rep->rr_recv_wr.num_sge = 1;
return rep;

out_free:
@@ -1283,17 +1287,12 @@ int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
struct rpcrdma_rep *rep)
{
- struct ib_recv_wr recv_wr, *recv_wr_fail;
+ struct ib_recv_wr *recv_wr_fail;
int rc;

- recv_wr.next = NULL;
- recv_wr.wr_cqe = &rep->rr_cqe;
- recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
- recv_wr.num_sge = 1;
-
if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
goto out_map;
- rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+ rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
if (rc)
goto out_postrecv;
return 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 73e3345..b8d0dea 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -189,6 +189,7 @@ struct rpcrdma_rep {
struct rpcrdma_xprt *rr_rxprt;
struct work_struct rr_work;
struct list_head rr_list;
+ struct ib_recv_wr rr_recv_wr;
struct rpcrdma_regbuf *rr_rdmabuf;
};



2016-08-15 20:52:18

by Chuck Lever III

Subject: [PATCH v1 15/22] xprtrdma: Report address of frmr, not mw

Tie frwr debugging messages together by always reporting the address
of the frmr.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 892b5e1..ab9a752 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -161,7 +161,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
return PTR_ERR(f->fr_mr);
}

- dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
+ dprintk("RPC: %s: recovered FRMR %p\n", __func__, f);
f->fr_state = FRMR_IS_INVALID;
return 0;
}
@@ -396,7 +396,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
goto out_mapmr_err;

dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
- __func__, mw, mw->mw_nents, mr->length);
+ __func__, frmr, mw->mw_nents, mr->length);

key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
@@ -449,6 +449,9 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;

+ dprintk("RPC: %s: invalidating frmr %p\n",
+ __func__, f);
+
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;

@@ -521,6 +524,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
unmap:
list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+ dprintk("RPC: %s: unmapping frmr %p\n",
+ __func__, &mw->frmr);
list_del_init(&mw->mw_list);
ib_dma_unmap_sg(ia->ri_device,
mw->mw_sg, mw->mw_nents, mw->mw_dir);


2016-08-15 20:52:27

by Chuck Lever III

Subject: [PATCH v1 16/22] rpcrdma: RDMA/CM private message data structure

Introduce a data structure used by both the client and the server to
exchange implementation details during RDMA/CM connection
establishment.

This is an experimental out-of-band exchange between Linux
RPC-over-RDMA Version One implementations, replacing the deprecated
CCP (see RFC 5666bis). The purpose of this extension is to enable
prototyping of features that might be introduced in a subsequent
version of RPC-over-RDMA.
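
As a worked example of the one-octet buffer size encoding defined
below (a hypothetical self-test, not part of the patch; it assumes
sizes are whole multiples of 1KB, as the helpers require):

	static void rpcrdma_cmp_selftest(void)
	{
		/* encode stores (size in KB) - 1 in a single octet */
		WARN_ON(rpcrdma_encode_buffer_size(1024) != 0);
		WARN_ON(rpcrdma_encode_buffer_size(4096) != 3);
		/* decode reverses it: 255 represents the 256KB maximum */
		WARN_ON(rpcrdma_decode_buffer_size(255) != 262144);
	}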

Suggested by Christoph Hellwig and Devesh Sharma.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 35 +++++++++++++++++++++++++++++++++++
1 file changed, 35 insertions(+)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 3b1ff38..a7da6bf 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -41,6 +41,7 @@
#define _LINUX_SUNRPC_RPC_RDMA_H

#include <linux/types.h>
+#include <linux/bitops.h>

#define RPCRDMA_VERSION 1
#define rpcrdma_version cpu_to_be32(RPCRDMA_VERSION)
@@ -129,4 +130,38 @@ enum rpcrdma_proc {
#define rdma_done cpu_to_be32(RDMA_DONE)
#define rdma_error cpu_to_be32(RDMA_ERROR)

+/*
+ * Private extension to RPC-over-RDMA Version One.
+ * Message passed during RDMA-CM connection set-up.
+ *
+ * Add new fields at the end, and don't permute existing
+ * fields.
+ */
+struct rpcrdma_connect_private {
+ __be32 cp_magic;
+ u8 cp_version;
+ u8 cp_flags;
+ u8 cp_send_size;
+ u8 cp_recv_size;
+} __packed;
+
+#define rpcrdma_cmp_magic __cpu_to_be32(0xf6ab0e18)
+
+enum {
+ RPCRDMA_CMP_VERSION = 1,
+ RPCRDMA_CMP_F_SND_W_INV_OK = BIT(0),
+};
+
+static inline u8
+rpcrdma_encode_buffer_size(unsigned int size)
+{
+ return (size >> 10) - 1;
+}
+
+static inline unsigned int
+rpcrdma_decode_buffer_size(u8 val)
+{
+ return ((unsigned int)val + 1) << 10;
+}
+
#endif /* _LINUX_SUNRPC_RPC_RDMA_H */


2016-08-15 20:52:35

by Chuck Lever III

Subject: [PATCH v1 17/22] xprtrdma: Client-side support for rpcrdma_connect_private

Send an RDMA-CM private message on connect, and look for one during
a connection-established event.

Both sides can communicate their various implementation limits.
Implementations that don't support this sideband protocol ignore it.

Once the client knows the server's inline threshold maxima, it can
adjust the use of Reply chunks, and eliminate most use of Position
Zero Read chunks. Moderately-sized I/O can be done using a pure
inline RDMA Send instead of RDMA operations that require memory
registration.
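
As a sketch of the adjustment (the values are illustrative; the
clamping logic mirrors the connection upcall code added below):

	/* Suppose the server's private message advertises
	 * cp_send_size = 3 and cp_recv_size = 3:
	 */
	rsize = rpcrdma_decode_buffer_size(3);	/* 4096: server's send, our receive */
	wsize = rpcrdma_decode_buffer_size(3);	/* 4096: server's receive, our send */

	/* The client only ever lowers its own inline settings to match */
	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;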

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/rpc_rdma.h | 4 ++++
net/sunrpc/xprtrdma/fmr_ops.c | 5 ++---
net/sunrpc/xprtrdma/frwr_ops.c | 5 ++---
net/sunrpc/xprtrdma/rpc_rdma.c | 8 +++++---
net/sunrpc/xprtrdma/verbs.c | 41 ++++++++++++++++++++++++++++++++++++---
net/sunrpc/xprtrdma/xprt_rdma.h | 6 +++---
6 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index a7da6bf..cfda6ad 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -46,6 +46,10 @@
#define RPCRDMA_VERSION 1
#define rpcrdma_version cpu_to_be32(RPCRDMA_VERSION)

+enum {
+ RPCRDMA_V1_DEF_INLINE_SIZE = 1024,
+};
+
struct rpcrdma_segment {
__be32 rs_handle; /* Registered memory handle */
__be32 rs_length; /* Length of the chunk in bytes */
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 21cb3b1..16690a1 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -160,9 +160,8 @@ static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
- RPCRDMA_MAX_DATA_SEGS /
- RPCRDMA_MAX_FMR_SGES));
+ ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+ RPCRDMA_MAX_FMR_SGES);
return 0;
}

diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index ab9a752..977d56a 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -242,9 +242,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
depth;
}

- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
- RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frmr_depth));
+ ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+ ia->ri_max_frmr_depth);
return 0;
}

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c2906e3..ea734c2 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -118,10 +118,12 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
return size;
}

-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
- struct rpcrdma_create_data_internal *cdata,
- unsigned int maxsegs)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ unsigned int maxsegs = ia->ri_max_segs;
+
ia->ri_max_inline_write = cdata->inline_wsize -
rpcrdma_max_call_header_size(maxsegs);
ia->ri_max_inline_read = cdata->inline_rsize -
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index edc81ac..431d16d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -203,6 +203,34 @@ out_fail:
goto out_schedule;
}

+static void
+rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
+ struct rdma_conn_param *param)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ const struct rpcrdma_connect_private *pmsg = param->private_data;
+ unsigned int rsize, wsize;
+
+ rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+ wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+
+ if (pmsg &&
+ pmsg->cp_magic == rpcrdma_cmp_magic &&
+ pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+ rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
+ wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
+
+ pr_info("rpcrdma: server reports rsize %u, wsize %u\n",
+ rsize, wsize);
+ }
+
+ if (rsize < cdata->inline_rsize)
+ cdata->inline_rsize = rsize;
+ if (wsize < cdata->inline_wsize)
+ cdata->inline_wsize = wsize;
+ rpcrdma_set_max_header_sizes(r_xprt);
+}
+
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
@@ -243,6 +271,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
" (%d initiator)\n",
__func__, attr->max_dest_rd_atomic,
attr->max_rd_atomic);
+ rpcrdma_update_connect_private(xprt, &event->param.conn);
goto connected;
case RDMA_CM_EVENT_CONNECT_ERROR:
connstate = -ENOTCONN;
@@ -453,6 +482,7 @@ int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
+ struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
struct ib_cq *sendcq, *recvcq;
unsigned int max_qp_wr;
int rc;
@@ -535,9 +565,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Initialize cma parameters */
memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

- /* RPC/RDMA does not use private data */
- ep->rep_remote_cma.private_data = NULL;
- ep->rep_remote_cma.private_data_len = 0;
+ /* Prepare RDMA-CM private message */
+ pmsg->cp_magic = rpcrdma_cmp_magic;
+ pmsg->cp_version = RPCRDMA_CMP_VERSION;
+ pmsg->cp_flags = 0;
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+ ep->rep_remote_cma.private_data = pmsg;
+ ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b8d0dea..a41dac2 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -70,6 +70,7 @@ struct rpcrdma_ia {
struct ib_pd *ri_pd;
struct completion ri_done;
int ri_async_rc;
+ unsigned int ri_max_segs;
unsigned int ri_max_frmr_depth;
unsigned int ri_max_inline_write;
unsigned int ri_max_inline_read;
@@ -87,6 +88,7 @@ struct rpcrdma_ep {
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
+ struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
@@ -522,9 +524,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
- struct rpcrdma_create_data_internal *,
- unsigned int);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);

/* RPC/RDMA module init - xprtrdma/transport.c
*/


2016-08-15 20:52:43

by Chuck Lever III

Subject: [PATCH v1 18/22] xprtrdma: Basic support for Remote Invalidation

Have frwr's ro_unmap_sync recognize an invalidated rkey that appears
as part of a Receive completion. Local invalidation can be skipped
for that rkey.

Use an out-of-band signaling mechanism to indicate to the server
that the client is prepared to receive RDMA Send With Invalidate.
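
The client-side check amounts to the following (a condensed view of
the hunk added to frwr_op_unmap_sync below):

	/* If the Receive completion carried an invalidated rkey that
	 * matches this MR, the server already invalidated it remotely,
	 * so no LOCAL_INV WR is needed for it.
	 */
	if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
	    (mw->mw_handle == rep->rr_inv_rkey)) {
		mw->frmr.fr_state = FRMR_IS_INVALID;
		continue;
	}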

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 2 ++
net/sunrpc/xprtrdma/frwr_ops.c | 13 +++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 18 ++++++++++++++----
net/sunrpc/xprtrdma/transport.c | 5 +++--
net/sunrpc/xprtrdma/verbs.c | 8 +++++++-
net/sunrpc/xprtrdma/xprt_rdma.h | 5 +++++
6 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 16690a1..1ebb09e 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -273,6 +273,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
list_for_each_entry(mw, &req->rl_registered, mw_list)
list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+ r_xprt->rx_stats.local_inv_needed++;
rc = ib_unmap_fmr(&unmap_list);
if (rc)
goto out_reset;
@@ -330,4 +331,5 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_init_mr = fmr_op_init_mr,
.ro_release_mr = fmr_op_release_mr,
.ro_displayname = "fmr",
+ .ro_send_w_inv_ok = 0,
};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 977d56a..2e51ef5 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -67,6 +67,8 @@
* pending send queue WRs before the transport is reconnected.
*/

+#include <linux/sunrpc/rpc_rdma.h>
+
#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -474,6 +476,7 @@ static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+ struct rpcrdma_rep *rep = req->rl_reply;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw, *tmp;
struct rpcrdma_frmr *f;
@@ -489,6 +492,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
f = NULL;
invalidate_wrs = pos = prev = NULL;
list_for_each_entry(mw, &req->rl_registered, mw_list) {
+ if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
+ (mw->mw_handle == rep->rr_inv_rkey)) {
+ mw->frmr.fr_state = FRMR_IS_INVALID;
+ continue;
+ }
+
pos = __frwr_prepare_linv_wr(mw);

if (!invalidate_wrs)
@@ -498,6 +507,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
prev = pos;
f = &mw->frmr;
}
+ if (!f)
+ goto unmap;

/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
@@ -512,6 +523,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
*/
+ r_xprt->rx_stats.local_inv_needed++;
rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
if (rc)
goto reset_mrs;
@@ -580,4 +592,5 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_init_mr = frwr_op_init_mr,
.ro_release_mr = frwr_op_release_mr,
.ro_displayname = "frwr",
+ .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK,
};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ea734c2..31a434d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -231,7 +231,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
- enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
+ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
+ bool reminv_expected)
{
int len, n, p, page_base;
struct page **ppages;
@@ -273,6 +274,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
if (type == rpcrdma_readch)
return n;

+ /* When encoding the Write list, some servers need to see an extra
+ * segment for odd-length Write chunks. The upper layer provides
+ * space in the tail iovec for this purpose.
+ */
+ if (type == rpcrdma_writech && reminv_expected)
+ return n;
+
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
* xdr pad bytes, saving the server an RDMA operation. */
@@ -329,7 +337,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
if (rtype == rpcrdma_areadch)
pos = 0;
seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
if (nsegs < 0)
return ERR_PTR(nsegs);

@@ -393,7 +401,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
- wtype, seg);
+ wtype, seg,
+ r_xprt->rx_ia.ri_reminv_expected);
if (nsegs < 0)
return ERR_PTR(nsegs);

@@ -458,7 +467,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
}

seg = req->rl_segments;
- nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+ r_xprt->rx_ia.ri_reminv_expected);
if (nsegs < 0)
return ERR_PTR(nsegs);

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 3f50c83..0ee841f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -729,10 +729,11 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
- seq_printf(seq, "%lu %lu %lu\n",
+ seq_printf(seq, "%lu %lu %lu %lu\n",
r_xprt->rx_stats.mrs_recovered,
r_xprt->rx_stats.mrs_orphaned,
- r_xprt->rx_stats.mrs_allocated);
+ r_xprt->rx_stats.mrs_allocated,
+ r_xprt->rx_stats.local_inv_needed);
}

static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 431d16d..55ce5a8 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -184,6 +184,9 @@ rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
__func__, rep, wc->byte_len);

rep->rr_len = wc->byte_len;
+ rep->rr_wc_flags = wc->wc_flags;
+ rep->rr_inv_rkey = wc->ex.invalidate_rkey;
+
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
@@ -211,12 +214,15 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
const struct rpcrdma_connect_private *pmsg = param->private_data;
unsigned int rsize, wsize;

+ /* Default settings for RPC-over-RDMA Version One */
+ r_xprt->rx_ia.ri_reminv_expected = false;
rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

if (pmsg &&
pmsg->cp_magic == rpcrdma_cmp_magic &&
pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+ r_xprt->rx_ia.ri_reminv_expected = true;
rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);

@@ -568,7 +574,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Prepare RDMA-CM private message */
pmsg->cp_magic = rpcrdma_cmp_magic;
pmsg->cp_version = RPCRDMA_CMP_VERSION;
- pmsg->cp_flags = 0;
+ pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
ep->rep_remote_cma.private_data = pmsg;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a41dac2..36b7180 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -74,6 +74,7 @@ struct rpcrdma_ia {
unsigned int ri_max_frmr_depth;
unsigned int ri_max_inline_write;
unsigned int ri_max_inline_read;
+ bool ri_reminv_expected;
struct ib_qp_attr ri_qp_attr;
struct ib_qp_init_attr ri_qp_init_attr;
};
@@ -187,6 +188,8 @@ enum {
struct rpcrdma_rep {
struct ib_cqe rr_cqe;
unsigned int rr_len;
+ int rr_wc_flags;
+ u32 rr_inv_rkey;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
struct work_struct rr_work;
@@ -384,6 +387,7 @@ struct rpcrdma_stats {
unsigned long mrs_recovered;
unsigned long mrs_orphaned;
unsigned long mrs_allocated;
+ unsigned long local_inv_needed;
};

/*
@@ -407,6 +411,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mw *);
void (*ro_release_mr)(struct rpcrdma_mw *);
const char *ro_displayname;
+ const int ro_send_w_inv_ok;
};

extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;


2016-08-15 20:52:51

by Chuck Lever III

Subject: [PATCH v1 19/22] xprtrdma: Use gathered Send for large inline messages

An RPC Call message that is sent inline but that has a data payload
(i.e., one or more items in rq_snd_buf's page list) must be "pulled
up":

- call_allocate has to reserve enough RPC Call buffer space to
accommodate the data payload

- call_transmit has to copy the rq_snd_buf's page list and tail
into its head iovec before the message is sent

As the inline threshold is increased beyond its current 1KB default,
however, data payloads of more than a few dozen bytes will be copied
by the host CPU. For example, if the inline threshold is increased to
4KB, then an NFS WRITE request of up to 4KB would involve a memcpy of
the WRITE's payload data into the RPC Call buffer. This is an
undesirable amount of participation by the host CPU.

The inline threshold may be larger than 4KB in the future.

Instead of copying the components of rq_snd_buf into its head iovec,
construct a gather list of these components, and send them all in
place. The same approach is already used in the Linux server's
RPC-over-RDMA reply path.

This same mechanism eliminates the need for rpcrdma_tail_pullup,
which is used to manage the XDR pad and trailing inline content when
a Read list is present.

This requires that the pages in rq_snd_buf's page list be DMA-mapped
during marshaling, and unmapped when a data-bearing RPC is
completed. This is slightly less efficient for very small I/O
payloads, but significantly more efficient as payload size and
inline threshold increase past a kilobyte.

Note: xprtrdma does not typically signal Sends. So we are playing
just a little fast and loose by assuming that the arrival of an RPC
Reply means the earlier Send of the matching RPC Call has completed
and its Send buffer can be recycled.
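
The resulting gather list for an inline Call with a page-list payload
looks like this (a summary of the marshaling code added below):

	/* sge[0]    RPC-over-RDMA header       (rl_rdmabuf)
	 * sge[1]    xdr_buf head iovec         (rl_sendbuf)
	 * sge[2..n] one SGE per page-list page (DMA-mapped at marshal time)
	 * sge[n+1]  xdr_buf tail iovec         (also backed by rl_sendbuf)
	 *
	 * req->rl_send_wr.num_sge counts the populated entries, and the
	 * page SGEs are unmapped in rpcrdma_pages_unmap() when the RPC
	 * completes.
	 */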

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 16 +-
net/sunrpc/xprtrdma/rpc_rdma.c | 265 ++++++++++++++++++++++++++-----------
net/sunrpc/xprtrdma/transport.c | 14 --
net/sunrpc/xprtrdma/verbs.c | 13 +-
net/sunrpc/xprtrdma/xprt_rdma.h | 21 +++
5 files changed, 216 insertions(+), 113 deletions(-)

diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 61a58f5..e3cecde 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -229,20 +229,12 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
#endif

- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
+ req->rl_send_wr.num_sge = 0;
+ req->rl_mapped_pages = 0;
+ if (!rpcrdma_prepare_hdr_sge(r_xprt, req, RPCRDMA_HDRLEN_MIN))
goto out_map;
- req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
- req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
- req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ if (!rpcrdma_prepare_msg_sge(r_xprt, req, &rqst->rq_snd_buf))
goto out_map;
- req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
- req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_send_wr.num_sge = 2;
-
return 0;

out_map:
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 31a434d..e125a86 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -503,74 +503,191 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
return iptr;
}

-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+static void
+rpcrdma_dma_sync_sge(struct rpcrdma_xprt *r_xprt, struct ib_sge *sge)
+{
+ struct ib_device *device = r_xprt->rx_ia.ri_device;
+
+ ib_dma_sync_single_for_device(device, sge->addr,
+ sge->length, DMA_TO_DEVICE);
+}
+
+/* Prepare the RPC-over-RDMA header SGE.
*/
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ u32 len)
{
- int i, npages, curlen;
- int copy_len;
- unsigned char *srcp, *destp;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- int page_base;
- struct page **ppages;
+ struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+ struct ib_sge *sge = &req->rl_send_sge[0];
+
+ if (!rpcrdma_regbuf_is_mapped(rb)) {
+ if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb))
+ return false;
+ sge->addr = rdmab_addr(rb);
+ sge->lkey = rdmab_lkey(rb);
+ }
+ sge->length = len;
+
+ rpcrdma_dma_sync_sge(r_xprt, sge);

- destp = rqst->rq_svec[0].iov_base;
- curlen = rqst->rq_svec[0].iov_len;
- destp += curlen;
+ dprintk("RPC: %s: hdr: sge[0]: [%p, %u]\n",
+ __func__, (void *)sge->addr, sge->length);
+ req->rl_send_wr.num_sge++;
+ return true;
+}
+
+/* Prepare the RPC payload SGE. The tail iovec is pulled up into
+ * the head, in case it is not in the same page (krb5p). The
+ * page list is skipped: either it is going via RDMA Read, or we
+ * already know for sure there is no page list.
+ */
+bool
+rpcrdma_prepare_msg_sge(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+ struct ib_sge *sge = &req->rl_send_sge[1];

- dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
- __func__, destp, rqst->rq_slen, curlen);
+ /* covers both the head and tail iovecs */
+ if (!rpcrdma_regbuf_is_mapped(rb))
+ if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb))
+ return false;

- copy_len = rqst->rq_snd_buf.page_len;
+ /* head iovec */
+ sge->addr = rdmab_addr(rb);
+ sge->length = xdr->head[0].iov_len;
+ sge->lkey = rdmab_lkey(rb);

- if (rqst->rq_snd_buf.tail[0].iov_len) {
- curlen = rqst->rq_snd_buf.tail[0].iov_len;
- if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
- memmove(destp + copy_len,
- rqst->rq_snd_buf.tail[0].iov_base, curlen);
- r_xprt->rx_stats.pullup_copy_count += curlen;
+ /* tail iovec */
+ sge->length += rpcrdma_tail_pullup(xdr);
+
+ rpcrdma_dma_sync_sge(r_xprt, sge);
+
+ dprintk("RPC: %s: head: sge[1]: [%p, %u]\n",
+ __func__, (void *)sge->addr, sge->length);
+ req->rl_send_wr.num_sge++;
+ return true;
+}
+
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+ unsigned int i, page_base, len, remaining;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct ib_sge *sge = req->rl_send_sge;
+ struct page **ppages;
+ bool result;
+
+ i = 1;
+ result = false;
+
+ /* covers both the head and tail iovecs */
+ if (!rpcrdma_regbuf_is_mapped(rb))
+ if (!__rpcrdma_dma_map_regbuf(ia, rb))
+ goto out;
+
+ sge[i].addr = rdmab_addr(rb);
+ sge[i].length = xdr->head[0].iov_len;
+ sge[i].lkey = rdmab_lkey(rb);
+ rpcrdma_dma_sync_sge(r_xprt, &sge[i]);
+ dprintk("RPC: %s: head: sge[%u]: [%p, %u]\n",
+ __func__, i, (void *)sge[i].addr, sge[i].length);
+
+ if (!xdr->page_len)
+ goto tail;
+
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ page_base = xdr->page_base & ~PAGE_MASK;
+ remaining = xdr->page_len;
+ while (remaining) {
+ i++;
+ if (i > RPCRDMA_MAX_SEND_SGES - 2) {
+ pr_err("rpcrdma: too many Send SGEs (%u)\n", i);
+ goto out;
}
- dprintk("RPC: %s: tail destp 0x%p len %d\n",
- __func__, destp + copy_len, curlen);
- rqst->rq_svec[0].iov_len += curlen;
- }
- r_xprt->rx_stats.pullup_copy_count += copy_len;

- page_base = rqst->rq_snd_buf.page_base;
- ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
- page_base &= ~PAGE_MASK;
- npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
- for (i = 0; copy_len && i < npages; i++) {
- curlen = PAGE_SIZE - page_base;
- if (curlen > copy_len)
- curlen = copy_len;
- dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
- __func__, i, destp, copy_len, curlen);
- srcp = kmap_atomic(ppages[i]);
- memcpy(destp, srcp+page_base, curlen);
- kunmap_atomic(srcp);
- rqst->rq_svec[0].iov_len += curlen;
- destp += curlen;
- copy_len -= curlen;
+ len = min_t(u32, PAGE_SIZE - page_base, remaining);
+ sge[i].addr = ib_dma_map_page(ia->ri_device, *ppages,
+ page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(ia->ri_device, sge[i].addr)) {
+ pr_err("rpcrdma: Send mapping error\n");
+ goto out;
+ }
+ sge[i].length = len;
+ sge[i].lkey = ia->ri_pd->local_dma_lkey;
+
+ dprintk("RPC: %s: page: sge[%u]: [%p, %u]\n",
+ __func__, i, (void *)sge[i].addr, sge[i].length);
+ req->rl_mapped_pages++;
+ ppages++;
+ remaining -= len;
page_base = 0;
}
- /* header now contains entire send message */
+
+tail:
+ if (xdr->tail[0].iov_len) {
+ unsigned char *destp;
+
+ /* The tail iovec is not always constructed in the same
+ * page where the head iovec resides (see, for example,
+ * gss_wrap_req_priv). Check for that, and if needed,
+ * move the tail into the rb so that it is properly
+ * DMA-mapped.
+ */
+ len = xdr->tail[0].iov_len;
+ destp = xdr->head[0].iov_base;
+ destp += xdr->head[0].iov_len;
+ if (destp != xdr->tail[0].iov_base) {
+ dprintk("RPC: %s: moving %u tail bytes\n",
+ __func__, len);
+ memmove(destp, xdr->tail[0].iov_base, len);
+ r_xprt->rx_stats.pullup_copy_count += len;
+ }
+
+ i++;
+ sge[i].addr = rdmab_addr(rb) + xdr->head[0].iov_len;
+ sge[i].length = len;
+ sge[i].lkey = rdmab_lkey(rb);
+ rpcrdma_dma_sync_sge(r_xprt, &sge[i]);
+ dprintk("RPC: %s: tail: sge[%u]: [%p, %u]\n",
+ __func__, i, (void *)sge[i].addr, sge[i].length);
+ }
+
+ i++;
+ result = true;
+
+out:
+ req->rl_send_wr.num_sge = i;
+ return result;
+}
+
+void
+rpcrdma_pages_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct ib_sge *sge;
+ unsigned int i;
+
+ for (i = 2; req->rl_mapped_pages--; i++) {
+ sge = &req->rl_send_sge[i];
+ dprintk("RPC: %s: unmapping sge[%u]: [%p, %u]\n",
+ __func__, i, (void *)sge->addr, sge->length);
+ ib_dma_unmap_page(ia->ri_device, sge->addr,
+ sge->length, DMA_TO_DEVICE);
+ }
}

/*
* Marshal a request: the primary job of this routine is to choose
* the transfer modes. See comments below.
*
- * Prepares up to two IOVs per Call message:
- *
- * [0] -- RPC RDMA header
- * [1] -- the RPC header/data
- *
* Returns zero on success, otherwise a negative errno.
*/

@@ -638,12 +755,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
rtype = rpcrdma_noch;
- rpcrdma_inline_pullup(rqst);
- rpclen = rqst->rq_svec[0].iov_len;
+ rpclen = rqst->rq_snd_buf.len;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
- rpclen = rqst->rq_svec[0].iov_len;
- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+ rpclen = rqst->rq_snd_buf.head[0].iov_len +
+ rqst->rq_snd_buf.tail[0].iov_len;
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
@@ -685,39 +801,30 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

- if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
- goto out_overflow;
-
dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
rqst->rq_task->tk_pid, __func__,
transfertypes[rtype], transfertypes[wtype],
hdrlen, rpclen);

- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
- goto out_map;
- req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
- req->rl_send_iov[0].length = hdrlen;
- req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
- req->rl_send_wr.num_sge = 1;
- if (rtype == rpcrdma_areadch)
- return 0;
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
+ req->rl_send_wr.num_sge = 0;
+ req->rl_mapped_pages = 0;
+ if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
goto out_map;
- req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
- req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

- req->rl_send_wr.num_sge = 2;
+ switch (rtype) {
+ case rpcrdma_areadch:
+ break;
+ case rpcrdma_readch:
+ if (!rpcrdma_prepare_msg_sge(r_xprt, req, &rqst->rq_snd_buf))
+ goto out_map;
+ break;
+ default:
+ if (!rpcrdma_prepare_msg_sges(r_xprt, req, &rqst->rq_snd_buf))
+ goto out_map;
+ }

return 0;

-out_overflow:
- pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
- hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
- iptr = ERR_PTR(-EIO);
-
out_unmap:
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
return PTR_ERR(iptr);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 0ee841f..1762f59 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -499,30 +499,21 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return true;
}

-/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
- * if the resulting Call message is smaller than the inline threshold.
- * The value of the "rq_callsize" argument accounts for RPC header
- * requirements, but not for the data payload in these cases.
- *
- * See rpcrdma_inline_pullup.
- */
static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
size_t size, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
- size_t min_size;

if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
return true;

- min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
- rb = rpcrdma_alloc_regbuf(min_size, DMA_TO_DEVICE, flags);
+ rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
if (IS_ERR(rb))
return false;

rpcrdma_free_regbuf(req->rl_sendbuf);
- r_xprt->rx_stats.hardway_register_count += min_size;
+ r_xprt->rx_stats.hardway_register_count += size;
req->rl_sendbuf = rb;
return true;
}
@@ -630,6 +621,7 @@ xprt_rdma_free(struct rpc_task *task)
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
+ rpcrdma_pages_unmap(r_xprt, req);
rpcrdma_buffer_put(req);
}

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 55ce5a8..f921a03 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -493,7 +493,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
unsigned int max_qp_wr;
int rc;

- if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
+ if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) {
dprintk("RPC: %s: insufficient sge's available\n",
__func__);
return -ENOMEM;
@@ -522,7 +522,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */
- ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
+ ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -891,7 +891,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
INIT_LIST_HEAD(&req->rl_registered);
req->rl_send_wr.next = NULL;
req->rl_send_wr.wr_cqe = &req->rl_cqe;
- req->rl_send_wr.sg_list = req->rl_send_iov;
+ req->rl_send_wr.sg_list = req->rl_send_sge;
req->rl_send_wr.opcode = IB_WR_SEND;
return req;
}
@@ -1288,11 +1288,9 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
- struct ib_device *device = ia->ri_device;
struct ib_send_wr *send_wr = &req->rl_send_wr;
struct ib_send_wr *send_wr_fail;
- struct ib_sge *sge = req->rl_send_iov;
- int i, rc;
+ int rc;

if (req->rl_reply) {
rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
@@ -1301,9 +1299,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
req->rl_reply = NULL;
}

- for (i = 0; i < send_wr->num_sge; i++)
- ib_dma_sync_single_for_device(device, sge[i].addr,
- sge[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 36b7180..f90da22 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -285,16 +285,27 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
char *mr_offset; /* kva if no page, else offset */
};

-#define RPCRDMA_MAX_IOVS (2)
+/* Reserve enough Send SGEs to send a maximum size inline request:
+ * - RPC-over-RDMA header
+ * - xdr_buf head iovec
+ * - RPCRDMA_MAX_INLINE bytes, possibly unaligned, in pages
+ * - xdr_buf tail iovec
+ */
+enum {
+ RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1,
+ RPCRDMA_MAX_PAGE_SGES = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1,
+ RPCRDMA_MAX_SEND_SGES = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1,
+};

struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_free;
+ unsigned int rl_mapped_pages;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct ib_send_wr rl_send_wr;
- struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
+ struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES];
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
@@ -528,6 +539,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
+bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *, struct rpcrdma_req *, u32);
+bool rpcrdma_prepare_msg_sge(struct rpcrdma_xprt *, struct rpcrdma_req *,
+ struct xdr_buf *);
+bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *, struct rpcrdma_req *,
+ struct xdr_buf *);
+void rpcrdma_pages_unmap(struct rpcrdma_xprt *, struct rpcrdma_req *);
int rpcrdma_marshal_req(struct rpc_rqst *);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
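
For reference, a minimal standalone sketch (not part of the patch) that works
through the Send SGE arithmetic in the enum added above, assuming 4KB pages
and the current RPCRDMA_MAX_INLINE of 3068 bytes:

#include <stdio.h>

#define PAGE_SHIFT		12
#define PAGE_SIZE		(1 << PAGE_SHIFT)
#define RPCRDMA_MAX_INLINE	3068

enum {
	/* an inline payload starting at an arbitrary page offset can
	 * touch one extra page, hence the "- 1" / "+ 1" pair below
	 */
	RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1,
	RPCRDMA_MAX_PAGE_SGES  = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1,
	/* header SGE + head iovec SGE + page SGEs + tail iovec SGE */
	RPCRDMA_MAX_SEND_SGES  = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1,
};

int main(void)
{
	/* with these values: 7163 bytes -> 2 page SGEs -> 5 Send SGEs */
	printf("max send pages: %d\n", RPCRDMA_MAX_SEND_PAGES);
	printf("max page SGEs:  %d\n", RPCRDMA_MAX_PAGE_SGES);
	printf("max send SGEs:  %d\n", RPCRDMA_MAX_SEND_SGES);
	return 0;
}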



2016-08-15 20:52:59

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 20/22] xprtrdma: Support larger inline thresholds

RPC-over-RDMA Version Two will likely require at least a 4KB inline
threshold by default. The Version One inline threshold is still 1KB,
and the larger default is automatically negotiated back down to it in
rpcrdma_update_connect_private().

The maximum is somewhat arbitrary. There's no fundamental
architectural limit I'm aware of, but it's good to keep the size of
Receive buffers reasonable. Now that Send can use a s/g list, a
Send buffer is only as large as each RPC requires. Receive buffers
are always the size of the inline threshold, however.
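
For illustration only, a sketch of the intended behavior (assumed; this is
not the code in rpcrdma_update_connect_private): the tunable is bounded to
[RPCRDMA_MIN_INLINE, RPCRDMA_MAX_INLINE], and the effective threshold then
drops to whatever the connected peer advertises.

#include <stdio.h>

#define RPCRDMA_MIN_INLINE	1024
#define RPCRDMA_DEF_INLINE	4096
#define RPCRDMA_MAX_INLINE	65536

/* bound a requested inline threshold to the supported range */
static unsigned int clamp_inline(unsigned int requested)
{
	if (requested < RPCRDMA_MIN_INLINE)
		return RPCRDMA_MIN_INLINE;
	if (requested > RPCRDMA_MAX_INLINE)
		return RPCRDMA_MAX_INLINE;
	return requested;
}

int main(void)
{
	unsigned int local = clamp_inline(RPCRDMA_DEF_INLINE);
	unsigned int peer = 1024;	/* e.g. a Version One peer */
	unsigned int effective = local < peer ? local : peer;

	/* the 4KB default falls back to 1KB against this peer */
	printf("effective inline threshold: %u\n", effective);
	return 0;
}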

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprtrdma.h | 4 ++--
net/sunrpc/xprtrdma/transport.c | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index 39267dc..221b7a2 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -53,8 +53,8 @@
#define RPCRDMA_MAX_SLOT_TABLE (256U)

#define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */
-#define RPCRDMA_DEF_INLINE (1024) /* default inline thresh */
-#define RPCRDMA_MAX_INLINE (3068) /* max inline thresh */
+#define RPCRDMA_DEF_INLINE (4096) /* default inline thresh */
+#define RPCRDMA_MAX_INLINE (65536) /* max inline thresh */

/* Memory registration strategies, by number.
* This is part of a kernel / user space API. Do not remove. */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1762f59..7eb5864 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = {
.data = &xprt_rdma_max_inline_read,
.maxlen = sizeof(unsigned int),
.mode = 0644,
- .proc_handler = proc_dointvec,
+ .proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},
@@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = {
.data = &xprt_rdma_max_inline_write,
.maxlen = sizeof(unsigned int),
.mode = 0644,
- .proc_handler = proc_dointvec,
+ .proc_handler = proc_dointvec_minmax,
.extra1 = &min_inline_size,
.extra2 = &max_inline_size,
},


2016-08-15 20:53:07

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 21/22] xprtrdma: Rename rpcrdma_receive_wc()

Clean up: When converting xprtrdma to use the new CQ API, I missed a
spot. The naming convention elsewhere is:

{svc_rdma,rpcrdma}_wc_{operation}

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f921a03..ef7177a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -160,13 +160,13 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
}

/**
- * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
*/
static void
-rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
+rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
@@ -917,7 +917,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}

rep->rr_device = ia->ri_device;
- rep->rr_cqe.done = rpcrdma_receive_wc;
+ rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
rep->rr_recv_wr.next = NULL;


2016-08-15 20:53:16

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v1 22/22] xprtrdma: Eliminate rpcrdma_receive_worker()

Clean up: the extra layer of indirection doesn't add value.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 4 +++-
net/sunrpc/xprtrdma/verbs.c | 11 +----------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index e125a86..40bd4b4 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1045,8 +1045,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
* allowed to timeout, to discover the errors at that time.
*/
void
-rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+rpcrdma_reply_handler(struct work_struct *work)
{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ef7177a..c2dc0f7 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -128,15 +128,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
wc->status, wc->vendor_err);
}

-static void
-rpcrdma_receive_worker(struct work_struct *work)
-{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
-
- rpcrdma_reply_handler(rep);
-}
-
/* Perform basic sanity checking to avoid using garbage
* to update the credit grant value.
*/
@@ -919,7 +910,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
- INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+ INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f90da22..f062f7d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -534,7 +534,7 @@ rpcrdma_data_dir(bool writing)
*/
void rpcrdma_connect_worker(struct work_struct *);
void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct rpcrdma_rep *);
+void rpcrdma_reply_handler(struct work_struct *);

/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c


2016-08-15 21:47:36

by kernel test robot

[permalink] [raw]
Subject: Re: [PATCH v1 19/22] xprtrdma: Use gathered Send for large inline messages

Hi Chuck,

[auto build test WARNING on nfs/linux-next]
[also build test WARNING on v4.8-rc2 next-20160815]
[if your patch is applied to the wrong git tree, please drop us a note to help improve the system]

url: https://github.com/0day-ci/linux/commits/Chuck-Lever/client-side-NFS-RDMA-patches-proposed-for-v4-9/20160816-050417
base: git://git.linux-nfs.org/projects/trondmy/linux-nfs.git linux-next
config: xtensa-allmodconfig (attached as .config)
compiler: xtensa-linux-gcc (GCC) 4.9.0
reproduce:
wget https://git.kernel.org/cgit/linux/kernel/git/wfg/lkp-tests.git/plain/sbin/make.cross -O ~/bin/make.cross
chmod +x ~/bin/make.cross
# save the attached .config to linux build tree
make.cross ARCH=xtensa

All warnings (new ones prefixed by >>):

In file included from include/linux/sunrpc/types.h:14:0,
from include/linux/sunrpc/sched.h:14,
from include/linux/sunrpc/clnt.h:18,
from net/sunrpc/xprtrdma/xprt_rdma.h:51,
from net/sunrpc/xprtrdma/rpc_rdma.c:48:
net/sunrpc/xprtrdma/rpc_rdma.c: In function 'rpcrdma_prepare_hdr_sge':
>> net/sunrpc/xprtrdma/rpc_rdma.c:535:13: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, (void *)sge->addr, sge->length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
>> net/sunrpc/xprtrdma/rpc_rdma.c:534:2: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: hdr: sge[0]: [%p, %u]\n",
^
net/sunrpc/xprtrdma/rpc_rdma.c: In function 'rpcrdma_prepare_msg_sge':
net/sunrpc/xprtrdma/rpc_rdma.c:568:13: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, (void *)sge->addr, sge->length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
net/sunrpc/xprtrdma/rpc_rdma.c:567:2: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: head: sge[1]: [%p, %u]\n",
^
net/sunrpc/xprtrdma/rpc_rdma.c: In function 'rpcrdma_prepare_msg_sges':
net/sunrpc/xprtrdma/rpc_rdma.c:600:16: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, i, (void *)sge[i].addr, sge[i].length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
net/sunrpc/xprtrdma/rpc_rdma.c:599:2: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: head: sge[%u]: [%p, %u]\n",
^
net/sunrpc/xprtrdma/rpc_rdma.c:627:17: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, i, (void *)sge[i].addr, sge[i].length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
net/sunrpc/xprtrdma/rpc_rdma.c:626:3: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: page: sge[%u]: [%p, %u]\n",
^
net/sunrpc/xprtrdma/rpc_rdma.c:660:17: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, i, (void *)sge[i].addr, sge[i].length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
net/sunrpc/xprtrdma/rpc_rdma.c:659:3: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: tail: sge[%u]: [%p, %u]\n",
^
net/sunrpc/xprtrdma/rpc_rdma.c: In function 'rpcrdma_pages_unmap':
net/sunrpc/xprtrdma/rpc_rdma.c:681:17: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
__func__, i, (void *)sge->addr, sge->length);
^
include/linux/sunrpc/debug.h:33:24: note: in definition of macro 'dfprintk'
printk(KERN_DEFAULT args); \
^
net/sunrpc/xprtrdma/rpc_rdma.c:680:3: note: in expansion of macro 'dprintk'
dprintk("RPC: %s: unmapping sge[%u]: [%p, %u]\n",
^

vim +535 net/sunrpc/xprtrdma/rpc_rdma.c

528 sge->lkey = rdmab_lkey(rb);
529 }
530 sge->length = len;
531
532 rpcrdma_dma_sync_sge(r_xprt, sge);
533
> 534 dprintk("RPC: %s: hdr: sge[0]: [%p, %u]\n",
> 535 __func__, (void *)sge->addr, sge->length);
536 req->rl_send_wr.num_sge++;
537 return true;
538 }
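
The warning fires because ib_sge::addr is a 64-bit DMA address being cast to
a pointer on a 32-bit target such as xtensa. A standalone sketch of the issue
and the usual workaround (illustration only, not a change from this series):

#include <stdio.h>
#include <stdint.h>

struct sge_example {		/* stand-in for struct ib_sge */
	uint64_t addr;
	uint32_t length;
};

int main(void)
{
	struct sge_example sge = { .addr = 0x123456789abcULL, .length = 1024 };

	/* warns on 32-bit builds: cast to pointer from integer of
	 * different size
	 *   printf("[%p, %u]\n", (void *)sge.addr, sge.length);
	 */

	/* portable: print the address as an unsigned 64-bit integer */
	printf("[0x%llx, %u]\n", (unsigned long long)sge.addr, sge.length);
	return 0;
}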

---
0-DAY kernel test infrastructure Open Source Technology Center
https://lists.01.org/pipermail/kbuild-all Intel Corporation


Attachments:
(No filename) (4.68 kB)
.config.gz (44.96 kB)

2016-08-15 21:51:19

by Trond Myklebust

[permalink] [raw]
Subject: Re: [PATCH v1 02/22] SUNRPC: Refactor rpc_xdr_buf_init()


> On Aug 15, 2016, at 16:50, Chuck Lever <[email protected]> wrote:
>
> Clean up: there is some XDR initialization logic that is commoon
> to the forward channel and backchannel. Move it to an XDR header
> so it can be shared.
>
> rpc_rqst::rq_buffer points to a buffer containing big-endian data.
> Update its annotation as part of the clean up.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/xdr.h | 10 ++++++++++
> include/linux/sunrpc/xprt.h | 2 +-
> net/sunrpc/backchannel_rqst.c | 8 +-------
> net/sunrpc/clnt.c | 24 ++++++------------------
> net/sunrpc/xprtrdma/backchannel.c | 12 +-----------
> 5 files changed, 19 insertions(+), 37 deletions(-)
>
> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> index 70c6b92..551569c 100644
> --- a/include/linux/sunrpc/xdr.h
> +++ b/include/linux/sunrpc/xdr.h
> @@ -67,6 +67,16 @@ struct xdr_buf {
> len; /* Length of XDR encoded message */
> };
>
> +static inline void
> +xdr_buf_init(struct xdr_buf *buf, __be32 *start, size_t len)
> +{
> + memset(buf, 0, sizeof(*buf));

Can we please leave this as it was. There is no need to zero out unused fields such as the pages pointer, the tail iov_base, etc.

> +
> + buf->head[0].iov_base = start;
> + buf->head[0].iov_len = len;
> + buf->buflen = len;
> +}
> +
> /*
> * pre-xdr'ed macros.
> */
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index a16070d..559dad3 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -83,7 +83,7 @@ struct rpc_rqst {
> void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
> struct list_head rq_list;
>
> - __u32 * rq_buffer; /* XDR encode buffer */
> + __be32 *rq_buffer; /* Call XDR encode buffer */

If changing it, why not make it a void* ? Nothing should ever be touching the contents of this buffer directly.

> size_t rq_callsize,
> rq_rcvsize;
> size_t rq_xmit_bytes_sent; /* total bytes sent */
> diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
> index 229956b..ac701c2 100644
> --- a/net/sunrpc/backchannel_rqst.c
> +++ b/net/sunrpc/backchannel_rqst.c
> @@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
> page = alloc_page(gfp_flags);
> if (page == NULL)
> return -ENOMEM;
> - buf->head[0].iov_base = page_address(page);
> - buf->head[0].iov_len = PAGE_SIZE;
> - buf->tail[0].iov_base = NULL;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->len = 0;
> - buf->buflen = PAGE_SIZE;
> + xdr_buf_init(buf, page_address(page), PAGE_SIZE);
> return 0;
> }
>
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 7f79fb7..0e75369 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1748,18 +1748,6 @@ rpc_task_force_reencode(struct rpc_task *task)
> task->tk_rqstp->rq_bytes_sent = 0;
> }
>
> -static inline void
> -rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
> -{
> - buf->head[0].iov_base = start;
> - buf->head[0].iov_len = len;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->flags = 0;
> - buf->len = 0;
> - buf->buflen = len;
> -}
> -
> /*
> * 3. Encode arguments of an RPC call
> */
> @@ -1772,12 +1760,12 @@ rpc_xdr_encode(struct rpc_task *task)
>
> dprint_status(task);
>
> - rpc_xdr_buf_init(&req->rq_snd_buf,
> - req->rq_buffer,
> - req->rq_callsize);
> - rpc_xdr_buf_init(&req->rq_rcv_buf,
> - (char *)req->rq_buffer + req->rq_callsize,
> - req->rq_rcvsize);
> + xdr_buf_init(&req->rq_snd_buf,
> + req->rq_buffer,
> + req->rq_callsize);
> + xdr_buf_init(&req->rq_rcv_buf,
> + (__be32 *)((char *)req->rq_buffer + req->rq_callsize),
> + req->rq_rcvsize);
>
> p = rpc_encode_header(task);
> if (p == NULL) {
> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
> index 5f60ab2..d3cfaf2 100644
> --- a/net/sunrpc/xprtrdma/backchannel.c
> +++ b/net/sunrpc/xprtrdma/backchannel.c
> @@ -38,7 +38,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> struct rpcrdma_regbuf *rb;
> struct rpcrdma_req *req;
> - struct xdr_buf *buf;
> size_t size;
>
> req = rpcrdma_create_req(r_xprt);
> @@ -60,16 +59,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
> req->rl_sendbuf = rb;
> /* so that rpcr_to_rdmar works when receiving a request */
> rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
> -
> - buf = &rqst->rq_snd_buf;
> - buf->head[0].iov_base = rqst->rq_buffer;
> - buf->head[0].iov_len = 0;
> - buf->tail[0].iov_base = NULL;
> - buf->tail[0].iov_len = 0;
> - buf->page_len = 0;
> - buf->len = 0;
> - buf->buflen = size;
> -
> + xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
> return 0;
>
> out_fail:
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>


2016-08-15 21:59:48

by Trond Myklebust

[permalink] [raw]
Subject: Re: [PATCH v1 03/22] SUNRPC: Generalize the RPC buffer allocation API


> On Aug 15, 2016, at 16:50, Chuck Lever <[email protected]> wrote:
>
> xprtrdma needs to allocate the Call and Reply buffers separately.
> TBH, the reliance on using a single buffer for the pair of XDR
> buffers is transport implementation-specific.
>
> Transports that want to allocate separate Call and Reply buffers
> will ignore the "size" argument anyway. Don't bother passing it.
>
> The buf_alloc method can't return two pointers. Instead, make the
> method's return value an error code, and set the rq_buffer pointer
> in the method itself.
>
> This gives call_allocate an opportunity to terminate an RPC instead
> of looping forever when a permanent problem occurs. If a request is
> just bogus, or the transport is in a state where it can't allocate
> resources for any request, there needs to be a way to kill the RPC
> right there and not loop.
>
> This immediately fixes a rare problem in the backchannel send path,
> which loops if the server happens to send a CB request whose
> call+reply size is larger than a page (which it shouldn't do yet).
>
> One more issue: looks like xprt_inject_disconnect was incorrectly
> placed in the failure path in call_allocate. It needs to be in the
> success path, as it is for other call-sites.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> include/linux/sunrpc/sched.h | 2 +-
> include/linux/sunrpc/xprt.h | 2 +-
> net/sunrpc/clnt.c | 12 ++++++++----
> net/sunrpc/sched.c | 24 +++++++++++++++---------
> net/sunrpc/sunrpc.h | 2 +-
> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 17 +++++++++--------
> net/sunrpc/xprtrdma/transport.c | 26 ++++++++++++++++++--------
> net/sunrpc/xprtsock.c | 17 +++++++++++------
> 8 files changed, 64 insertions(+), 38 deletions(-)
>
> diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
> index 817af0b..38d4c1b 100644
> --- a/include/linux/sunrpc/sched.h
> +++ b/include/linux/sunrpc/sched.h
> @@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
> void *);
> void rpc_wake_up_status(struct rpc_wait_queue *, int);
> void rpc_delay(struct rpc_task *, unsigned long);
> -void * rpc_malloc(struct rpc_task *, size_t);
> +int rpc_malloc(struct rpc_task *);
> void rpc_free(void *);
> int rpciod_up(void);
> void rpciod_down(void);
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 559dad3..6f0f796 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -127,7 +127,7 @@ struct rpc_xprt_ops {
> void (*rpcbind)(struct rpc_task *task);
> void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
> void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
> - void * (*buf_alloc)(struct rpc_task *task, size_t size);
> + int (*buf_alloc)(struct rpc_task *task);
> void (*buf_free)(void *buffer);
> int (*send_request)(struct rpc_task *task);
> void (*set_retrans_timeout)(struct rpc_task *task);
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 0e75369..601b3ca 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1693,6 +1693,7 @@ call_allocate(struct rpc_task *task)
> struct rpc_rqst *req = task->tk_rqstp;
> struct rpc_xprt *xprt = req->rq_xprt;
> struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
> + int status;
>
> dprint_status(task);
>
> @@ -1718,11 +1719,14 @@ call_allocate(struct rpc_task *task)
> req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
> req->rq_rcvsize <<= 2;
>
> - req->rq_buffer = xprt->ops->buf_alloc(task,
> - req->rq_callsize + req->rq_rcvsize);
> - if (req->rq_buffer != NULL)
> - return;
> + status = xprt->ops->buf_alloc(task);
> xprt_inject_disconnect(xprt);
> + if (status == 0)
> + return;
> + if (status == -EIO) {
> + rpc_exit(task, -EIO);

What does EIO mean in this context, and why should it be a fatal error? A buffer allocation is not normally considered to be I/O.

> + return;
> + }
>
> dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
>
> diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
> index 9ae5885..b964d40 100644
> --- a/net/sunrpc/sched.c
> +++ b/net/sunrpc/sched.c
> @@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
> }
>
> /**
> - * rpc_malloc - allocate an RPC buffer
> - * @task: RPC task that will use this buffer
> - * @size: requested byte size
> + * rpc_malloc - allocate RPC buffer resources
> + * @task: RPC task
> + *
> + * A single memory region is allocated, which is split between the
> + * RPC call and RPC reply that this task is being used for. When
> + * this RPC is retired, the memory is released by calling rpc_free.
> *
> * To prevent rpciod from hanging, this allocator never sleeps,
> - * returning NULL and suppressing warning if the request cannot be serviced
> - * immediately.
> - * The caller can arrange to sleep in a way that is safe for rpciod.
> + * returning -ENOMEM and suppressing warning if the request cannot
> + * be serviced immediately. The caller can arrange to sleep in a
> + * way that is safe for rpciod.
> *
> * Most requests are 'small' (under 2KiB) and can be serviced from a
> * mempool, ensuring that NFS reads and writes can always proceed,
> @@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
> * In order to avoid memory starvation triggering more writebacks of
> * NFS requests, we avoid using GFP_KERNEL.
> */
> -void *rpc_malloc(struct rpc_task *task, size_t size)
> +int rpc_malloc(struct rpc_task *task)
> {
> + struct rpc_rqst *rqst = task->tk_rqstp;
> + size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
> struct rpc_buffer *buf;
> gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
>
> @@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
> buf = kmalloc(size, gfp);
>
> if (!buf)
> - return NULL;
> + return -ENOMEM;
>
> buf->len = size;
> dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
> task->tk_pid, size, buf);
> - return &buf->data;
> + rqst->rq_buffer = buf->data;
> + return 0;
> }
> EXPORT_SYMBOL_GPL(rpc_malloc);
>
> diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
> index f2b7cb5..a4f44ca 100644
> --- a/net/sunrpc/sunrpc.h
> +++ b/net/sunrpc/sunrpc.h
> @@ -34,7 +34,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> */
> struct rpc_buffer {
> size_t len;
> - char data[];
> + __be32 data[];

Again, why? Nothing should be touching the data contents directly from this pointer.

> };
>
> static inline int rpc_reply_expected(struct rpc_task *task)
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> index a2a7519..b7c698f7f 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
> @@ -159,29 +159,30 @@ out_unmap:
> /* Server-side transport endpoint wants a whole page for its send
> * buffer. The client RPC code constructs the RPC header in this
> * buffer before it invokes ->send_request.
> - *
> - * Returns NULL if there was a temporary allocation failure.
> */
> -static void *
> -xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
> +static int
> +xprt_rdma_bc_allocate(struct rpc_task *task)
> {
> struct rpc_rqst *rqst = task->tk_rqstp;
> struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
> + size_t size = rqst->rq_callsize;
> struct svcxprt_rdma *rdma;
> struct page *page;
>
> rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
>
> - /* Prevent an infinite loop: try to make this case work */
> - if (size > PAGE_SIZE)
> + if (size > PAGE_SIZE) {
> WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
> size);
> + return -EIO;
> + }
>
> page = alloc_page(RPCRDMA_DEF_GFP);
> if (!page)
> - return NULL;
> + return -ENOMEM;
>
> - return page_address(page);
> + rqst->rq_buffer = page_address(page);
> + return 0;
> }
>
> static void
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index be95ece..daa7d4d 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
> }
> }
>
> -/*
> +/**
> + * xprt_rdma_allocate - allocate transport resources for an RPC
> + * @task: RPC task
> + *
> + * Return values:
> + * 0: Success; rq_buffer points to RPC buffer to use
> + * ENOMEM: Out of memory, call again later
> + * EIO: A permanent error occurred, do not retry
> + *
> * The RDMA allocate/free functions need the task structure as a place
> * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
> * sequence.
> @@ -486,11 +494,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
> * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
> * We may register rq_rcv_buf when using reply chunks.
> */
> -static void *
> -xprt_rdma_allocate(struct rpc_task *task, size_t size)
> +static int
> +xprt_rdma_allocate(struct rpc_task *task)
> {
> - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
> - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> + struct rpc_rqst *rqst = task->tk_rqstp;
> + size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
> + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
> struct rpcrdma_regbuf *rb;
> struct rpcrdma_req *req;
> size_t min_size;
> @@ -498,7 +507,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
>
> req = rpcrdma_buffer_get(&r_xprt->rx_buf);
> if (req == NULL)
> - return NULL;
> + return -ENOMEM;
>
> flags = RPCRDMA_DEF_GFP;
> if (RPC_IS_SWAPPER(task))
> @@ -515,7 +524,8 @@ out:
> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
> req->rl_connect_cookie = 0; /* our reserved value */
> req->rl_task = task;
> - return req->rl_sendbuf->rg_base;
> + rqst->rq_buffer = req->rl_sendbuf->rg_base;
> + return 0;
>
> out_rdmabuf:
> min_size = r_xprt->rx_data.inline_wsize;
> @@ -558,7 +568,7 @@ out_sendbuf:
>
> out_fail:
> rpcrdma_buffer_put(req);
> - return NULL;
> + return -ENOMEM;
> }
>
> /*
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 8ede3bc..d6db5cf 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
> * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
> * to use the server side send routines.
> */
> -static void *bc_malloc(struct rpc_task *task, size_t size)
> +static int bc_malloc(struct rpc_task *task)
> {
> + struct rpc_rqst *rqst = task->tk_rqstp;
> + size_t size = rqst->rq_callsize;
> struct page *page;
> struct rpc_buffer *buf;
>
> - WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
> - if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
> - return NULL;
> + if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
> + WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
> + size);
> + return -EIO;
> + }
>
> page = alloc_page(GFP_KERNEL);
> if (!page)
> - return NULL;
> + return -ENOMEM;
>
> buf = page_address(page);
> buf->len = PAGE_SIZE;
>
> - return buf->data;
> + rqst->rq_buffer = buf->data;
> + return 0;
> }
>
> /*
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>


2016-08-16 01:21:05

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v1 03/22] SUNRPC: Generalize the RPC buffer allocation API


> On Aug 15, 2016, at 5:59 PM, Trond Myklebust <[email protected]> wrote:
>
>>
>> On Aug 15, 2016, at 16:50, Chuck Lever <[email protected]> wrote:
>>
>> xprtrdma needs to allocate the Call and Reply buffers separately.
>> TBH, the reliance on using a single buffer for the pair of XDR
>> buffers is transport implementation-specific.
>>
>> Transports that want to allocate separate Call and Reply buffers
>> will ignore the "size" argument anyway. Don't bother passing it.
>>
>> The buf_alloc method can't return two pointers. Instead, make the
>> method's return value an error code, and set the rq_buffer pointer
>> in the method itself.
>>
>> This gives call_allocate an opportunity to terminate an RPC instead
>> of looping forever when a permanent problem occurs. If a request is
>> just bogus, or the transport is in a state where it can't allocate
>> resources for any request, there needs to be a way to kill the RPC
>> right there and not loop.
>>
>> This immediately fixes a rare problem in the backchannel send path,
>> which loops if the server happens to send a CB request whose
>> call+reply size is larger than a page (which it shouldn't do yet).
>>
>> One more issue: looks like xprt_inject_disconnect was incorrectly
>> placed in the failure path in call_allocate. It needs to be in the
>> success path, as it is for other call-sites.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/sched.h | 2 +-
>> include/linux/sunrpc/xprt.h | 2 +-
>> net/sunrpc/clnt.c | 12 ++++++++----
>> net/sunrpc/sched.c | 24 +++++++++++++++---------
>> net/sunrpc/sunrpc.h | 2 +-
>> net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 17 +++++++++--------
>> net/sunrpc/xprtrdma/transport.c | 26 ++++++++++++++++++--------
>> net/sunrpc/xprtsock.c | 17 +++++++++++------
>> 8 files changed, 64 insertions(+), 38 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
>> index 817af0b..38d4c1b 100644
>> --- a/include/linux/sunrpc/sched.h
>> +++ b/include/linux/sunrpc/sched.h
>> @@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
>> void *);
>> void rpc_wake_up_status(struct rpc_wait_queue *, int);
>> void rpc_delay(struct rpc_task *, unsigned long);
>> -void * rpc_malloc(struct rpc_task *, size_t);
>> +int rpc_malloc(struct rpc_task *);
>> void rpc_free(void *);
>> int rpciod_up(void);
>> void rpciod_down(void);
>> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
>> index 559dad3..6f0f796 100644
>> --- a/include/linux/sunrpc/xprt.h
>> +++ b/include/linux/sunrpc/xprt.h
>> @@ -127,7 +127,7 @@ struct rpc_xprt_ops {
>> void (*rpcbind)(struct rpc_task *task);
>> void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
>> void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
>> - void * (*buf_alloc)(struct rpc_task *task, size_t size);
>> + int (*buf_alloc)(struct rpc_task *task);
>> void (*buf_free)(void *buffer);
>> int (*send_request)(struct rpc_task *task);
>> void (*set_retrans_timeout)(struct rpc_task *task);
>> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
>> index 0e75369..601b3ca 100644
>> --- a/net/sunrpc/clnt.c
>> +++ b/net/sunrpc/clnt.c
>> @@ -1693,6 +1693,7 @@ call_allocate(struct rpc_task *task)
>> struct rpc_rqst *req = task->tk_rqstp;
>> struct rpc_xprt *xprt = req->rq_xprt;
>> struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
>> + int status;
>>
>> dprint_status(task);
>>
>> @@ -1718,11 +1719,14 @@ call_allocate(struct rpc_task *task)
>> req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
>> req->rq_rcvsize <<= 2;
>>
>> - req->rq_buffer = xprt->ops->buf_alloc(task,
>> - req->rq_callsize + req->rq_rcvsize);
>> - if (req->rq_buffer != NULL)
>> - return;
>> + status = xprt->ops->buf_alloc(task);
>> xprt_inject_disconnect(xprt);
>> + if (status == 0)
>> + return;
>> + if (status == -EIO) {
>> + rpc_exit(task, -EIO);
>
> What does EIO mean in this context, and why should it be a fatal error? A buffer allocation is not normally considered to be I/O.

Here I'm using it to mean "some permanent error occurred" as
opposed to something like EAGAIN or ENOMEM, meaning "transient
error, it's OK to try again later."

So, at one point, this patch series changed xprt_rdma_allocate
to return EIO if DMA mapping could not be done on the returned
buffer. That kind of maybe qualifies as a local I/O error.

Or, in the backchannel, if the caller has specified a buffer
size that is too large, the RPC needs to be killed; otherwise
call_allocate will simply delay and call again, with the same
result over and over. In that case, EINVAL might be more
appropriate.

But -EIO is the usual status when an RPC is killed. It depends
on what you want to allow to be leaked upwards.
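
To illustrate that distinction, a minimal standalone sketch (illustration
only, not the actual call_allocate code; fake_buf_alloc is a made-up
stand-in): -ENOMEM is treated as transient and retried, while -EIO kills
the request immediately.

#include <stdio.h>
#include <errno.h>

static int fake_buf_alloc(int attempt)
{
	return attempt < 2 ? -ENOMEM : 0;	/* succeed on the third try */
}

int main(void)
{
	for (int attempt = 0; ; attempt++) {
		int status = fake_buf_alloc(attempt);

		if (status == 0) {
			printf("buffer allocated on attempt %d\n", attempt + 1);
			return 0;
		}
		if (status == -EIO) {
			printf("permanent error, RPC terminated\n");
			return 1;
		}
		/* -ENOMEM: back off and try again later */
		printf("transient failure, retrying\n");
	}
}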


>> + return;
>> + }
>>
>> dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
>>
>> diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
>> index 9ae5885..b964d40 100644
>> --- a/net/sunrpc/sched.c
>> +++ b/net/sunrpc/sched.c
>> @@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
>> }
>>
>> /**
>> - * rpc_malloc - allocate an RPC buffer
>> - * @task: RPC task that will use this buffer
>> - * @size: requested byte size
>> + * rpc_malloc - allocate RPC buffer resources
>> + * @task: RPC task
>> + *
>> + * A single memory region is allocated, which is split between the
>> + * RPC call and RPC reply that this task is being used for. When
>> + * this RPC is retired, the memory is released by calling rpc_free.
>> *
>> * To prevent rpciod from hanging, this allocator never sleeps,
>> - * returning NULL and suppressing warning if the request cannot be serviced
>> - * immediately.
>> - * The caller can arrange to sleep in a way that is safe for rpciod.
>> + * returning -ENOMEM and suppressing warning if the request cannot
>> + * be serviced immediately. The caller can arrange to sleep in a
>> + * way that is safe for rpciod.
>> *
>> * Most requests are 'small' (under 2KiB) and can be serviced from a
>> * mempool, ensuring that NFS reads and writes can always proceed,
>> @@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
>> * In order to avoid memory starvation triggering more writebacks of
>> * NFS requests, we avoid using GFP_KERNEL.
>> */
>> -void *rpc_malloc(struct rpc_task *task, size_t size)
>> +int rpc_malloc(struct rpc_task *task)
>> {
>> + struct rpc_rqst *rqst = task->tk_rqstp;
>> + size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
>> struct rpc_buffer *buf;
>> gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
>>
>> @@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
>> buf = kmalloc(size, gfp);
>>
>> if (!buf)
>> - return NULL;
>> + return -ENOMEM;
>>
>> buf->len = size;
>> dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
>> task->tk_pid, size, buf);
>> - return &buf->data;
>> + rqst->rq_buffer = buf->data;
>> + return 0;
>> }
>> EXPORT_SYMBOL_GPL(rpc_malloc);
>>
>> diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
>> index f2b7cb5..a4f44ca 100644
>> --- a/net/sunrpc/sunrpc.h
>> +++ b/net/sunrpc/sunrpc.h
>> @@ -34,7 +34,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> */
>> struct rpc_buffer {
>> size_t len;
>> - char data[];
>> + __be32 data[];
>
> Again, why? Nothing should be touching the data contents directly from this pointer.

Technically, the contents of this buffer are network-byte-order,
and are accessed in four-octet pieces, so __be32 * is the correctly-
annotated type to use.
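
To illustrate (a standalone userspace sketch, not kernel code; the typedef
below merely stands in for the kernel's sparse-checked __be32): the buffer
holds big-endian 32-bit words, so it is accessed four octets at a time with
explicit conversions.

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>

typedef uint32_t xdr_be32;	/* stand-in for __be32 */

int main(void)
{
	xdr_be32 buf[2];

	buf[0] = htonl(100003);		/* NFS program number, on the wire */
	buf[1] = htonl(3);		/* version */

	printf("program %u version %u\n", ntohl(buf[0]), ntohl(buf[1]));
	return 0;
}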

The type change in this patch and the previous aren't required for
proper function, though. I just wanted to cut down on the amount of
implicit and explicit pointer type casting that's going on. It can
be reworked if you prefer.

Thanks for the review!


>> };
>>
>> static inline int rpc_reply_expected(struct rpc_task *task)
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> index a2a7519..b7c698f7f 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> @@ -159,29 +159,30 @@ out_unmap:
>> /* Server-side transport endpoint wants a whole page for its send
>> * buffer. The client RPC code constructs the RPC header in this
>> * buffer before it invokes ->send_request.
>> - *
>> - * Returns NULL if there was a temporary allocation failure.
>> */
>> -static void *
>> -xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
>> +static int
>> +xprt_rdma_bc_allocate(struct rpc_task *task)
>> {
>> struct rpc_rqst *rqst = task->tk_rqstp;
>> struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
>> + size_t size = rqst->rq_callsize;
>> struct svcxprt_rdma *rdma;
>> struct page *page;
>>
>> rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
>>
>> - /* Prevent an infinite loop: try to make this case work */
>> - if (size > PAGE_SIZE)
>> + if (size > PAGE_SIZE) {
>> WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
>> size);
>> + return -EIO;
>> + }
>>
>> page = alloc_page(RPCRDMA_DEF_GFP);
>> if (!page)
>> - return NULL;
>> + return -ENOMEM;
>>
>> - return page_address(page);
>> + rqst->rq_buffer = page_address(page);
>> + return 0;
>> }
>>
>> static void
>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>> index be95ece..daa7d4d 100644
>> --- a/net/sunrpc/xprtrdma/transport.c
>> +++ b/net/sunrpc/xprtrdma/transport.c
>> @@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
>> }
>> }
>>
>> -/*
>> +/**
>> + * xprt_rdma_allocate - allocate transport resources for an RPC
>> + * @task: RPC task
>> + *
>> + * Return values:
>> + * 0: Success; rq_buffer points to RPC buffer to use
>> + * ENOMEM: Out of memory, call again later
>> + * EIO: A permanent error occurred, do not retry
>> + *
>> * The RDMA allocate/free functions need the task structure as a place
>> * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
>> * sequence.
>> @@ -486,11 +494,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
>> * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
>> * We may register rq_rcv_buf when using reply chunks.
>> */
>> -static void *
>> -xprt_rdma_allocate(struct rpc_task *task, size_t size)
>> +static int
>> +xprt_rdma_allocate(struct rpc_task *task)
>> {
>> - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
>> - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
>> + struct rpc_rqst *rqst = task->tk_rqstp;
>> + size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
>> + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
>> struct rpcrdma_regbuf *rb;
>> struct rpcrdma_req *req;
>> size_t min_size;
>> @@ -498,7 +507,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
>>
>> req = rpcrdma_buffer_get(&r_xprt->rx_buf);
>> if (req == NULL)
>> - return NULL;
>> + return -ENOMEM;
>>
>> flags = RPCRDMA_DEF_GFP;
>> if (RPC_IS_SWAPPER(task))
>> @@ -515,7 +524,8 @@ out:
>> dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
>> req->rl_connect_cookie = 0; /* our reserved value */
>> req->rl_task = task;
>> - return req->rl_sendbuf->rg_base;
>> + rqst->rq_buffer = req->rl_sendbuf->rg_base;
>> + return 0;
>>
>> out_rdmabuf:
>> min_size = r_xprt->rx_data.inline_wsize;
>> @@ -558,7 +568,7 @@ out_sendbuf:
>>
>> out_fail:
>> rpcrdma_buffer_put(req);
>> - return NULL;
>> + return -ENOMEM;
>> }
>>
>> /*
>> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
>> index 8ede3bc..d6db5cf 100644
>> --- a/net/sunrpc/xprtsock.c
>> +++ b/net/sunrpc/xprtsock.c
>> @@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
>> * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
>> * to use the server side send routines.
>> */
>> -static void *bc_malloc(struct rpc_task *task, size_t size)
>> +static int bc_malloc(struct rpc_task *task)
>> {
>> + struct rpc_rqst *rqst = task->tk_rqstp;
>> + size_t size = rqst->rq_callsize;
>> struct page *page;
>> struct rpc_buffer *buf;
>>
>> - WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
>> - if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
>> - return NULL;
>> + if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
>> + WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
>> + size);
>> + return -EIO;
>> + }
>>
>> page = alloc_page(GFP_KERNEL);
>> if (!page)
>> - return NULL;
>> + return -ENOMEM;
>>
>> buf = page_address(page);
>> buf->len = PAGE_SIZE;
>>
>> - return buf->data;
>> + rqst->rq_buffer = buf->data;
>> + return 0;
>> }
>>
>> /*
>>
>> --
>> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
>> the body of a message to [email protected]
>> More majordomo info at http://vger.kernel.org/majordomo-info.html
>>
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html

--
Chuck Lever




2016-08-16 01:25:00

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v1 02/22] SUNRPC: Refactor rpc_xdr_buf_init()


> On Aug 15, 2016, at 5:51 PM, Trond Myklebust <[email protected]> wrote:
>
>>
>> On Aug 15, 2016, at 16:50, Chuck Lever <[email protected]> wrote:
>>
>> Clean up: there is some XDR initialization logic that is commoon
>> to the forward channel and backchannel. Move it to an XDR header
>> so it can be shared.
>>
>> rpc_rqst::rq_buffer points to a buffer containing big-endian data.
>> Update its annotation as part of the clean up.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> include/linux/sunrpc/xdr.h | 10 ++++++++++
>> include/linux/sunrpc/xprt.h | 2 +-
>> net/sunrpc/backchannel_rqst.c | 8 +-------
>> net/sunrpc/clnt.c | 24 ++++++------------------
>> net/sunrpc/xprtrdma/backchannel.c | 12 +-----------
>> 5 files changed, 19 insertions(+), 37 deletions(-)
>>
>> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
>> index 70c6b92..551569c 100644
>> --- a/include/linux/sunrpc/xdr.h
>> +++ b/include/linux/sunrpc/xdr.h
>> @@ -67,6 +67,16 @@ struct xdr_buf {
>> len; /* Length of XDR encoded message */
>> };
>>
>> +static inline void
>> +xdr_buf_init(struct xdr_buf *buf, __be32 *start, size_t len)
>> +{
>> + memset(buf, 0, sizeof(*buf));
>
> Can we please leave this as it was. There is no need to zero out unused fields such as the pages pointer, the tail iov_base, etc.

In the next version, I'll make this look exactly like rpc_xdr_buf_init().


>> +
>> + buf->head[0].iov_base = start;
>> + buf->head[0].iov_len = len;
>> + buf->buflen = len;
>> +}
>> +
>> /*
>> * pre-xdr'ed macros.
>> */
>> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
>> index a16070d..559dad3 100644
>> --- a/include/linux/sunrpc/xprt.h
>> +++ b/include/linux/sunrpc/xprt.h
>> @@ -83,7 +83,7 @@ struct rpc_rqst {
>> void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
>> struct list_head rq_list;
>>
>> - __u32 * rq_buffer; /* XDR encode buffer */
>> + __be32 *rq_buffer; /* Call XDR encode buffer */
>
> If changing it, why not make it a void* ? Nothing should ever be touching the contents of this buffer directly.

I'll consider void *; I think that can be assigned to
without an explicit type cast.


>> size_t rq_callsize,
>> rq_rcvsize;
>> size_t rq_xmit_bytes_sent; /* total bytes sent */
>> diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
>> index 229956b..ac701c2 100644
>> --- a/net/sunrpc/backchannel_rqst.c
>> +++ b/net/sunrpc/backchannel_rqst.c
>> @@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
>> page = alloc_page(gfp_flags);
>> if (page == NULL)
>> return -ENOMEM;
>> - buf->head[0].iov_base = page_address(page);
>> - buf->head[0].iov_len = PAGE_SIZE;
>> - buf->tail[0].iov_base = NULL;
>> - buf->tail[0].iov_len = 0;
>> - buf->page_len = 0;
>> - buf->len = 0;
>> - buf->buflen = PAGE_SIZE;
>> + xdr_buf_init(buf, page_address(page), PAGE_SIZE);
>> return 0;
>> }
>>
>> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
>> index 7f79fb7..0e75369 100644
>> --- a/net/sunrpc/clnt.c
>> +++ b/net/sunrpc/clnt.c
>> @@ -1748,18 +1748,6 @@ rpc_task_force_reencode(struct rpc_task *task)
>> task->tk_rqstp->rq_bytes_sent = 0;
>> }
>>
>> -static inline void
>> -rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
>> -{
>> - buf->head[0].iov_base = start;
>> - buf->head[0].iov_len = len;
>> - buf->tail[0].iov_len = 0;
>> - buf->page_len = 0;
>> - buf->flags = 0;
>> - buf->len = 0;
>> - buf->buflen = len;
>> -}
>> -
>> /*
>> * 3. Encode arguments of an RPC call
>> */
>> @@ -1772,12 +1760,12 @@ rpc_xdr_encode(struct rpc_task *task)
>>
>> dprint_status(task);
>>
>> - rpc_xdr_buf_init(&req->rq_snd_buf,
>> - req->rq_buffer,
>> - req->rq_callsize);
>> - rpc_xdr_buf_init(&req->rq_rcv_buf,
>> - (char *)req->rq_buffer + req->rq_callsize,
>> - req->rq_rcvsize);
>> + xdr_buf_init(&req->rq_snd_buf,
>> + req->rq_buffer,
>> + req->rq_callsize);
>> + xdr_buf_init(&req->rq_rcv_buf,
>> + (__be32 *)((char *)req->rq_buffer + req->rq_callsize),
>> + req->rq_rcvsize);
>>
>> p = rpc_encode_header(task);
>> if (p == NULL) {
>> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
>> index 5f60ab2..d3cfaf2 100644
>> --- a/net/sunrpc/xprtrdma/backchannel.c
>> +++ b/net/sunrpc/xprtrdma/backchannel.c
>> @@ -38,7 +38,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
>> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
>> struct rpcrdma_regbuf *rb;
>> struct rpcrdma_req *req;
>> - struct xdr_buf *buf;
>> size_t size;
>>
>> req = rpcrdma_create_req(r_xprt);
>> @@ -60,16 +59,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
>> req->rl_sendbuf = rb;
>> /* so that rpcr_to_rdmar works when receiving a request */
>> rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
>> -
>> - buf = &rqst->rq_snd_buf;
>> - buf->head[0].iov_base = rqst->rq_buffer;
>> - buf->head[0].iov_len = 0;
>> - buf->tail[0].iov_base = NULL;
>> - buf->tail[0].iov_len = 0;
>> - buf->page_len = 0;
>> - buf->len = 0;
>> - buf->buflen = size;
>> -
>> + xdr_buf_init(&rqst->rq_snd_buf, rqst->rq_buffer, size);
>> return 0;
>>
>> out_fail:
>>
>> --
>> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
>> the body of a message to [email protected]
>> More majordomo info at http://vger.kernel.org/majordomo-info.html
>>
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html

--
Chuck Lever