Return-Path: Received: from mail-yk0-f182.google.com ([209.85.160.182]:34040 "EHLO mail-yk0-f182.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S932144AbbIUK3P (ORCPT ); Mon, 21 Sep 2015 06:29:15 -0400 Received: by ykdg206 with SMTP id g206so97828638ykd.1 for ; Mon, 21 Sep 2015 03:29:14 -0700 (PDT) MIME-Version: 1.0 In-Reply-To: <20150917204508.19671.23235.stgit@manet.1015granger.net> References: <20150917202829.19671.90044.stgit@manet.1015granger.net> <20150917204508.19671.23235.stgit@manet.1015granger.net> From: Devesh Sharma Date: Mon, 21 Sep 2015 15:58:34 +0530 Message-ID: Subject: Re: [PATCH v1 07/18] xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers To: Chuck Lever Cc: linux-rdma@vger.kernel.org, Linux NFS Mailing List Content-Type: text/plain; charset=UTF-8 Sender: linux-nfs-owner@vger.kernel.org List-ID: Looks good. On Fri, Sep 18, 2015 at 2:15 AM, Chuck Lever wrote: > xprtrdma's backward direction send and receive buffers are the same > size as the forechannel's inline threshold, and must be pre- > registered. > > The consumer has no control over which receive buffer the adapter > chooses to catch an incoming backwards-direction call. Any receive > buffer can be used for either a forward reply or a backward call. > Thus both types of RPC message must all be the same size. 
> > Signed-off-by: Chuck Lever > --- > net/sunrpc/xprtrdma/Makefile | 1 > net/sunrpc/xprtrdma/backchannel.c | 204 +++++++++++++++++++++++++++++++++++++ > net/sunrpc/xprtrdma/transport.c | 7 + > net/sunrpc/xprtrdma/verbs.c | 92 ++++++++++++++--- > net/sunrpc/xprtrdma/xprt_rdma.h | 20 ++++ > 5 files changed, 309 insertions(+), 15 deletions(-) > create mode 100644 net/sunrpc/xprtrdma/backchannel.c > > diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile > index 48913de..33f99d3 100644 > --- a/net/sunrpc/xprtrdma/Makefile > +++ b/net/sunrpc/xprtrdma/Makefile > @@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ > svc_rdma.o svc_rdma_transport.o \ > svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ > module.o > +rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o > diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c > new file mode 100644 > index 0000000..c0a42ad > --- /dev/null > +++ b/net/sunrpc/xprtrdma/backchannel.c > @@ -0,0 +1,204 @@ > +/* > + * Copyright (c) 2015 Oracle. All rights reserved. > + * > + * Support for backward direction RPCs on RPC/RDMA. 
> + */ > + > +#include > + > +#include "xprt_rdma.h" > + > +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) > +# define RPCDBG_FACILITY RPCDBG_TRANS > +#endif > + > +static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, > + struct rpc_rqst *rqst) > +{ > + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; > + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); > + > + spin_lock(&buf->rb_reqslock); > + list_del(&req->rl_all); > + spin_unlock(&buf->rb_reqslock); > + > + rpcrdma_destroy_req(&r_xprt->rx_ia, req); > + > + kfree(rqst); > +} > + > +static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, > + struct rpc_rqst *rqst) > +{ > + struct rpcrdma_ia *ia = &r_xprt->rx_ia; > + struct rpcrdma_regbuf *rb; > + struct rpcrdma_req *req; > + struct xdr_buf *buf; > + size_t size; > + > + req = rpcrdma_create_req(r_xprt); > + if (!req) > + return -ENOMEM; > + req->rl_backchannel = true; > + > + size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); > + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); > + if (IS_ERR(rb)) > + goto out_fail; > + req->rl_rdmabuf = rb; > + > + size += RPCRDMA_INLINE_READ_THRESHOLD(rqst); > + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); > + if (IS_ERR(rb)) > + goto out_fail; > + rb->rg_owner = req; > + req->rl_sendbuf = rb; > + /* so that rpcr_to_rdmar works when receiving a request */ > + rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base; > + > + buf = &rqst->rq_snd_buf; > + buf->head[0].iov_base = rqst->rq_buffer; > + buf->head[0].iov_len = 0; > + buf->tail[0].iov_base = NULL; > + buf->tail[0].iov_len = 0; > + buf->page_len = 0; > + buf->len = 0; > + buf->buflen = size; > + > + return 0; > + > +out_fail: > + rpcrdma_bc_free_rqst(r_xprt, rqst); > + return -ENOMEM; > +} > + > +/* Allocate and add receive buffers to the rpcrdma_buffer's existing > + * list of rep's. These are released when the transport is destroyed. 
*/ > +static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, > + unsigned int count) > +{ > + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; > + struct rpcrdma_rep *rep; > + unsigned long flags; > + int rc = 0; > + > + while (count--) { > + rep = rpcrdma_create_rep(r_xprt); > + if (IS_ERR(rep)) { > + pr_err("RPC: %s: reply buffer alloc failed\n", > + __func__); > + rc = PTR_ERR(rep); > + break; > + } > + > + spin_lock_irqsave(&buffers->rb_lock, flags); > + list_add(&rep->rr_list, &buffers->rb_recv_bufs); > + spin_unlock_irqrestore(&buffers->rb_lock, flags); > + } > + > + return rc; > +} > + > +/** > + * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests > + * @xprt: transport associated with these backchannel resources > + * @reqs: number of concurrent incoming requests to expect > + * > + * Returns 0 on success; otherwise a negative errno > + */ > +int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) > +{ > + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); > + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; > + struct rpc_rqst *rqst; > + unsigned int i; > + int rc; > + > + /* The backchannel reply path returns each rpc_rqst to the > + * bc_pa_list _after_ the reply is sent. If the server is > + * faster than the client, it can send another backward > + * direction request before the rpc_rqst is returned to the > + * list. The client rejects the request in this case. > + * > + * Twice as many rpc_rqsts are prepared to ensure there is > + * always an rpc_rqst available as soon as a reply is sent. 
> + */ > + for (i = 0; i < (reqs << 1); i++) { > + rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); > + if (!rqst) { > + pr_err("RPC: %s: Failed to create bc rpc_rqst\n", > + __func__); > + goto out_free; > + } > + > + rqst->rq_xprt = &r_xprt->rx_xprt; > + INIT_LIST_HEAD(&rqst->rq_list); > + INIT_LIST_HEAD(&rqst->rq_bc_list); > + > + if (rpcrdma_bc_setup_rqst(r_xprt, rqst)) > + goto out_free; > + > + spin_lock_bh(&xprt->bc_pa_lock); > + list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); > + spin_unlock_bh(&xprt->bc_pa_lock); > + } > + > + rc = rpcrdma_bc_setup_reps(r_xprt, reqs); > + if (rc) > + goto out_free; > + > + rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs); > + if (rc) > + goto out_free; > + > + buffer->rb_bc_srv_max_requests = reqs; > + request_module("svcrdma"); > + > + return 0; > + > +out_free: > + xprt_rdma_bc_destroy(xprt, reqs); > + > + pr_err("RPC: %s: setup backchannel transport failed\n", __func__); > + return -ENOMEM; > +} > + > +/** > + * xprt_rdma_bc_destroy - Release resources for handling backchannel requests > + * @xprt: transport associated with these backchannel resources > + * @reqs: number of incoming requests to destroy; ignored > + */ > +void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) > +{ > + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); > + struct rpc_rqst *rqst, *tmp; > + > + spin_lock_bh(&xprt->bc_pa_lock); > + list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { > + list_del(&rqst->rq_bc_pa_list); > + spin_unlock_bh(&xprt->bc_pa_lock); > + > + rpcrdma_bc_free_rqst(r_xprt, rqst); > + > + spin_lock_bh(&xprt->bc_pa_lock); > + } > + spin_unlock_bh(&xprt->bc_pa_lock); > +} > + > +/** > + * xprt_rdma_bc_free_rqst - Release a backchannel rqst > + * @rqst: request to release > + */ > +void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) > +{ > + struct rpc_xprt *xprt = rqst->rq_xprt; > + > + smp_mb__before_atomic(); > + WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); > + 
clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); > + smp_mb__after_atomic(); > + > + spin_lock_bh(&xprt->bc_pa_lock); > + list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); > + spin_unlock_bh(&xprt->bc_pa_lock); > +} > diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c > index e9e5ed7..e3871a6 100644 > --- a/net/sunrpc/xprtrdma/transport.c > +++ b/net/sunrpc/xprtrdma/transport.c > @@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = { > .print_stats = xprt_rdma_print_stats, > .enable_swap = xprt_rdma_enable_swap, > .disable_swap = xprt_rdma_disable_swap, > - .inject_disconnect = xprt_rdma_inject_disconnect > + .inject_disconnect = xprt_rdma_inject_disconnect, > +#if defined(CONFIG_SUNRPC_BACKCHANNEL) > + .bc_setup = xprt_rdma_bc_setup, > + .bc_free_rqst = xprt_rdma_bc_free_rqst, > + .bc_destroy = xprt_rdma_bc_destroy, > +#endif > }; > > static struct xprt_class xprt_rdma = { > diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c > index 8d99214..1e4a948 100644 > --- a/net/sunrpc/xprtrdma/verbs.c > +++ b/net/sunrpc/xprtrdma/verbs.c > @@ -877,7 +877,22 @@ retry: > } > rc = ep->rep_connected; > } else { > + struct rpcrdma_xprt *r_xprt; > + unsigned int extras; > + > dprintk("RPC: %s: connected\n", __func__); > + > + r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); > + extras = r_xprt->rx_buf.rb_bc_srv_max_requests; > + > + if (extras) { > + rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); > + if (rc) > + pr_err("%s: could not post " > + "extra receive buffers: %i\n", > + __func__, rc); > + rc = 0; > + } > } > > out: > @@ -914,20 +929,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) > } > } > > -static struct rpcrdma_req * > +struct rpcrdma_req * > rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) > { > + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; > struct rpcrdma_req *req; > > req = kzalloc(sizeof(*req), GFP_KERNEL); > if (req == NULL) > return ERR_PTR(-ENOMEM); > > + 
INIT_LIST_HEAD(&req->rl_free); > + spin_lock(&buffer->rb_reqslock); > + list_add(&req->rl_all, &buffer->rb_allreqs); > + spin_unlock(&buffer->rb_reqslock); > req->rl_buffer = &r_xprt->rx_buf; > return req; > } > > -static struct rpcrdma_rep * > +struct rpcrdma_rep * > rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) > { > struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; > @@ -965,6 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) > int i, rc; > > buf->rb_max_requests = r_xprt->rx_data.max_requests; > + buf->rb_bc_srv_max_requests = 0; > spin_lock_init(&buf->rb_lock); > > rc = ia->ri_ops->ro_init(r_xprt); > @@ -972,6 +993,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) > goto out; > > INIT_LIST_HEAD(&buf->rb_send_bufs); > + INIT_LIST_HEAD(&buf->rb_allreqs); > + spin_lock_init(&buf->rb_reqslock); > for (i = 0; i < buf->rb_max_requests; i++) { > struct rpcrdma_req *req; > > @@ -982,6 +1005,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) > rc = PTR_ERR(req); > goto out; > } > + req->rl_backchannel = false; > list_add(&req->rl_free, &buf->rb_send_bufs); > } > > @@ -1008,19 +1032,13 @@ out: > static void > rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) > { > - if (!rep) > - return; > - > rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); > kfree(rep); > } > > -static void > +void > rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) > { > - if (!req) > - return; > - > rpcrdma_free_regbuf(ia, req->rl_sendbuf); > rpcrdma_free_regbuf(ia, req->rl_rdmabuf); > kfree(req); > @@ -1040,14 +1058,20 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) > rpcrdma_destroy_rep(ia, rep); > } > > - while (!list_empty(&buf->rb_send_bufs)) { > - struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next, > + spin_lock(&buf->rb_reqslock); > + while (!list_empty(&buf->rb_allreqs)) { > + struct rpcrdma_req *req = list_entry(buf->rb_allreqs.next, > struct rpcrdma_req, > - rl_free); > + rl_all); > + > + 
list_del(&req->rl_all); > + spin_unlock(&buf->rb_reqslock); > > - list_del(&req->rl_free); > rpcrdma_destroy_req(ia, req); > + > + spin_lock(&buf->rb_reqslock); > } > + spin_unlock(&buf->rb_reqslock); > > ia->ri_ops->ro_destroy(buf); > } > @@ -1094,7 +1118,7 @@ rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf) > > rep = list_first_entry(&buf->rb_recv_bufs, > struct rpcrdma_rep, rr_list); > - list_del(&rep->rr_list); > + list_del_init(&rep->rr_list); > > return rep; > } > @@ -1337,6 +1361,46 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, > return rc; > } > > +/** > + * rpcrdma_ep_post_extra_recv - Post buffers to catch incoming backchannel requests > + * @r_xprt: transport associated with these backchannel resources > + * @count: number of additional receive buffers to post > + * > + * Returns zero if all requested buffers were posted, or a negative errno. > + */ > +int > +rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) > +{ > + struct rpcrdma_ia *ia = &r_xprt->rx_ia; > + struct rpcrdma_ep *ep = &r_xprt->rx_ep; > + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; > + struct rpcrdma_rep *rep; > + unsigned long flags; > + int rc; > + > + while (count--) { > + rep = NULL; > + spin_lock_irqsave(&buffers->rb_lock, flags); > + if (!list_empty(&buffers->rb_recv_bufs)) > + rep = rpcrdma_buffer_get_locked(buffers); > + spin_unlock_irqrestore(&buffers->rb_lock, flags); > + if (!rep) { > + pr_err("%s: no extra receive buffers\n", __func__); > + return -ENOMEM; > + } > + > + rc = rpcrdma_ep_post_recv(ia, ep, rep); > + if (rc) { > + spin_lock_irqsave(&buffers->rb_lock, flags); > + rpcrdma_buffer_put_locked(rep, buffers); > + spin_unlock_irqrestore(&buffers->rb_lock, flags); > + return rc; > + } > + } > + > + return 0; > +} > + > /* How many chunk list items fit within our inline buffers?
> */ > unsigned int > diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h > index e6a358f..2ca0567 100644 > --- a/net/sunrpc/xprtrdma/xprt_rdma.h > +++ b/net/sunrpc/xprtrdma/xprt_rdma.h > @@ -262,6 +262,9 @@ struct rpcrdma_req { > struct rpcrdma_regbuf *rl_rdmabuf; > struct rpcrdma_regbuf *rl_sendbuf; > struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; > + > + struct list_head rl_all; > + bool rl_backchannel; > }; > > static inline struct rpcrdma_req * > @@ -290,6 +293,10 @@ struct rpcrdma_buffer { > struct list_head rb_send_bufs; > struct list_head rb_recv_bufs; > u32 rb_max_requests; > + > + u32 rb_bc_srv_max_requests; > + spinlock_t rb_reqslock; /* protect rb_allreqs */ > + struct list_head rb_allreqs; > }; > #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) > > @@ -410,6 +417,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, > /* > * Buffer calls - xprtrdma/verbs.c > */ > +struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); > +struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); > +void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *); > int rpcrdma_buffer_create(struct rpcrdma_xprt *); > void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); > > @@ -426,6 +436,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, > struct rpcrdma_regbuf *); > > unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); > +int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); > > int frwr_alloc_recovery_wq(void); > void frwr_destroy_recovery_wq(void); > @@ -490,6 +501,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *); > int xprt_rdma_init(void); > void xprt_rdma_cleanup(void); > > +/* Backchannel calls - xprtrdma/backchannel.c > + */ > +#if defined(CONFIG_SUNRPC_BACKCHANNEL) > +int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); > +int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); > +void xprt_rdma_bc_free_rqst(struct rpc_rqst *); > 
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); > +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ > + > /* Temporary NFS request map cache. Created in svc_rdma.c */ > extern struct kmem_cache *svc_rdma_map_cachep; > /* WR context cache. Created in svc_rdma.c */ > > -- > To unsubscribe from this list: send the line "unsubscribe linux-rdma" in > the body of a message to majordomo@vger.kernel.org > More majordomo info at http://vger.kernel.org/majordomo-info.html