2015-12-07 20:42:24

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 0/6] NFS/RDMA server patches for 4.5

Here are patches to support server-side bi-directional RPC/RDMA
operation (to enable NFSv4.1 on RPC/RDMA transports). Thanks to
all who reviewed v1 and v2.

Also available in the "nfsd-rdma-for-4.5" topic branch of this git repo:

git://git.linux-nfs.org/projects/cel/cel-2.6.git

Or for browsing:

http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfsd-rdma-for-4.5


Changes since v2:
- Rebased on v4.4-rc4
- Backchannel code in new source file to address dprintk issues
- svc_rdma_get_context() now uses a pre-allocated cache
- Dropped svc_rdma_send clean up


Changes since v1:

- Rebased on v4.4-rc3
- Removed the use of CONFIG_SUNRPC_BACKCHANNEL
- Fixed computation of forward and backward max_requests
- Updated some comments and patch descriptions
- pr_err and pr_info converted to dprintk
- Simplified svc_rdma_get_context()
- Dropped patch removing access_flags field
- NFSv4.1 callbacks tested with for-4.5 client

---

Chuck Lever (6):
svcrdma: Do not send XDR roundup bytes for a write chunk
svcrdma: Improve allocation of struct svc_rdma_op_ctxt
svcrdma: Define maximum number of backchannel requests
svcrdma: Add infrastructure to send backwards direction RPC/RDMA calls
svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies
xprtrdma: Add class for RDMA backwards direction transport


include/linux/sunrpc/svc_rdma.h | 13 +
include/linux/sunrpc/xprt.h | 1
net/sunrpc/xprt.c | 1
net/sunrpc/xprtrdma/Makefile | 2
net/sunrpc/xprtrdma/svc_rdma.c | 17 --
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 305 ++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 51 +++++
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 68 ++++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 104 ++++++++--
net/sunrpc/xprtrdma/transport.c | 31 ++-
net/sunrpc/xprtrdma/xprt_rdma.h | 15 +
11 files changed, 559 insertions(+), 49 deletions(-)
create mode 100644 net/sunrpc/xprtrdma/svc_rdma_backchannel.c

--
Chuck Lever


2015-12-07 20:42:33

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 1/6] svcrdma: Do not send XDR roundup bytes for a write chunk

Minor optimization: when dealing with write chunk XDR roundup, do
not post a Write WR for the zero bytes in the pad. Simply update
the write segment in the RPC-over-RDMA header to reflect the extra
pad bytes.

The Reply chunk is also a write chunk, but the server does not use
send_write_chunks() to send the Reply chunk. That's OK in this case:
the server Upper Layer typically marshals the Reply chunk contents
in a single contiguous buffer, without a separate tail for the XDR
pad.

The comments and the variable naming refer to "chunks" but what is
really meant is "segments." The existing code sends only one
xdr_write_chunk per RPC reply.

The fix assumes this as well. When the XDR pad in the first write
chunk is reached, the assumption is the Write list is complete and
send_write_chunks() returns.

That will remain a valid assumption until the server Upper Layer can
support multiple bulk payload results per RPC.

Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +++++++
1 file changed, 7 insertions(+)

diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 969a1ab..bad5eaa 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -342,6 +342,13 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
arg_ch->rs_handle,
arg_ch->rs_offset,
write_len);
+
+ /* Do not send XDR pad bytes */
+ if (chunk_no && write_len < 4) {
+ chunk_no++;
+ break;
+ }
+
chunk_off = 0;
while (write_len) {
ret = send_write(xprt, rqstp,


2015-12-07 20:42:42

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 2/6] svcrdma: Improve allocation of struct svc_rdma_op_ctxt

Turns out that when the maximum payload size of NFS READ and WRITE
was increased to 1MB, the size of struct svc_rdma_op_ctxt
increased to 6KB (x86_64). That makes allocating one of these from
a kmem_cache more likely to fail.

Allocating one of these has to be fast in general, and none of the
current caller sites expect allocation failure. The existing logic
ensures no failure by looping and sleeping.

Since I'm about to add a caller where this allocation must always
work _and_ it cannot sleep, pre-allocate them for each connection,
like other RDMA transport-related resources.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 4 ++
net/sunrpc/xprtrdma/svc_rdma.c | 17 -------
net/sunrpc/xprtrdma/svc_rdma_transport.c | 76 ++++++++++++++++++++++++++----
net/sunrpc/xprtrdma/xprt_rdma.h | 2 -
4 files changed, 70 insertions(+), 29 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f869807..2bb0ff3 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -69,6 +69,7 @@ extern atomic_t rdma_stat_sq_prod;
* completes.
*/
struct svc_rdma_op_ctxt {
+ struct list_head free_q;
struct svc_rdma_op_ctxt *read_hdr;
struct svc_rdma_fastreg_mr *frmr;
int hdr_count;
@@ -142,6 +143,9 @@ struct svcxprt_rdma {

atomic_t sc_dma_used;
atomic_t sc_ctxt_used;
+ struct list_head sc_ctxt_q;
+ spinlock_t sc_ctxt_lock;
+
struct list_head sc_rq_dto_q;
spinlock_t sc_rq_dto_lock;
struct ib_qp *sc_qp;
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 1b7051b..aed1d96 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -71,9 +71,7 @@ atomic_t rdma_stat_rq_prod;
atomic_t rdma_stat_sq_poll;
atomic_t rdma_stat_sq_prod;

-/* Temporary NFS request map and context caches */
struct kmem_cache *svc_rdma_map_cachep;
-struct kmem_cache *svc_rdma_ctxt_cachep;

struct workqueue_struct *svc_rdma_wq;

@@ -244,7 +242,6 @@ void svc_rdma_cleanup(void)
#endif
svc_unreg_xprt_class(&svc_rdma_class);
kmem_cache_destroy(svc_rdma_map_cachep);
- kmem_cache_destroy(svc_rdma_ctxt_cachep);
}

int svc_rdma_init(void)
@@ -275,26 +272,12 @@ int svc_rdma_init(void)
goto err0;
}

- /* Create the temporary context cache */
- svc_rdma_ctxt_cachep =
- kmem_cache_create("svc_rdma_ctxt_cache",
- sizeof(struct svc_rdma_op_ctxt),
- 0,
- SLAB_HWCACHE_ALIGN,
- NULL);
- if (!svc_rdma_ctxt_cachep) {
- printk(KERN_INFO "Could not allocate WR ctxt cache.\n");
- goto err1;
- }
-
/* Register RDMA with the SVC transport switch */
svc_reg_xprt_class(&svc_rdma_class);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
svc_reg_xprt_class(&svc_rdma_bc_class);
#endif
return 0;
- err1:
- kmem_cache_destroy(svc_rdma_map_cachep);
err0:
unregister_sysctl_table(svcrdma_table_header);
destroy_workqueue(svc_rdma_wq);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index b348b4a..ede88f3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -155,16 +155,27 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (list_empty(&xprt->sc_ctxt_q))
+ goto out_empty;
+
+ ctxt = list_first_entry(&xprt->sc_ctxt_q,
+ struct svc_rdma_op_ctxt, free_q);
+ list_del_init(&ctxt->free_q);
+ spin_unlock_bh(&xprt->sc_ctxt_lock);

- ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
- GFP_KERNEL | __GFP_NOFAIL);
- ctxt->xprt = xprt;
- INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0;
ctxt->frmr = NULL;
+
atomic_inc(&xprt->sc_ctxt_used);
return ctxt;
+
+out_empty:
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ pr_err("svcrdma: empty RDMA ctxt list?\n");
+ return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
@@ -198,7 +209,27 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
for (i = 0; i < ctxt->count; i++)
put_page(ctxt->pages[i]);

- kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ list_add(&ctxt->free_q, &xprt->sc_ctxt_q);
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ atomic_dec(&xprt->sc_ctxt_used);
+}
+
+static void svc_rdma_put_context_irq(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ xprt = ctxt->xprt;
+ if (free_pages)
+ for (i = 0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+
+ spin_lock(&xprt->sc_ctxt_lock);
+ list_add(&ctxt->free_q, &xprt->sc_ctxt_q);
+ spin_unlock(&xprt->sc_ctxt_lock);
+
atomic_dec(&xprt->sc_ctxt_used);
}

@@ -357,7 +388,7 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
/* Close the transport */
dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_put_context_irq(ctxt, 1);
svc_xprt_put(&xprt->sc_xprt);
continue;
}
@@ -392,13 +423,13 @@ static void process_context(struct svcxprt_rdma *xprt,
case IB_WR_SEND:
if (ctxt->frmr)
pr_err("svcrdma: SEND: ctxt->frmr != NULL\n");
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_put_context_irq(ctxt, 1);
break;

case IB_WR_RDMA_WRITE:
if (ctxt->frmr)
pr_err("svcrdma: WRITE: ctxt->frmr != NULL\n");
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_put_context_irq(ctxt, 0);
break;

case IB_WR_RDMA_READ:
@@ -417,7 +448,7 @@ static void process_context(struct svcxprt_rdma *xprt,
}
svc_xprt_enqueue(&xprt->sc_xprt);
}
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_put_context_irq(ctxt, 0);
break;

default:
@@ -523,9 +554,11 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_ctxt_q);
init_waitqueue_head(&cma_xprt->sc_send_wait);

spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_ctxt_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
spin_lock_init(&cma_xprt->sc_frmr_q_lock);

@@ -927,6 +960,21 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
(size_t)svcrdma_max_requests);
newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;

+ for (i = newxprt->sc_sq_depth; i; i--) {
+ struct svc_rdma_op_ctxt *ctxt;
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (!ctxt) {
+ dprintk("svcrdma: No memory for RDMA ctxt\n");
+ goto errout;
+ }
+
+ ctxt->xprt = newxprt;
+ INIT_LIST_HEAD(&ctxt->free_q);
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ list_add(&ctxt->free_q, &newxprt->sc_ctxt_q);
+ }
+
/*
* Limit ORD based on client limit, local device limit, and
* configured svcrdma limit.
@@ -1222,6 +1270,14 @@ static void __svc_rdma_free(struct work_struct *work)
/* Destroy the CM ID */
rdma_destroy_id(rdma->sc_cm_id);

+ while (!list_empty(&rdma->sc_ctxt_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_first_entry(&rdma->sc_ctxt_q,
+ struct svc_rdma_op_ctxt, free_q);
+ list_del(&ctxt->free_q);
+ kfree(ctxt);
+ }
+
kfree(rdma);
}

diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index ac7f8d4..a1fd74a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -531,8 +531,6 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);

/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;
-/* WR context cache. Created in svc_rdma.c */
-extern struct kmem_cache *svc_rdma_ctxt_cachep;
/* Workqueue created in svc_rdma.c */
extern struct workqueue_struct *svc_rdma_wq;



2015-12-07 20:42:50

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 3/6] svcrdma: Define maximum number of backchannel requests

Extra resources for handling backchannel requests have to be
pre-allocated when a transport instance is created. Set a limit.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 ++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 14 +++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 2bb0ff3..f71c625 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -138,6 +138,7 @@ struct svcxprt_rdma {

int sc_max_requests; /* Depth of RQ */
int sc_max_req_size; /* Size of each RQ WR buf */
+ int sc_max_bc_requests;

struct ib_pd *sc_pd;

@@ -182,6 +183,7 @@ struct svcxprt_rdma {
#define RPCRDMA_SQ_DEPTH_MULT 8
#define RPCRDMA_MAX_REQUESTS 32
#define RPCRDMA_MAX_REQ_SIZE 4096
+#define RPCRDMA_MAX_BC_REQUESTS 2

#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD

diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index ede88f3..ed5dd93 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -566,6 +566,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,

cma_xprt->sc_max_req_size = svcrdma_max_req_size;
cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
atomic_set(&cma_xprt->sc_sq_count, 0);
atomic_set(&cma_xprt->sc_ctxt_used, 0);
@@ -922,6 +923,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct ib_device_attr devattr;
int uninitialized_var(dma_mr_acc);
int need_dma_mr = 0;
+ int total_reqs;
int ret;
int i;

@@ -957,8 +959,10 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_max_sge_rd = min_t(size_t, devattr.max_sge_rd,
RPCSVC_MAXPAGES);
newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
- (size_t)svcrdma_max_requests);
- newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
+ total_reqs = newxprt->sc_max_requests + newxprt->sc_max_bc_requests;
+ newxprt->sc_sq_depth = total_reqs * RPCRDMA_SQ_DEPTH_MULT;

for (i = newxprt->sc_sq_depth; i; i--) {
struct svc_rdma_op_ctxt *ctxt;
@@ -997,7 +1001,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- cq_attr.cqe = newxprt->sc_max_requests;
+ cq_attr.cqe = total_reqs;
newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
rq_comp_handler,
cq_event_handler,
@@ -1012,7 +1016,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
qp_attr.event_handler = qp_event_handler;
qp_attr.qp_context = &newxprt->sc_xprt;
qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
- qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_recv_wr = total_reqs;
qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -1108,7 +1112,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_cm_id->device->local_dma_lkey;

/* Post receive buffers */
- for (i = 0; i < newxprt->sc_max_requests; i++) {
+ for (i = 0; i < total_reqs; i++) {
ret = svc_rdma_post_recv(newxprt);
if (ret) {
dprintk("svcrdma: failure posting receive buffers\n");


2015-12-07 20:42:58

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 4/6] svcrdma: Add infrastructure to send backwards direction RPC/RDMA calls

To support the NFSv4.1 backchannel on RDMA connections, add a
mechanism for sending a backwards-direction RPC/RDMA call on a
connection established by a client.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 2 +
net/sunrpc/xprtrdma/svc_rdma_sendto.c | 61 +++++++++++++++++++++++++++++++++
2 files changed, 63 insertions(+)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f71c625..bf9b17b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -215,6 +215,8 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
extern int svc_rdma_sendto(struct svc_rqst *);
extern struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern int svc_rdma_bc_post_send(struct svcxprt_rdma *,
+ struct svc_rdma_op_ctxt *, struct xdr_buf *);

/* svc_rdma_transport.c */
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index bad5eaa..846df63 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -648,3 +648,64 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_context(ctxt, 0);
return ret;
}
+
+/* Send a backwards direction RPC call.
+ *
+ * Caller holds the connection's mutex and has already marshaled the
+ * RPC/RDMA request. Before sending the request, this API also posts
+ * an extra receive buffer to catch the bc reply for this request.
+ */
+int svc_rdma_bc_post_send(struct svcxprt_rdma *rdma,
+ struct svc_rdma_op_ctxt *ctxt, struct xdr_buf *sndbuf)
+{
+ struct svc_rdma_req_map *vec;
+ struct ib_send_wr send_wr;
+ int ret;
+
+ vec = svc_rdma_get_req_map();
+ ret = map_xdr(rdma, sndbuf, vec);
+ if (ret)
+ goto out;
+
+ /* Post a recv buffer to handle reply for this request */
+ ret = svc_rdma_post_recv(rdma);
+ if (ret) {
+ pr_err("svcrdma: Failed to post bc receive buffer, err=%d. "
+ "Closing transport %p.\n", ret, rdma);
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ goto out;
+ }
+
+ ctxt->wr_op = IB_WR_SEND;
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->sge[0].lkey = rdma->sc_dma_lkey;
+ ctxt->sge[0].length = sndbuf->len;
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
+ sndbuf->len, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
+ svc_rdma_unmap_dma(ctxt);
+ ret = -EIO;
+ goto out;
+ }
+ atomic_inc(&rdma->sc_dma_used);
+
+ memset(&send_wr, 0, sizeof send_wr);
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = 1;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret) {
+ svc_rdma_unmap_dma(ctxt);
+ ret = -EIO;
+ goto out;
+ }
+out:
+ svc_rdma_put_req_map(vec);
+ dprintk("svcrdma: %s returns %d\n", __func__, ret);
+ return ret;
+}


2015-12-07 20:43:06

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 5/6] svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies

To support the NFSv4.1 backchannel on RDMA connections, add a
capability for receiving an RPC/RDMA reply on a connection
established by a client.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 5 ++
net/sunrpc/xprtrdma/Makefile | 2 -
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 86 ++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 51 +++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
5 files changed, 145 insertions(+), 1 deletion(-)
create mode 100644 net/sunrpc/xprtrdma/svc_rdma_backchannel.c

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index bf9b17b..5371d42 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -187,6 +187,11 @@ struct svcxprt_rdma {

#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD

+/* svc_rdma_backchannel.c */
+extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct xdr_buf *rcvbuf);
+
/* svc_rdma_marshal.c */
extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 33f99d3..dc9f3b5 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -2,7 +2,7 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o

rpcrdma-y := transport.o rpc_rdma.o verbs.o \
fmr_ops.o frwr_ops.o physical_ops.o \
- svc_rdma.o svc_rdma_transport.o \
+ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
new file mode 100644
index 0000000..69dab71
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA (server-side).
+ */
+
+#include <linux/sunrpc/svc_rdma.h>
+#include "xprt_rdma.h"
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+#undef SVCRDMA_BACKCHANNEL_DEBUG
+
+int
+svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+ struct xdr_buf *rcvbuf)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct kvec *dst, *src = &rcvbuf->head[0];
+ struct rpc_rqst *req;
+ unsigned long cwnd;
+ u32 credits;
+ size_t len;
+ __be32 xid;
+ __be32 *p;
+ int ret;
+
+ p = (__be32 *)src->iov_base;
+ len = src->iov_len;
+ xid = rmsgp->rm_xid;
+
+#ifdef SVCRDMA_BACKCHANNEL_DEBUG
+ pr_info("%s: xid=%08x, length=%zu\n",
+ __func__, be32_to_cpu(xid), len);
+ pr_info("%s: RPC/RDMA: %*ph\n",
+ __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+ pr_info("%s: RPC: %*ph\n",
+ __func__, (int)len, p);
+#endif
+
+ ret = -EAGAIN;
+ if (src->iov_len < 24)
+ goto out_shortreply;
+
+ spin_lock_bh(&xprt->transport_lock);
+ req = xprt_lookup_rqst(xprt, xid);
+ if (!req)
+ goto out_notfound;
+
+ dst = &req->rq_private_buf.head[0];
+ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
+ if (dst->iov_len < len)
+ goto out_unlock;
+ memcpy(dst->iov_base, p, len);
+
+ credits = be32_to_cpu(rmsgp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
+ credits = r_xprt->rx_buf.rb_bc_max_requests;
+
+ cwnd = xprt->cwnd;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
+ if (xprt->cwnd > cwnd)
+ xprt_release_rqst_cong(req->rq_task);
+
+ ret = 0;
+ xprt_complete_rqst(req->rq_task, rcvbuf->len);
+ rcvbuf->len = 0;
+
+out_unlock:
+ spin_unlock_bh(&xprt->transport_lock);
+out:
+ return ret;
+
+out_shortreply:
+ dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
+ xprt, src->iov_len);
+ goto out;
+
+out_notfound:
+ dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
+ xprt, be32_to_cpu(xid));
+
+ goto out_unlock;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index ff4f01e..faa1fc0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -567,6 +567,38 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
return ret;
}

+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
+{
+ __be32 *p = (__be32 *)rmsgp;
+
+ if (!xprt->xpt_bc_xprt)
+ return false;
+
+ if (rmsgp->rm_type != rdma_msg)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != rmsgp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] == cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+
/*
* Set up the rqstp thread context to point to the RQ buffer. If
* necessary, pull additional data from the client with an RDMA_READ
@@ -632,6 +664,15 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto close_out;
}

+ if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
+ ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
+ &rqstp->rq_arg);
+ svc_rdma_put_context(ctxt, 0);
+ if (ret)
+ goto repost;
+ return ret;
+ }
+
/* Read read-list data. */
ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
if (ret > 0) {
@@ -668,4 +709,14 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
return 0;
+
+repost:
+ ret = svc_rdma_post_recv(rdma_xprt);
+ if (ret) {
+ pr_err("svcrdma: could not post a receive buffer, err=%d."
+ "Closing transport %p.\n", ret, rdma_xprt);
+ set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
+ ret = -ENOTCONN;
+ }
+ return ret;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a1fd74a..3895574 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -309,6 +309,8 @@ struct rpcrdma_buffer {
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
struct list_head rb_allreqs;
+
+ u32 rb_bc_max_requests;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)



2015-12-07 20:43:14

by Chuck Lever III

[permalink] [raw]
Subject: [PATCH v3 6/6] xprtrdma: Add class for RDMA backwards direction transport

To support the server-side of an NFSv4.1 backchannel on RDMA
connections, add a transport class that enables backward
direction messages on an existing forward channel connection.

Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/xprt.h | 1
net/sunrpc/xprt.c | 1
net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 219 ++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 14 +-
net/sunrpc/xprtrdma/transport.c | 31 +++-
net/sunrpc/xprtrdma/xprt_rdma.h | 11 +
6 files changed, 263 insertions(+), 14 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 69ef5b3..7637ccd 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -85,6 +85,7 @@ struct rpc_rqst {
__u32 * rq_buffer; /* XDR encode buffer */
size_t rq_callsize,
rq_rcvsize;
+ void * rq_privdata; /* xprt-specific per-rqst data */
size_t rq_xmit_bytes_sent; /* total bytes sent */
size_t rq_reply_bytes_recvd; /* total reply bytes */
/* received */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 2e98f4a..37edea6 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1425,3 +1425,4 @@ void xprt_put(struct rpc_xprt *xprt)
if (atomic_dec_and_test(&xprt->count))
xprt_destroy(xprt);
}
+EXPORT_SYMBOL_GPL(xprt_put);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 69dab71..3534e75 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -84,3 +84,222 @@ out_notfound:

goto out_unlock;
}
+
+/* Server-side transport endpoint wants a whole page for its send
+ * buffer. The client RPC code constructs the RPC header in this
+ * buffer before it invokes ->send_request.
+ *
+ * Returns NULL if there was a temporary allocation failure.
+ */
+static void *
+xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct svcxprt_rdma *rdma;
+ struct svc_xprt *sxprt;
+ struct page *page;
+
+ /* Prevent an infinite loop: don't return NULL */
+ if (size > PAGE_SIZE)
+ WARN_ONCE(1, "svcrdma: bc buffer request too large (size %zu)\n",
+ size);
+
+ page = alloc_page(RPCRDMA_DEF_GFP);
+ if (!page)
+ return NULL;
+
+ sxprt = rqst->rq_xprt->bc_xprt;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ ctxt = svc_rdma_get_context(rdma);
+ if (!ctxt) {
+ put_page(page);
+ return NULL;
+ }
+
+ rqst->rq_privdata = ctxt;
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+ return page_address(page);
+}
+
+static void
+xprt_rdma_bc_free(void *buffer)
+{
+ /* No-op: ctxt and page have already been freed. */
+}
+
+static int
+rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
+ struct svc_rdma_op_ctxt *ctxt;
+ int rc;
+
+ /* Space in the send buffer for an RPC/RDMA header is reserved
+ * via xprt->tsh_size */
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+#ifdef SVCRDMA_BACKCHANNEL_DEBUG
+ pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
+#endif
+
+ ctxt = (struct svc_rdma_op_ctxt *)rqst->rq_privdata;
+ rc = svc_rdma_bc_post_send(rdma, ctxt, &rqst->rq_snd_buf);
+ if (rc)
+ goto drop_connection;
+ return rc;
+
+drop_connection:
+ dprintk("svcrdma: failed to send bc call\n");
+ svc_rdma_put_context(ctxt, 1);
+ xprt_disconnect_done(xprt);
+ return -ENOTCONN;
+}
+
+/* Send an RPC call on the passive end of a transport
+ * connection.
+ */
+static int
+xprt_rdma_bc_send_request(struct rpc_task *task)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+ struct svcxprt_rdma *rdma;
+ u32 len;
+
+ dprintk("svcrdma: sending bc call with xid: %08x\n",
+ be32_to_cpu(rqst->rq_xid));
+
+ if (!mutex_trylock(&sxprt->xpt_mutex)) {
+ rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
+ if (!mutex_trylock(&sxprt->xpt_mutex))
+ return -EAGAIN;
+ rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
+ }
+
+ len = -ENOTCONN;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+ len = rpcrdma_bc_send_request(rdma, rqst);
+
+ mutex_unlock(&sxprt->xpt_mutex);
+
+ if (len < 0)
+ return len;
+ return 0;
+}
+
+static void
+xprt_rdma_bc_close(struct rpc_xprt *xprt)
+{
+ dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
+}
+
+static void
+xprt_rdma_bc_put(struct rpc_xprt *xprt)
+{
+ dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
+
+ xprt_free(xprt);
+ module_put(THIS_MODULE);
+}
+
+static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+ .reserve_xprt = xprt_reserve_xprt_cong,
+ .release_xprt = xprt_release_xprt_cong,
+ .alloc_slot = xprt_alloc_slot,
+ .release_request = xprt_release_rqst_cong,
+ .buf_alloc = xprt_rdma_bc_allocate,
+ .buf_free = xprt_rdma_bc_free,
+ .send_request = xprt_rdma_bc_send_request,
+ .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .close = xprt_rdma_bc_close,
+ .destroy = xprt_rdma_bc_put,
+ .print_stats = xprt_rdma_print_stats
+};
+
+static const struct rpc_timeout xprt_rdma_bc_timeout = {
+ .to_initval = 60 * HZ,
+ .to_maxval = 60 * HZ,
+};
+
+/* It shouldn't matter if the number of backchannel session slots
+ * doesn't match the number of RPC/RDMA credits. That just means
+ * one or the other will have extra slots that aren't used.
+ */
+static struct rpc_xprt *
+xprt_setup_rdma_bc(struct xprt_create *args)
+{
+ struct rpc_xprt *xprt;
+ struct rpcrdma_xprt *new_xprt;
+
+ if (args->addrlen > sizeof(xprt->addr)) {
+ dprintk("RPC: %s: address too large\n", __func__);
+ return ERR_PTR(-EBADF);
+ }
+
+ xprt = xprt_alloc(args->net, sizeof(*new_xprt),
+ RPCRDMA_MAX_BC_REQUESTS,
+ RPCRDMA_MAX_BC_REQUESTS);
+ if (xprt == NULL) {
+ dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
+ __func__);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ xprt->timeout = &xprt_rdma_bc_timeout;
+ xprt_set_bound(xprt);
+ xprt_set_connected(xprt);
+ xprt->bind_timeout = RPCRDMA_BIND_TO;
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+ xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+ xprt->prot = XPRT_TRANSPORT_BC_RDMA;
+ xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+ xprt->ops = &xprt_rdma_bc_procs;
+
+ memcpy(&xprt->addr, args->dstaddr, args->addrlen);
+ xprt->addrlen = args->addrlen;
+ xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
+ xprt->resvport = 0;
+
+ xprt->max_payload = xprt_rdma_max_inline_read;
+
+ new_xprt = rpcx_to_rdmax(xprt);
+ new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
+
+ xprt_get(xprt);
+ args->bc_xprt->xpt_bc_xprt = xprt;
+ xprt->bc_xprt = args->bc_xprt;
+
+ if (!try_module_get(THIS_MODULE))
+ goto out_fail;
+
+ /* Final put for backchannel xprt is in __svc_rdma_free */
+ xprt_get(xprt);
+ return xprt;
+
+out_fail:
+ xprt_rdma_free_addresses(xprt);
+ args->bc_xprt->xpt_bc_xprt = NULL;
+ xprt_put(xprt);
+ xprt_free(xprt);
+ return ERR_PTR(-EINVAL);
+}
+
+struct xprt_class xprt_rdma_bc = {
+ .list = LIST_HEAD_INIT(xprt_rdma_bc.list),
+ .name = "rdma backchannel",
+ .owner = THIS_MODULE,
+ .ident = XPRT_TRANSPORT_BC_RDMA,
+ .setup = xprt_setup_rdma_bc,
+};
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index ed5dd93..8aea4ad 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1212,12 +1212,14 @@ static void __svc_rdma_free(struct work_struct *work)
{
struct svcxprt_rdma *rdma =
container_of(work, struct svcxprt_rdma, sc_work);
- dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+ struct svc_xprt *xprt = &rdma->sc_xprt;
+
+ dprintk("svcrdma: %s(%p)\n", __func__, rdma);

/* We should only be called from kref_put */
- if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+ if (atomic_read(&xprt->xpt_ref.refcount) != 0)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
- atomic_read(&rdma->sc_xprt.xpt_ref.refcount));
+ atomic_read(&xprt->xpt_ref.refcount));

/*
* Destroy queued, but not processed read completions. Note
@@ -1252,6 +1254,12 @@ static void __svc_rdma_free(struct work_struct *work)
pr_err("svcrdma: dma still in use? (%d)\n",
atomic_read(&rdma->sc_dma_used));

+ /* Final put of backchannel client transport */
+ if (xprt->xpt_bc_xprt) {
+ xprt_put(xprt->xpt_bc_xprt);
+ xprt->xpt_bc_xprt = NULL;
+ }
+
/* De-allocate fastreg mr */
rdma_dealloc_frmr_q(rdma);

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 8c545f7..43d25f4 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -63,7 +63,7 @@
*/

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
-static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
+ unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
@@ -143,12 +143,8 @@ static struct ctl_table sunrpc_table[] = {

#endif

-#define RPCRDMA_BIND_TO (60U * HZ)
-#define RPCRDMA_INIT_REEST_TO (5U * HZ)
-#define RPCRDMA_MAX_REEST_TO (30U * HZ)
-#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
-
-static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
+static struct rpc_xprt_ops xprt_rdma_procs;
+extern struct xprt_class xprt_rdma_bc;

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -174,7 +170,7 @@ xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}

-static void
+void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
char buf[128];
@@ -203,7 +199,7 @@ xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

-static void
+void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
unsigned int i;
@@ -499,7 +495,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
if (req == NULL)
return NULL;

- flags = GFP_NOIO | __GFP_NOWARN;
+ flags = RPCRDMA_DEF_GFP;
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

@@ -639,7 +635,7 @@ drop_connection:
return -ENOTCONN; /* implies disconnect */
}

-static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
+void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
long idle_time = 0;
@@ -740,6 +736,11 @@ void xprt_rdma_cleanup(void)

rpcrdma_destroy_wq();
frwr_destroy_recovery_wq();
+
+ rc = xprt_unregister_transport(&xprt_rdma_bc);
+ if (rc)
+ dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
+ __func__, rc);
}

int xprt_rdma_init(void)
@@ -763,6 +764,14 @@ int xprt_rdma_init(void)
return rc;
}

+ rc = xprt_register_transport(&xprt_rdma_bc);
+ if (rc) {
+ xprt_unregister_transport(&xprt_rdma);
+ rpcrdma_destroy_wq();
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+
dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");

dprintk("Defaults:\n");
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3895574..d83abbc 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -55,6 +55,11 @@
#define RDMA_RESOLVE_TIMEOUT (5000) /* 5 seconds */
#define RDMA_CONNECT_RETRY_MAX (2) /* retries if no listener backlog */

+#define RPCRDMA_BIND_TO (60U * HZ)
+#define RPCRDMA_INIT_REEST_TO (5U * HZ)
+#define RPCRDMA_MAX_REEST_TO (30U * HZ)
+#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
+
/*
* Interface Adapter -- one per transport instance
*/
@@ -148,6 +153,8 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
return (struct rpcrdma_msg *)rb->rg_base;
}

+#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
+
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asychronously. It needs several pieces of
@@ -516,6 +523,10 @@ int rpcrdma_marshal_req(struct rpc_rqst *);

/* RPC/RDMA module init - xprtrdma/transport.c
*/
+extern unsigned int xprt_rdma_max_inline_read;
+void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
+void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
+void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);



2015-12-13 03:14:09

by Tom Talpey

[permalink] [raw]
Subject: Re: [PATCH v3 1/6] svcrdma: Do not send XDR roundup bytes for a write chunk

Two small comments.

On 12/7/2015 12:42 PM, Chuck Lever wrote:
> Minor optimization: when dealing with write chunk XDR roundup, do
> not post a Write WR for the zero bytes in the pad. Simply update
> the write segment in the RPC-over-RDMA header to reflect the extra
> pad bytes.
>
> The Reply chunk is also a write chunk, but the server does not use
> send_write_chunks() to send the Reply chunk. That's OK in this case:
> the server Upper Layer typically marshals the Reply chunk contents
> in a single contiguous buffer, without a separate tail for the XDR
> pad.
>
> The comments and the variable naming refer to "chunks" but what is
> really meant is "segments." The existing code sends only one
> xdr_write_chunk per RPC reply.
>
> The fix assumes this as well. When the XDR pad in the first write
> chunk is reached, the assumption is the Write list is complete and
> send_write_chunks() returns.
>
> That will remain a valid assumption until the server Upper Layer can
> support multiple bulk payload results per RPC.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +++++++
> 1 file changed, 7 insertions(+)
>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> index 969a1ab..bad5eaa 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> @@ -342,6 +342,13 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> arg_ch->rs_handle,
> arg_ch->rs_offset,
> write_len);
> +
> + /* Do not send XDR pad bytes */

It might be clearer to say "marshal" instead of "send".

> + if (chunk_no && write_len < 4) {

Why is it necessary to check for chunk_no == 0? It is not
possible for leading data to ever be padding, nor is a leading
data element ever less than 4 bytes long. Right?

Tom.

> + chunk_no++;
> + break;
> + }
> +
> chunk_off = 0;
> while (write_len) {
> ret = send_write(xprt, rqstp,
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
>

2015-12-13 03:24:16

by Tom Talpey

[permalink] [raw]
Subject: Re: [PATCH v3 5/6] svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies

Three comments.

On 12/7/2015 12:43 PM, Chuck Lever wrote:
> To support the NFSv4.1 backchannel on RDMA connections, add a
> capability for receiving an RPC/RDMA reply on a connection
> established by a client.
> (snip)

> +/* By convention, backchannel calls arrive via rdma_msg type

"By convention" is ok, but it's important to note that this is
actually not "by protocol". Therefore, the following check may
reject valid messages. Even though it is unlikely an implementation
will insert chunks, it's not illegal, and ignoring them will
be less harmful. So I'm going to remake my earlier observation
that three checks below should be removed:

> + * messages, and never populate the chunk lists. This makes
> + * the RPC/RDMA header small and fixed in size, so it is
> + * straightforward to check the RPC header's direction field.
> + */
> +static bool
> +svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
> +{
> + __be32 *p = (__be32 *)rmsgp;
> +
> + if (!xprt->xpt_bc_xprt)
> + return false;
> +
> + if (rmsgp->rm_type != rdma_msg)
> + return false;

These three:

> + if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
> + return false;
> + if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
> + return false;
> + if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
> + return false;
> +

(snip)
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index a1fd74a..3895574 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -309,6 +309,8 @@ struct rpcrdma_buffer {
> u32 rb_bc_srv_max_requests;
> spinlock_t rb_reqslock; /* protect rb_allreqs */
> struct list_head rb_allreqs;
> +
> + u32 rb_bc_max_requests;

Why does this need to be u32? Shouldn't it be an int, and also the
rb_bc_srv_max_requests just above? The forward channel max_requests
are int, btw.

Tom.

2015-12-13 19:44:24

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 1/6] svcrdma: Do not send XDR roundup bytes for a write chunk

Hi Tom-


> On Dec 12, 2015, at 10:14 PM, Tom Talpey <[email protected]> wrote:
>
> Two small comments.
>
> On 12/7/2015 12:42 PM, Chuck Lever wrote:
>> Minor optimization: when dealing with write chunk XDR roundup, do
>> not post a Write WR for the zero bytes in the pad. Simply update
>> the write segment in the RPC-over-RDMA header to reflect the extra
>> pad bytes.
>>
>> The Reply chunk is also a write chunk, but the server does not use
>> send_write_chunks() to send the Reply chunk. That's OK in this case:
>> the server Upper Layer typically marshals the Reply chunk contents
>> in a single contiguous buffer, without a separate tail for the XDR
>> pad.
>>
>> The comments and the variable naming refer to "chunks" but what is
>> really meant is "segments." The existing code sends only one
>> xdr_write_chunk per RPC reply.
>>
>> The fix assumes this as well. When the XDR pad in the first write
>> chunk is reached, the assumption is the Write list is complete and
>> send_write_chunks() returns.
>>
>> That will remain a valid assumption until the server Upper Layer can
>> support multiple bulk payload results per RPC.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> net/sunrpc/xprtrdma/svc_rdma_sendto.c | 7 +++++++
>> 1 file changed, 7 insertions(+)
>>
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> index 969a1ab..bad5eaa 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
>> @@ -342,6 +342,13 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
>> arg_ch->rs_handle,
>> arg_ch->rs_offset,
>> write_len);
>> +
>> + /* Do not send XDR pad bytes */
>
> It might be clearer to say "marshal" instead of "send".

Marshaling each segment happens unconditionally in the
svc_rdma_xdr_encode_array_chunk() call just before this
comment. I really do mean "Do not send" here: the patch
is intended to squelch the RDMA Write of the XDR pad for
this chunk.

Perhaps "Do not write" would be more precise, but Bruce
has already committed this patch, IIRC.


>> + if (chunk_no && write_len < 4) {
>
> Why is it necessary to check for chunk_no == 0? It is not
> possible for leading data to ever be padding, nor is a leading
> data element ever less than 4 bytes long. Right?

I'm checking for chunk_no != 0, for exactly the reasons
you mentioned.


> Tom.
>
>> + chunk_no++;
>> + break;
>> + }
>> +
>> chunk_off = 0;
>> while (write_len) {
>> ret = send_write(xprt, rqstp,

--
Chuck Lever





2015-12-13 20:27:19

by Chuck Lever III

[permalink] [raw]
Subject: Re: [PATCH v3 5/6] svcrdma: Add infrastructure to receive backwards direction RPC/RDMA replies

Hi Tom-


> On Dec 12, 2015, at 10:24 PM, Tom Talpey <[email protected]> wrote:
>
> Three comments.
>
> On 12/7/2015 12:43 PM, Chuck Lever wrote:
>> To support the NFSv4.1 backchannel on RDMA connections, add a
>> capability for receiving an RPC/RDMA reply on a connection
>> established by a client.
>> (snip)
>
>> +/* By convention, backchannel calls arrive via rdma_msg type
>
> "By convention" is ok, but it's important to note that this is
> actually not "by protocol". Therefore, the following check may
> reject valid messages. Even though it is unlikely an implementation
> will insert chunks, it's not illegal, and ignoring them will
> be less harmful. So I'm going to remake my earlier observation
> that three checks below should be removed:

The convention is established in

https://datatracker.ietf.org/doc/draft-ietf-nfsv4-rpcrdma-bidirection/

Specifically, it states that backward direction messages in
RPC-over-RDMA Version One cannot have chunks and cannot be
larger than the connection's inline threshold. Thus these
checks are all valid and will not reject valid forward or
backward messages.

The reason for that stipulation is it makes the RPC-over-RDMA
header in backward direction messages a fixed size.

a. The call direction field in incoming RPC headers is then
always at a predictable offset in backward direction calls.
This means chunk lists don't have to be parsed to find the
call direction field. All that is needed is to ensure that
the chunk lists in the header are empty before going after
the call direction field. If any chunk list is present, it
must be a forward direction message.

b. The maximum size of backward direction messages is
predictable: it is always the inline threshold minus the
size of the RPC-over-RDMA header (always 28 bytes when
there are no chunks).

The client side has a very similar looking set of checks in
its reply handler to distinguish incoming backward direction
RPC calls from forward direction RPC replies.


>> + * messages, and never populate the chunk lists. This makes
>> + * the RPC/RDMA header small and fixed in size, so it is
>> + * straightforward to check the RPC header's direction field.
>> + */
>> +static bool
>> +svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
>> +{
>> + __be32 *p = (__be32 *)rmsgp;
>> +
>> + if (!xprt->xpt_bc_xprt)
>> + return false;
>> +
>> + if (rmsgp->rm_type != rdma_msg)
>> + return false;
>
> These three:
>
>> + if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
>> + return false;
>> + if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
>> + return false;
>> + if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
>> + return false;
>> +
>
> (snip)
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index a1fd74a..3895574 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -309,6 +309,8 @@ struct rpcrdma_buffer {
>> u32 rb_bc_srv_max_requests;
>> spinlock_t rb_reqslock; /* protect rb_allreqs */
>> struct list_head rb_allreqs;
>> +
>> + u32 rb_bc_max_requests;
>
> Why does this need to be u32? Shouldn't it be an int, and also the
> rb_bc_srv_max_requests just above? The forward channel max_requests
> are int, btw.

The XDR definition of RPC-over-RDMA in Section 4.3 of
RFC 5666 defines the on-the-wire credits field as a
uint32.

I prefer that the host representation of this
field match the sign and be able to contain the full
range values that can be conveyed in the on-the-wire
field. That makes marshaling and unmarshaling of the
wire value straightforward; and reasoning about the
C code in our implementation also applies directly to
wire values as well.

The forward channel max_requests field on the client,
rb_max_requests, is also a u32 since commit 9b1dcbc8cf46
from February 2015.

I've changed the equivalent server side field,
sc_max_requests, to a u32 in the next version of this
series.

ib_device_attr::max_qp_wr and svcxprt_rdma::sc_sq_depth
are both signed, but I can't imagine what a negative
value in these fields could mean.

sc_sq_depth is plugged into ib_qp_cap::max_send_wr and
ib_cq_init_attr::cqe, which are both unsigned, so
sc_sq_depth really should be unsigned too, IMHO.

--
Chuck Lever