Introduce client-side support for bi-directional RPC/RDMA.
Bi-directional RPC/RDMA is a prerequisite for NFSv4.1 on RDMA
transports.
Also available in the "nfs-rdma-for-4.4" topic branch of this git repo:
git://git.linux-nfs.org/projects/cel/cel-2.6.git
Or for browsing:
http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.4
Changes since v1:
- Dropped "RFC" in Subject: line
- Rebased on v4.3-rc4 + Steve W's recent fixes
- NFS Server-side backchannel support postponed
- "xprtrdma: Replace global lkey" dropped, already merged
- Addressed Sagi's comments on "Replace send and receive arrays"
- Addressed Jason's comment regarding ib_req_notify_cq return code
- Moved RPC/RDMA reply handling into a work queue
I'm assuming recent list discussion has addressed Devesh's and
Sagi's concerns with "Prevent loss of completion signals". Let me
know if there is still a problem.
---
Chuck Lever (16):
xprtrdma: Enable swap-on-NFS/RDMA
xprtrdma: Re-arm after missed events
xprtrdma: Prevent loss of completion signals
xprtrdma: Refactor reply handler error handling
xprtrdma: Replace send and receive arrays
xprtrdma: Use workqueue to process RPC/RDMA replies
xprtrdma: Remove reply tasklet
xprtrdma: Saving IRQs no longer needed for rb_lock
SUNRPC: Abstract backchannel operations
xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
xprtrdma: Pre-allocate Work Requests for backchannel
xprtrdma: Add support for sending backward direction RPC replies
xprtrdma: Handle incoming backward direction RPC calls
svcrdma: Add backward direction service for RPC/RDMA transport
SUNRPC: Remove the TCP-only restriction in bc_svc_process()
NFS: Enable client side NFSv4.1 backchannel to use other transports
fs/nfs/callback.c | 33 +-
include/linux/sunrpc/bc_xprt.h | 5
include/linux/sunrpc/svc_rdma.h | 6
include/linux/sunrpc/xprt.h | 7
net/sunrpc/backchannel_rqst.c | 24 +-
net/sunrpc/svc.c | 5
net/sunrpc/xprtrdma/Makefile | 1
net/sunrpc/xprtrdma/backchannel.c | 373 +++++++++++++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 148 ++++++---
net/sunrpc/xprtrdma/svc_rdma.c | 6
net/sunrpc/xprtrdma/svc_rdma_transport.c | 58 ++++
net/sunrpc/xprtrdma/transport.c | 18 +
net/sunrpc/xprtrdma/verbs.c | 479 +++++++++++++++---------------
net/sunrpc/xprtrdma/xprt_rdma.h | 53 +++
net/sunrpc/xprtsock.c | 16 +
15 files changed, 916 insertions(+), 316 deletions(-)
create mode 100644 net/sunrpc/xprtrdma/backchannel.c
--
Chuck Lever
After adding a swapfile on an NFS/RDMA mount and removing the
normal swap partition, I was able to push the NFS client well
into swap without any issue.
I forgot to swapoff the NFS swapfile before rebooting. This pinned
the NFS mount and the IB core and provider, causing shutdown to
hang. I think this is expected and safe behavior. Probably
shutdown scripts should "swapoff -a" before unmounting any
filesystems.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/transport.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452b..e9e5ed7 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
- return -EINVAL;
+ return 0;
}
static void
ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns a positive
value if WCs were added to a CQ after the last completion upcall
but before the CQ has been re-armed.
Commit 7f23f6f6e388 ("xprtrmda: Reduce lock contention in
completion handlers") assumed that when ib_req_notify_cq() returned
a positive RC, the CQ had also been successfully re-armed, making
it safe to return control to the provider without losing any
completion signals. That is an invalid assumption.
Change both completion handlers to continue polling while
ib_req_notify_cq() returns a positive value.
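As an illustration, the drain-and-re-arm pattern adopted here looks
roughly like this (a sketch only; example_poll_cq() is a placeholder
for the driver's own per-CQ poller):

  static void example_cq_upcall(struct ib_cq *cq, void *cq_context)
  {
      /* Drain the CQ, then re-arm it. A positive return from
       * ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) means CQEs were
       * queued before the re-arm took effect, so poll again rather
       * than count on another upcall.
       */
      do {
          example_poll_cq(cq);        /* placeholder: consume queued CQEs */
      } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
                                    IB_CQ_REPORT_MISSED_EVENTS) > 0);
  }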
Fixes: 7f23f6f6e388 ("xprtrmda: Reduce lock contention in ...")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 66 +++++++------------------------------------
1 file changed, 10 insertions(+), 56 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8a477e2..c713909 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -179,38 +179,17 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
return 0;
}
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
*/
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
- int rc;
-
- rc = rpcrdma_sendcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
- rc = ib_req_notify_cq(cq,
- IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
- if (rc == 0)
- return;
- if (rc < 0) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- return;
- }
-
- rpcrdma_sendcq_poll(cq, ep);
+ do {
+ rpcrdma_sendcq_poll(cq, ep);
+ } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
static void
@@ -274,42 +253,17 @@ out_schedule:
return rc;
}
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
*/
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
- int rc;
-
- rc = rpcrdma_recvcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
- rc = ib_req_notify_cq(cq,
- IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
- if (rc == 0)
- return;
- if (rc < 0) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- return;
- }
-
- rpcrdma_recvcq_poll(cq, ep);
+ do {
+ rpcrdma_recvcq_poll(cq, ep);
+ } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
static void
Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
handler") was supposed to prevent xprtrdma's upcall handlers from
starving other softIRQ work by letting them return to the provider
before all CQEs have been polled.
The logic assumes the provider will call the upcall handler again
immediately if the CQ is re-armed while there are still queued CQEs.
This assumption is invalid. The IBTA spec says that after a CQ is
armed, the hardware must interrupt only when a new CQE is inserted.
xprtrdma can't rely on the provider calling again, even though some
providers do.
Therefore, leaving CQEs on queue makes sense only when there is
another mechanism that ensures all remaining CQEs are consumed in a
timely fashion. xprtrdma does not have such a mechanism. If a CQE
remains queued, the transport can wait forever to send the next RPC.
Finally, move the wcs array back onto the stack to ensure that the
poll array is always local to the CPU where the completion upcall is
running.
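A minimal sketch of the resulting poll loop (helper names are
illustrative; the key point is that ib_poll_cq() returning fewer
entries than requested proves the CQ is drained):

  static void example_poll_cq(struct ib_cq *cq)
  {
      struct ib_wc wcs[4];    /* on the stack: always CPU-local */
      int i, rc;

      do {
          rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), wcs);
          if (rc < 0)
              break;
          for (i = 0; i < rc; i++)
              example_process_wc(&wcs[i]);    /* placeholder */
      } while (rc == ARRAY_SIZE(wcs));
  }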
Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 74 ++++++++++++++++++++-------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 5 ---
2 files changed, 38 insertions(+), 41 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c713909..e9599e9 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -158,25 +158,30 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
}
}
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
{
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[2];
+ int count, rc;
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_send_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- return rc;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ break;
count = rc;
while (count-- > 0)
- rpcrdma_sendcq_process_wc(wcs++);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- return 0;
+ rpcrdma_sendcq_process_wc(pos++);
+ } while (rc == ARRAY_SIZE(wcs));
+ return;
}
/* Handle provider send completion upcalls.
@@ -184,10 +189,8 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
- struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-
do {
- rpcrdma_sendcq_poll(cq, ep);
+ rpcrdma_sendcq_poll(cq);
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
@@ -226,31 +229,32 @@ out_fail:
goto out_schedule;
}
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
{
- struct list_head sched_list;
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[4];
+ LIST_HEAD(sched_list);
+ int count, rc;
- INIT_LIST_HEAD(&sched_list);
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_recv_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- goto out_schedule;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ break;
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(wcs++, &sched_list);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- rc = 0;
+ rpcrdma_recvcq_process_wc(pos++, &sched_list);
+ } while (rc == ARRAY_SIZE(wcs));
-out_schedule:
rpcrdma_schedule_tasklet(&sched_list);
- return rc;
}
/* Handle provider receive completion upcalls.
@@ -258,10 +262,8 @@ out_schedule:
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
- struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-
do {
- rpcrdma_recvcq_poll(cq, ep);
+ rpcrdma_recvcq_poll(cq);
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
@@ -625,7 +627,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+ rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -642,7 +644,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+ rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c09414e..42c8d44 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
* RDMA Endpoint -- one per transport instance
*/
-#define RPCRDMA_WC_BUDGET (128)
-#define RPCRDMA_POLLSIZE (16)
-
struct rpcrdma_ep {
atomic_t rep_cqcount;
int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
- struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
- struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
};
/*
Clean up: The error cases in rpcrdma_reply_handler() almost never
execute. Ensure the compiler places them out of the hot path.
No behavior change expected.
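In outline, each rare case moves behind a goto label placed after the
function's final return (a sketch with stand-in names, not the exact
patch hunks):

  struct example_rep { unsigned int rr_len; };    /* stand-in type */
  #define EXAMPLE_HDRLEN_MIN 28                   /* stand-in constant */

  static void example_reply_handler(struct example_rep *rep)
  {
      if (rep->rr_len < EXAMPLE_HDRLEN_MIN)
          goto out_shortreply;

      /* ... hot path: match the reply and complete the RPC ... */
      return;

  out_shortreply:
      /* Rarely-taken error handling lives after the final return,
       * keeping it out of the hot instruction stream. */
      pr_debug("short/invalid reply\n");
  }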
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 89 ++++++++++++++++++++++-----------------
net/sunrpc/xprtrdma/verbs.c | 2 -
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
3 files changed, 53 insertions(+), 40 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd65..60ffa63 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
unsigned long cwnd;
u32 credits;
- /* Check status. If bad, signal disconnect and return rep to pool */
- if (rep->rr_len == ~0U) {
- rpcrdma_recv_buffer_put(rep);
- if (r_xprt->rx_ep.rep_connected == 1) {
- r_xprt->rx_ep.rep_connected = -EIO;
- rpcrdma_conn_func(&r_xprt->rx_ep);
- }
- return;
- }
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
- }
+ dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
+
+ if (rep->rr_len == RPCRDMA_BAD_LEN)
+ goto out_badstatus;
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ goto out_shortreply;
+
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version) {
- dprintk("RPC: %s: invalid version %d\n",
- __func__, be32_to_cpu(headerp->rm_vers));
- goto repost;
- }
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
/* Get XID and try for a match. */
spin_lock(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (rqst == NULL) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p failed "
- "to match any request xid 0x%08x len %d\n",
- __func__, rep, be32_to_cpu(headerp->rm_xid),
- rep->rr_len);
-repost:
- r_xprt->rx_stats.bad_reply_count++;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
- rpcrdma_recv_buffer_put(rep);
-
- return;
- }
+ if (!rqst)
+ goto out_nomatch;
/* get request object */
req = rpcr_to_rdmar(rqst);
- if (req->rl_reply) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: duplicate reply 0x%p to RPC "
- "request 0x%p: xid 0x%08x\n", __func__, rep, req,
- be32_to_cpu(headerp->rm_xid));
- goto repost;
- }
+ if (req->rl_reply)
+ goto out_duplicate;
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +858,44 @@ badheader:
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
- xprt_complete_rqst(rqst->rq_task, status);
+ return;
+
+out_badstatus:
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+
+out_badversion:
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, be32_to_cpu(headerp->rm_vers));
+ goto repost;
+
+out_nomatch:
+ spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
+ __func__, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
+ goto repost;
+
+out_duplicate:
spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: "
+ "duplicate reply %p to RPC request %p: xid 0x%08x\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e9599e9..0076129 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -225,7 +225,7 @@ out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
__func__, rep, ib_wc_status_msg(wc->status));
- rep->rr_len = ~0U;
+ rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 42c8d44..a13508b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -168,6 +168,8 @@ struct rpcrdma_rep {
struct rpcrdma_regbuf *rr_rdmabuf;
};
+#define RPCRDMA_BAD_LEN (~0U)
+
/*
* struct rpcrdma_mw - external memory region metadata
*
The rb_send_bufs and rb_recv_bufs arrays are used to implement a
pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
structs. Replace those arrays with free lists.
To allow more than 512 RPCs in-flight at once, each of these arrays
would be larger than a page (assuming 8-byte addresses and 4KB
pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
buffer array would have to be 128 pages. That's an order-7
allocation. (Not that we're going there.)
A list is easier to expand dynamically. Instead of allocating a
larger array of pointers and copying the existing pointers to the
new array, simply append more buffers to each list.
This also makes it simpler to manage receive buffers that might
catch backwards-direction calls, or to post receive buffers in
bulk to amortize the overhead of ib_post_recv.
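For reference, the list-based pool ends up with get/put helpers of
roughly this shape (type, field, and lock names are placeholders;
this patch still uses the irqsave lock variants, which a later patch
in the series relaxes):

  struct example_req {
      struct list_head rl_free;
      /* ... per-request state ... */
  };

  struct example_pool {
      spinlock_t       lock;
      struct list_head free_reqs;
  };

  static struct example_req *example_get_req(struct example_pool *pool)
  {
      struct example_req *req = NULL;

      spin_lock(&pool->lock);
      if (!list_empty(&pool->free_reqs)) {
          req = list_first_entry(&pool->free_reqs,
                                 struct example_req, rl_free);
          list_del(&req->rl_free);
      }
      spin_unlock(&pool->lock);
      return req;
  }

  static void example_put_req(struct example_pool *pool,
                              struct example_req *req)
  {
      spin_lock(&pool->lock);
      list_add_tail(&req->rl_free, &pool->free_reqs);
      spin_unlock(&pool->lock);
  }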
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 155 +++++++++++++++++----------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 9 +-
2 files changed, 73 insertions(+), 91 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0076129..ab26392 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -928,44 +928,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- char *p;
- size_t len;
int i, rc;
- buf->rb_max_requests = cdata->max_requests;
+ buf->rb_max_requests = r_xprt->rx_data.max_requests;
spin_lock_init(&buf->rb_lock);
- /* Need to allocate:
- * 1. arrays for send and recv pointers
- * 2. arrays of struct rpcrdma_req to fill in pointers
- * 3. array of struct rpcrdma_rep for replies
- * Send/recv buffers in req/rep need to be registered
- */
- len = buf->rb_max_requests *
- (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
- p = kzalloc(len, GFP_KERNEL);
- if (p == NULL) {
- dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
- __func__, len);
- rc = -ENOMEM;
- goto out;
- }
- buf->rb_pool = p; /* for freeing it later */
-
- buf->rb_send_bufs = (struct rpcrdma_req **) p;
- p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
- buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
- p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
goto out;
+ INIT_LIST_HEAD(&buf->rb_send_bufs);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req)) {
@@ -974,7 +948,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
- buf->rb_send_bufs[i] = req;
+ list_add(&req->rl_free, &buf->rb_send_bufs);
+ }
+
+ INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ for (i = 0; i < buf->rb_max_requests + 2; i++) {
+ struct rpcrdma_rep *rep;
rep = rpcrdma_create_rep(r_xprt);
if (IS_ERR(rep)) {
@@ -983,7 +962,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(rep);
goto out;
}
- buf->rb_recv_bufs[i] = rep;
+ list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
return 0;
@@ -992,6 +971,28 @@ out:
return rc;
}
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_req *req;
+
+ req = list_first_entry(&buf->rb_send_bufs,
+ struct rpcrdma_req, rl_free);
+ list_del(&req->rl_free);
+ return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ rep = list_first_entry(&buf->rb_recv_bufs,
+ struct rpcrdma_rep, rr_list);
+ list_del(&rep->rr_list);
+ return rep;
+}
+
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
@@ -1017,25 +1018,22 @@ void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- int i;
- /* clean up in reverse order from create
- * 1. recv mr memory (mr free, then kfree)
- * 2. send mr memory (mr free, then kfree)
- * 3. MWs
- */
- dprintk("RPC: %s: entering\n", __func__);
+ while (!list_empty(&buf->rb_recv_bufs)) {
+ struct rpcrdma_rep *rep;
- for (i = 0; i < buf->rb_max_requests; i++) {
- if (buf->rb_recv_bufs)
- rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
- if (buf->rb_send_bufs)
- rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+ rep = rpcrdma_buffer_get_rep_locked(buf);
+ rpcrdma_destroy_rep(ia, rep);
}
- ia->ri_ops->ro_destroy(buf);
+ while (!list_empty(&buf->rb_send_bufs)) {
+ struct rpcrdma_req *req;
- kfree(buf->rb_pool);
+ req = rpcrdma_buffer_get_req_locked(buf);
+ rpcrdma_destroy_req(ia, req);
+ }
+
+ ia->ri_ops->ro_destroy(buf);
}
struct rpcrdma_mw *
@@ -1067,25 +1065,10 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
spin_unlock(&buf->rb_mwlock);
}
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
- buf->rb_send_bufs[--buf->rb_send_index] = req;
- req->rl_niovs = 0;
- if (req->rl_reply) {
- buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
- req->rl_reply = NULL;
- }
-}
-
/*
* Get a set of request/reply buffers.
*
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- * rb_send_index and rb_recv_index MUST always be pointing to the
- * *next* available buffer (non-NULL). They are incremented after
- * removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -1094,26 +1077,23 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (list_empty(&buffers->rb_send_bufs))
+ goto out_reqbuf;
+ req = rpcrdma_buffer_get_req_locked(buffers);
+ if (list_empty(&buffers->rb_recv_bufs))
+ goto out_repbuf;
+ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ return req;
- if (buffers->rb_send_index == buffers->rb_max_requests) {
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
- dprintk("RPC: %s: out of request buffers\n", __func__);
- return ((struct rpcrdma_req *)NULL);
- }
-
- req = buffers->rb_send_bufs[buffers->rb_send_index];
- if (buffers->rb_send_index < buffers->rb_recv_index) {
- dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
- __func__,
- buffers->rb_recv_index - buffers->rb_send_index);
- req->rl_reply = NULL;
- } else {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
- }
- buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
-
+out_reqbuf:
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ pr_warn("RPC: %s: out of request buffers\n", __func__);
+ return NULL;
+out_repbuf:
spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ pr_warn("RPC: %s: out of reply buffers\n", __func__);
+ req->rl_reply = NULL;
return req;
}
@@ -1125,17 +1105,22 @@ void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
+ struct rpcrdma_rep *rep = req->rl_reply;
unsigned long flags;
+ req->rl_niovs = 0;
+ req->rl_reply = NULL;
+
spin_lock_irqsave(&buffers->rb_lock, flags);
- rpcrdma_buffer_put_sendbuf(req, buffers);
+ list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+ if (rep)
+ list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
/*
* Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
*/
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
@@ -1144,10 +1129,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
- if (buffers->rb_recv_index < buffers->rb_max_requests) {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
- }
+ if (!list_empty(&buffers->rb_recv_bufs))
+ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
@@ -1162,7 +1145,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
- buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+ list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a13508b..e6a358f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
#define RPCRDMA_MAX_IOVS (2)
struct rpcrdma_req {
+ struct list_head rl_free;
unsigned int rl_niovs;
unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
@@ -285,12 +286,10 @@ struct rpcrdma_buffer {
struct list_head rb_all;
char *rb_pool;
- spinlock_t rb_lock; /* protect buf arrays */
+ spinlock_t rb_lock; /* protect buf lists */
+ struct list_head rb_send_bufs;
+ struct list_head rb_recv_bufs;
u32 rb_max_requests;
- int rb_send_index;
- int rb_recv_index;
- struct rpcrdma_req **rb_send_bufs;
- struct rpcrdma_rep **rb_recv_bufs;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
The reply tasklet is fast, but it's single threaded. After reply
traffic saturates a single CPU, there's no more reply processing
capacity.
Replace the tasklet with a workqueue to spread reply handling across
all CPUs. This also moves RPC/RDMA reply handling out of the soft
IRQ context and into a context that allows sleeps.
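The plumbing follows the stock workqueue pattern, roughly as below
(names are illustrative; the flags match the verbs.c hunk in this
patch):

  static struct workqueue_struct *example_receive_wq;

  struct example_rep {
      struct work_struct rr_work;
      /* ... received message state ... */
  };

  static void example_reply_handler(struct example_rep *rep);  /* placeholder */

  static void example_receive_worker(struct work_struct *work)
  {
      struct example_rep *rep =
          container_of(work, struct example_rep, rr_work);

      /* Runs in process context: reply handling may now sleep. */
      example_reply_handler(rep);
  }

  static int example_alloc_wq(void)
  {
      /* WQ_UNBOUND lets reply work run on any CPU; WQ_MEM_RECLAIM
       * guarantees forward progress under memory pressure, which
       * matters when NFS writeback is waiting on these replies. */
      example_receive_wq = alloc_workqueue("example_receive",
              WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 0);
      return example_receive_wq ? 0 : -ENOMEM;
  }

  /* Per-reply scheduling then becomes:
   *     INIT_WORK(&rep->rr_work, example_receive_worker);
   *     queue_work(example_receive_wq, &rep->rr_work);
   */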
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 17 +++++++-----
net/sunrpc/xprtrdma/transport.c | 8 ++++++
net/sunrpc/xprtrdma/verbs.c | 54 ++++++++++++++++++++++++++++++++-------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 +++
4 files changed, 65 insertions(+), 18 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 60ffa63..95774fc 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
schedule_delayed_work(&ep->rep_connect_worker, 0);
}
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
*/
@@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
- /* Get XID and try for a match. */
- spin_lock(&xprt->transport_lock);
+ /* Match incoming rpcrdma_rep to an rpcrdma_req to
+ * get context for handling any incoming chunks.
+ */
+ spin_lock_bh(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
if (!rqst)
goto out_nomatch;
- /* get request object */
req = rpcr_to_rdmar(rqst);
if (req->rl_reply)
goto out_duplicate;
@@ -859,7 +860,7 @@ badheader:
xprt_release_rqst_cong(rqst->rq_task);
xprt_complete_rqst(rqst->rq_task, status);
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
return;
@@ -882,14 +883,14 @@ out_badversion:
goto repost;
out_nomatch:
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
__func__, be32_to_cpu(headerp->rm_xid),
rep->rr_len);
goto repost;
out_duplicate:
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: "
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7..897a2f3 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void)
dprintk("RPC: %s: xprt_unregister returned %i\n",
__func__, rc);
+ rpcrdma_destroy_wq();
frwr_destroy_recovery_wq();
}
@@ -743,8 +744,15 @@ int xprt_rdma_init(void)
if (rc)
return rc;
+ rc = rpcrdma_alloc_wq();
+ if (rc) {
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+
rc = xprt_register_transport(&xprt_rdma);
if (rc) {
+ rpcrdma_destroy_wq();
frwr_destroy_recovery_wq();
return rc;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ab26392..cf2f5b3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+ struct workqueue_struct *recv_wq;
+
+ recv_wq = alloc_workqueue("xprtrdma_receive",
+ WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+ 0);
+ if (!recv_wq)
+ return -ENOMEM;
+
+ rpcrdma_receive_wq = recv_wq;
+ return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+ struct workqueue_struct *wq;
+
+ if (rpcrdma_receive_wq) {
+ wq = rpcrdma_receive_wq;
+ rpcrdma_receive_wq = NULL;
+ destroy_workqueue(wq);
+ }
+}
+
static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
}
static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_receive_worker(struct work_struct *work)
+{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
+
+ rpcrdma_reply_handler(rep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
out_schedule:
- list_add_tail(&rep->rr_list, sched_list);
+ queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
+
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
@@ -239,7 +278,6 @@ static void
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
struct ib_wc *pos, wcs[4];
- LIST_HEAD(sched_list);
int count, rc;
do {
@@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++, &sched_list);
+ rpcrdma_recvcq_process_wc(pos++);
} while (rc == ARRAY_SIZE(wcs));
-
- rpcrdma_schedule_tasklet(&sched_list);
}
/* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@ static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
struct ib_wc wc;
- LIST_HEAD(sched_list);
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc, &sched_list);
- if (!list_empty(&sched_list))
- rpcrdma_schedule_tasklet(&sched_list);
+ rpcrdma_recvcq_process_wc(&wc);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
rpcrdma_sendcq_process_wc(&wc);
}
@@ -915,6 +948,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_rxprt = r_xprt;
+ INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358f..6ea1dbe 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -164,6 +164,7 @@ struct rpcrdma_rep {
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
+ struct work_struct rr_work;
struct list_head rr_list;
struct rpcrdma_regbuf *rr_rdmabuf;
};
@@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
int frwr_alloc_recovery_wq(void);
void frwr_destroy_recovery_wq(void);
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
Clean up: The reply tasklet is no longer used.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 43 -------------------------------------------
1 file changed, 43 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cf2f5b3..a4102fc 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,38 +68,6 @@
* internal functions
*/
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
-
-static void
-rpcrdma_run_tasklet(unsigned long data)
-{
- struct rpcrdma_rep *rep;
- unsigned long flags;
-
- data = data;
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- while (!list_empty(&rpcrdma_tasklets_g)) {
- rep = list_entry(rpcrdma_tasklets_g.next,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-
- rpcrdma_reply_handler(rep);
-
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- }
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-}
-
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
static struct workqueue_struct *rpcrdma_receive_wq;
int
@@ -130,17 +98,6 @@ rpcrdma_destroy_wq(void)
}
static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
-{
- unsigned long flags;
-
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- list_splice_tail(sched_list, &rpcrdma_tasklets_g);
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
- tasklet_schedule(&rpcrdma_tasklet_g);
-}
-
-static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
struct rpcrdma_ep *ep = context;
Now that RPC replies are processed in a workqueue, there's no need
to disable IRQs when managing send and receive buffers. This saves
noticeable overhead per RPC.
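Dropping the irqsave variants looks like this in miniature (stand-in
names; the real hunks follow):

  struct example_rep  { struct list_head rr_list; };
  struct example_pool { spinlock_t lock; struct list_head free_reps; };

  static void example_put_rep(struct example_pool *pool,
                              struct example_rep *rep)
  {
      /* No caller runs in (soft)IRQ context any more, so plain
       * spin_lock/spin_unlock replace the irqsave/irqrestore pair
       * and the per-call flags save/restore goes away. */
      spin_lock(&pool->lock);
      list_add_tail(&rep->rr_list, &pool->free_reps);
      spin_unlock(&pool->lock);
  }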
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 24 ++++++++++--------------
1 file changed, 10 insertions(+), 14 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index a4102fc..2dca18d 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1065,24 +1065,23 @@ struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
- unsigned long flags;
- spin_lock_irqsave(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_send_bufs))
goto out_reqbuf;
req = rpcrdma_buffer_get_req_locked(buffers);
if (list_empty(&buffers->rb_recv_bufs))
goto out_repbuf;
req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
return req;
out_reqbuf:
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of request buffers\n", __func__);
return NULL;
out_repbuf:
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of reply buffers\n", __func__);
req->rl_reply = NULL;
return req;
@@ -1097,16 +1096,15 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;
- unsigned long flags;
req->rl_niovs = 0;
req->rl_reply = NULL;
- spin_lock_irqsave(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
if (rep)
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
}
/*
@@ -1117,12 +1115,11 @@ void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
- unsigned long flags;
- spin_lock_irqsave(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
if (!list_empty(&buffers->rb_recv_bufs))
req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
}
/*
@@ -1133,11 +1130,10 @@ void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
- unsigned long flags;
- spin_lock_irqsave(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_unlock(&buffers->rb_lock);
}
/*
xprt_{setup,destroy}_backchannel() won't be adequate for RPC/RDMA
bi-directional operation. In particular, receive buffers have to be
pre-registered and posted in order to receive incoming backchannel
requests.
Add a virtual function call to allow the insertion of appropriate
backchannel setup and destruction methods for each transport.
In addition, freeing a backchannel request is a little different
for RPC/RDMA. Introduce an rpc_xprt_op to handle the difference.
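The indirection itself is small; the generic setup path becomes a
thin wrapper over the new method (this mirrors the backchannel_rqst.c
hunk below):

  int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
  {
      /* Transports without bi-directional support leave ->bc_setup
       * NULL, and backchannel setup quietly becomes a no-op. */
      if (!xprt->ops->bc_setup)
          return 0;
      return xprt->ops->bc_setup(xprt, min_reqs);
  }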
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/bc_xprt.h | 5 +++++
include/linux/sunrpc/xprt.h | 5 +++++
net/sunrpc/backchannel_rqst.c | 24 ++++++++++++++++++++++--
net/sunrpc/xprtsock.c | 15 +++++++++++++++
4 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 8df43c9f..4397a48 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -38,6 +38,11 @@ void xprt_free_bc_request(struct rpc_rqst *req);
int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
+/* Socket backchannel transport methods */
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
+void xprt_free_bc_rqst(struct rpc_rqst *req);
+
/*
* Determine if a shared backchannel is in use
*/
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0fb9acb..3f79c4a 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -136,6 +136,11 @@ struct rpc_xprt_ops {
int (*enable_swap)(struct rpc_xprt *xprt);
void (*disable_swap)(struct rpc_xprt *xprt);
void (*inject_disconnect)(struct rpc_xprt *xprt);
+ int (*bc_setup)(struct rpc_xprt *xprt,
+ unsigned int min_reqs);
+ void (*bc_free_rqst)(struct rpc_rqst *rqst);
+ void (*bc_destroy)(struct rpc_xprt *xprt,
+ unsigned int max_reqs);
};
/*
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d14..229956b 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
*/
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
+ if (!xprt->ops->bc_setup)
+ return 0;
+ return xprt->ops->bc_setup(xprt, min_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
+
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
+{
struct rpc_rqst *req;
struct list_head tmp_list;
int i;
@@ -192,7 +200,6 @@ out_free:
dprintk("RPC: setup backchannel transport failed\n");
return -ENOMEM;
}
-EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
/**
* xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
*/
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
+ if (xprt->ops->bc_destroy)
+ xprt->ops->bc_destroy(xprt, max_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
+
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
+{
struct rpc_rqst *req = NULL, *tmp = NULL;
dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
dprintk("RPC: backchannel list empty= %s\n",
list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
-EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
{
struct rpc_xprt *xprt = req->rq_xprt;
+ xprt->ops->bc_free_rqst(req);
+}
+
+void xprt_free_bc_rqst(struct rpc_rqst *req)
+{
+ struct rpc_xprt *xprt = req->rq_xprt;
+
dprintk("RPC: free backchannel req=%p\n", req);
req->rq_connect_cookie = xprt->connect_cookie - 1;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0e..3084163 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2539,6 +2539,11 @@ static struct rpc_xprt_ops xs_local_ops = {
.print_stats = xs_local_print_stats,
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
static struct rpc_xprt_ops xs_udp_ops = {
@@ -2561,6 +2566,11 @@ static struct rpc_xprt_ops xs_udp_ops = {
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
.inject_disconnect = xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
static struct rpc_xprt_ops xs_tcp_ops = {
@@ -2580,6 +2590,11 @@ static struct rpc_xprt_ops xs_tcp_ops = {
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
.inject_disconnect = xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
/*
xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-
registered.
The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backwards-direction call. Any receive
buffer can be used for either a forward reply or a backward call.
Thus both types of RPC message must be the same size.
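A sketch of the sizing rule this implies (the macros are the ones
used in the new backchannel.c below; the helper itself is
hypothetical):

  static size_t example_bc_sendbuf_size(struct rpc_rqst *rqst)
  {
      /* Backchannel buffers are sized from the connection's inline
       * thresholds, the same limits that govern the pre-registered
       * forward-direction buffers. */
      return RPCRDMA_INLINE_WRITE_THRESHOLD(rqst) +
             RPCRDMA_INLINE_READ_THRESHOLD(rqst);
  }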
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/Makefile | 1
net/sunrpc/xprtrdma/backchannel.c | 206 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/transport.c | 7 +
net/sunrpc/xprtrdma/verbs.c | 87 ++++++++++++++--
net/sunrpc/xprtrdma/xprt_rdma.h | 20 ++++
5 files changed, 309 insertions(+), 12 deletions(-)
create mode 100644 net/sunrpc/xprtrdma/backchannel.c
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de..33f99d3 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
svc_rdma.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 0000000..3d01b32
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+ spin_lock(&buf->rb_reqslock);
+ list_del(&req->rl_all);
+ spin_unlock(&buf->rb_reqslock);
+
+ rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+ kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ struct xdr_buf *buf;
+ size_t size;
+
+ req = rpcrdma_create_req(r_xprt);
+ if (!req)
+ return -ENOMEM;
+ req->rl_backchannel = true;
+
+ size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
+ size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+ req->rl_sendbuf = rb;
+ /* so that rpcr_to_rdmar works when receiving a request */
+ rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+ buf = &rqst->rq_snd_buf;
+ buf->head[0].iov_base = rqst->rq_buffer;
+ buf->head[0].iov_len = 0;
+ buf->tail[0].iov_base = NULL;
+ buf->tail[0].iov_len = 0;
+ buf->page_len = 0;
+ buf->len = 0;
+ buf->buflen = size;
+
+ return 0;
+
+out_fail:
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+ return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's
+ * existing list of rep's. These are released when the
+ * transport is destroyed.
+ */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+ unsigned int count)
+{
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc = 0;
+
+ while (count--) {
+ rep = rpcrdma_create_rep(r_xprt);
+ if (IS_ERR(rep)) {
+ pr_err("RPC: %s: reply buffer alloc failed\n",
+ __func__);
+ rc = PTR_ERR(rep);
+ break;
+ }
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ }
+
+ return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+ struct rpc_rqst *rqst;
+ unsigned int i;
+ int rc;
+
+ /* The backchannel reply path returns each rpc_rqst to the
+ * bc_pa_list _after_ the reply is sent. If the server is
+ * faster than the client, it can send another backward
+ * direction request before the rpc_rqst is returned to the
+ * list. The client rejects the request in this case.
+ *
+ * Twice as many rpc_rqsts are prepared to ensure there is
+ * always an rpc_rqst available as soon as a reply is sent.
+ */
+ for (i = 0; i < (reqs << 1); i++) {
+ rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+ if (!rqst) {
+ pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
+ __func__);
+ goto out_free;
+ }
+
+ rqst->rq_xprt = &r_xprt->rx_xprt;
+ INIT_LIST_HEAD(&rqst->rq_list);
+ INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+ if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+ goto out_free;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+ }
+
+ rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ buffer->rb_bc_srv_max_requests = reqs;
+ request_module("svcrdma");
+
+ return 0;
+
+out_free:
+ xprt_rdma_bc_destroy(xprt, reqs);
+
+ pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
+ return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpc_rqst *rqst, *tmp;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ }
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+
+ smp_mb__before_atomic();
+ WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+ clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+ smp_mb__after_atomic();
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 897a2f3..845278e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
- .inject_disconnect = xprt_rdma_inject_disconnect
+ .inject_disconnect = xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ .bc_setup = xprt_rdma_bc_setup,
+ .bc_free_rqst = xprt_rdma_bc_free_rqst,
+ .bc_destroy = xprt_rdma_bc_destroy,
+#endif
};
static struct xprt_class xprt_rdma = {
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 2dca18d..d920367 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -833,7 +833,21 @@ retry:
}
rc = ep->rep_connected;
} else {
+ struct rpcrdma_xprt *r_xprt;
+ unsigned int extras;
+
dprintk("RPC: %s: connected\n", __func__);
+
+ r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+ extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+ if (extras) {
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+ if (rc)
+ pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+ __func__, rc);
+ rc = 0;
+ }
}
out:
@@ -870,20 +884,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
}
-static struct rpcrdma_req *
+struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_req *req;
req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+ INIT_LIST_HEAD(&req->rl_free);
+ spin_lock(&buffer->rb_reqslock);
+ list_add(&req->rl_all, &buffer->rb_allreqs);
+ spin_unlock(&buffer->rb_reqslock);
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -922,6 +941,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
int i, rc;
buf->rb_max_requests = r_xprt->rx_data.max_requests;
+ buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
rc = ia->ri_ops->ro_init(r_xprt);
@@ -929,6 +949,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
goto out;
INIT_LIST_HEAD(&buf->rb_send_bufs);
+ INIT_LIST_HEAD(&buf->rb_allreqs);
+ spin_lock_init(&buf->rb_reqslock);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
@@ -939,6 +961,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
+ req->rl_backchannel = false;
list_add(&req->rl_free, &buf->rb_send_bufs);
}
@@ -987,19 +1010,13 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
- if (!rep)
- return;
-
rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
kfree(rep);
}
-static void
+void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
- if (!req)
- return;
-
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1017,12 +1034,19 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
rpcrdma_destroy_rep(ia, rep);
}
- while (!list_empty(&buf->rb_send_bufs)) {
+ spin_lock(&buf->rb_reqslock);
+ while (!list_empty(&buf->rb_allreqs)) {
struct rpcrdma_req *req;
- req = rpcrdma_buffer_get_req_locked(buf);
+ req = list_first_entry(&buf->rb_allreqs,
+ struct rpcrdma_req, rl_all);
+ list_del(&req->rl_all);
+
+ spin_unlock(&buf->rb_reqslock);
rpcrdma_destroy_req(ia, req);
+ spin_lock(&buf->rb_reqslock);
}
+ spin_unlock(&buf->rb_reqslock);
ia->ri_ops->ro_destroy(buf);
}
@@ -1290,6 +1314,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
return rc;
}
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc;
+
+ while (count--) {
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (list_empty(&buffers->rb_recv_bufs))
+ goto out_reqbuf;
+ rep = rpcrdma_buffer_get_rep_locked(buffers);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+ rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ if (rc)
+ goto out_rc;
+ }
+
+ return 0;
+
+out_reqbuf:
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ pr_warn("%s: no extra receive buffers\n", __func__);
+ return -ENOMEM;
+
+out_rc:
+ rpcrdma_recv_buffer_put(rep);
+ return rc;
+}
+
/* How many chunk list items fit within our inline buffers?
*/
unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 6ea1dbe..1eb86c79f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -263,6 +263,9 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+
+ struct list_head rl_all;
+ bool rl_backchannel;
};
static inline struct rpcrdma_req *
@@ -291,6 +294,10 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
+
+ u32 rb_bc_srv_max_requests;
+ spinlock_t rb_reqslock; /* protect rb_allreqs */
+ struct list_head rb_allreqs;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -411,6 +418,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
/*
* Buffer calls - xprtrdma/verbs.c
*/
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
@@ -427,6 +437,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
int frwr_alloc_recovery_wq(void);
void frwr_destroy_recovery_wq(void);
@@ -494,6 +505,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;
/* WR context cache. Created in svc_rdma.c */
Pre-allocate extra send and receive Work Requests needed to handle
backchannel receives and sends.
The transport doesn't know how many extra WRs to pre-allocate until
the xprt_setup_backchannel() call, but that's long after the WRs are
allocated during forechannel setup.
So, use a fixed value for now.
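Concretely (the constant and its values come from the xprt_rdma.h
hunk in this patch; the helper is illustrative):

  #if defined(CONFIG_SUNRPC_BACKCHANNEL)
  #define RPCRDMA_BACKWARD_WRS  (8)
  #else
  #define RPCRDMA_BACKWARD_WRS  (0)
  #endif

  /* The provider's per-QP WR limit has to cover the forward channel
   * plus this fixed backchannel reservation. */
  static unsigned int example_forward_wr_budget(unsigned int max_qp_wr)
  {
      if (max_qp_wr <= RPCRDMA_BACKWARD_WRS)
          return 0;    /* device cannot even cover the reservation */
      return max_qp_wr - RPCRDMA_BACKWARD_WRS;
  }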
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 ++++
net/sunrpc/xprtrdma/verbs.c | 14 ++++++++++++--
net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++++++++++
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 3d01b32..3165ed6 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -125,6 +125,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
* Twice as many rpc_rqsts are prepared to ensure there is
* always an rpc_rqst available as soon as a reply is sent.
*/
+ if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
+ goto out_err;
+
for (i = 0; i < (reqs << 1); i++) {
rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
if (!rqst) {
@@ -161,6 +164,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
out_free:
xprt_rdma_bc_destroy(xprt, reqs);
+out_err:
pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
return -ENOMEM;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index d920367..d031856 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -570,6 +570,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
struct ib_cq_init_attr cq_attr = {};
+ unsigned int max_qp_wr;
int rc, err;
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -578,18 +579,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return -ENOMEM;
}
+ if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+ dprintk("RPC: %s: insufficient wqe's available\n",
+ __func__);
+ return -ENOMEM;
+ }
+ max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
/* check provider's send/recv wr limits */
- if (cdata->max_requests > devattr->max_qp_wr)
- cdata->max_requests = devattr->max_qp_wr;
+ if (cdata->max_requests > max_qp_wr)
+ cdata->max_requests = max_qp_wr;
ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
rc = ia->ri_ops->ro_open(ia, ep, cdata);
if (rc)
return rc;
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 1eb86c79f..55d2660 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -101,6 +101,16 @@ struct rpcrdma_ep {
*/
#define RPCRDMA_IGNORE_COMPLETION (0ULL)
+/* Pre-allocate extra Work Requests for handling backward receives
+ * and sends. This is a fixed value because the Work Queues are
+ * allocated when the forward channel is set up.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_BACKWARD_WRS (8)
+#else
+#define RPCRDMA_BACKWARD_WRS (0)
+#endif
+
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
*
* The below structure appears at the front of a large region of kmalloc'd
Backward direction RPC replies are sent via the client transport's
send_request method, the same way forward direction RPC calls are
sent.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 45 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 5 ++++
net/sunrpc/xprtrdma/xprt_rdma.h | 1 +
3 files changed, 51 insertions(+)
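As background, here is a standalone sketch (not part of the patch) of the
fixed-size header that rpcrdma_bc_marshal_reply() fills in. The struct and
helper names are invented for illustration, but the seven-word layout is
what RPCRDMA_HDRLEN_MIN (28 bytes) refers to:

/* Sketch only: plain userspace C modeling the backward direction
 * reply header, not the kernel's rpcrdma_msg structure itself.
 */
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

struct bc_rdma_hdr {
	uint32_t rm_xid;	/* copied verbatim from the rpc_rqst */
	uint32_t rm_vers;	/* RPC/RDMA version one */
	uint32_t rm_credit;	/* rb_bc_srv_max_requests */
	uint32_t rm_type;	/* rdma_msg */
	uint32_t rm_chunks[3];	/* read, write, reply chunk lists: all empty */
};

static void bc_fill_hdr(struct bc_rdma_hdr *hdr, uint32_t xid_be, uint32_t credits)
{
	hdr->rm_xid = xid_be;		/* already in network byte order */
	hdr->rm_vers = htonl(1);
	hdr->rm_credit = htonl(credits);
	hdr->rm_type = htonl(0);	/* rdma_msg is zero on the wire */
	memset(hdr->rm_chunks, 0, sizeof(hdr->rm_chunks));	/* xdr_zero */
}

int main(void)
{
	struct bc_rdma_hdr hdr;

	bc_fill_hdr(&hdr, htonl(0x1234), 8);
	return sizeof(hdr) == 28 ? 0 : 1;	/* RPCRDMA_HDRLEN_MIN */
}

The RPC reply itself then travels in the second send SGE (rl_sendbuf), which
is why the marshaling code below posts exactly two iovecs.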
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 3165ed6..ffc4853 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -170,6 +170,51 @@ out_err:
}
/**
+ * rpcrdma_bc_marshal_reply - Send backwards direction reply
+ * @rqst: buffer containing RPC reply data
+ *
+ * Returns zero on success.
+ */
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_msg *headerp;
+ size_t rpclen;
+
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit =
+ cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+ rpclen = rqst->rq_svec[0].iov_len;
+
+ pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
+ __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
+ pr_info("RPC: %s: RPC/RDMA: %*ph\n",
+ __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
+ pr_info("RPC: %s: RPC: %*ph\n",
+ __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+ req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+ req->rl_send_iov[1].length = rpclen;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+ req->rl_niovs = 2;
+ return 0;
+}
+
+/**
* xprt_rdma_bc_destroy - Release resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
* @reqs: number of incoming requests to destroy; ignored
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 95774fc..b7a21e5 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+ return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
/*
* rpclen gets amount of data in first buffer, which is the
* pre-registered buffer.
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 55d2660..e2d23ea 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -520,6 +520,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
Introduce a code path in the rpcrdma_reply_handler() to catch
incoming backward direction RPC calls and route them to the ULP's
backchannel server.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 118 +++++++++++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 41 +++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
3 files changed, 161 insertions(+)
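For context, here is a standalone sketch (again not kernel code) of the
header checks used to spot a backward direction call once the chunk lists
are known to be empty. The helper name is invented, but the word offsets
match the patch, and the "+ 24" minimum length is enough for the six fixed
32-bit fields at the start of an RPC call header:

/* Sketch only: after an rdma_msg header with three empty chunk lists,
 * the embedded RPC message starts at byte 28 (RPCRDMA_HDRLEN_MIN).
 * Its first word repeats the XID and its second word is the message
 * direction; CALL is zero.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <arpa/inet.h>

#define HDRLEN_MIN	28U	/* RPCRDMA_HDRLEN_MIN */

static bool looks_like_bcall(const uint32_t *words, size_t len)
{
	if (len < HDRLEN_MIN + 24)
		return false;	/* too short for an inline call */
	if (words[7] != words[0])
		return false;	/* embedded RPC XID must equal rm_xid */
	if (words[8] != htonl(0))
		return false;	/* direction must be CALL (zero) */
	return true;
}

int main(void)
{
	uint32_t msg[13] = { 0 };

	msg[0] = htonl(42);	/* rm_xid */
	msg[7] = htonl(42);	/* embedded RPC XID */
	msg[8] = htonl(0);	/* CALL */
	return looks_like_bcall(msg, sizeof(msg)) ? 0 : 1;
}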
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ffc4853..0b3387f 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,6 +5,8 @@
*/
#include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
#include "xprt_rdma.h"
@@ -12,6 +14,8 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
@@ -253,3 +257,117 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);
}
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ * o Backchannel credits are ignored, just as the NFS server
+ * forechannel currently does
+ * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ * No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_rep *rep)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_msg *headerp;
+ struct svc_serv *bc_serv;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ struct xdr_buf *buf;
+ size_t size;
+ __be32 *p;
+
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: callback XID %08x, length=%u\n",
+ __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+ pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+ /* Sanity check:
+ * Need at least enough bytes for RPC/RDMA header, as code
+ * here references the header fields by array offset. Also,
+ * backward calls are always inline, so ensure there
+ * are some bytes beyond the RPC/RDMA header.
+ */
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+ goto out_short;
+ p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+ size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+ /* Grab a free bc rqst */
+ spin_lock(&xprt->bc_pa_lock);
+ if (list_empty(&xprt->bc_pa_list)) {
+ spin_unlock(&xprt->bc_pa_lock);
+ goto out_overflow;
+ }
+ rqst = list_first_entry(&xprt->bc_pa_list,
+ struct rpc_rqst, rq_bc_pa_list);
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
+#endif
+
+ /* Prepare rqst */
+ rqst->rq_reply_bytes_recvd = 0;
+ rqst->rq_bytes_sent = 0;
+ rqst->rq_xid = headerp->rm_xid;
+ set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+ buf = &rqst->rq_rcv_buf;
+ memset(buf, 0, sizeof(*buf));
+ buf->head[0].iov_base = p;
+ buf->head[0].iov_len = size;
+ buf->len = size;
+
+ /* The receive buffer has to be hooked to the rpcrdma_req
+ * so that it can be reposted after the server is done
+ * parsing it but just before sending the backward
+ * direction reply.
+ */
+ req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: attaching rep %p to req %p\n",
+ __func__, rep, req);
+#endif
+ req->rl_reply = rep;
+
+ /* Defeat the retransmit detection logic in send_request */
+ req->rl_connect_cookie = 0;
+
+ /* Queue rqst for ULP's callback service */
+ bc_serv = xprt->bc_serv;
+ spin_lock(&bc_serv->sv_cb_lock);
+ list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+ spin_unlock(&bc_serv->sv_cb_lock);
+
+ wake_up(&bc_serv->sv_cb_waitq);
+
+ r_xprt->rx_stats.bcall_count++;
+ return;
+
+out_overflow:
+ pr_warn("RPC/RDMA backchannel overflow\n");
+ xprt_disconnect_done(xprt);
+ /* This receive buffer gets reposted automatically
+ * when the connection is re-established.
+ */
+ return;
+
+out_short:
+ pr_warn("RPC/RDMA short backward direction call\n");
+
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ xprt_disconnect_done(xprt);
+ else
+ pr_warn("RPC: %s: reposting rep %p\n",
+ __func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index b7a21e5..c10d969 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
spin_unlock_bh(&xprt->transport_lock);
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+ __be32 *p = (__be32 *)headerp;
+
+ if (headerp->rm_type != rdma_msg)
+ return false;
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != headerp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] != cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* This function is called when an async event is posted to
* the connection which changes the connection state. All it
@@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
headerp = rdmab_to_msg(rep->rr_rdmabuf);
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (rpcrdma_is_bcall(headerp))
+ goto out_bcall;
+#endif
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
@@ -878,6 +913,12 @@ out_badstatus:
}
return;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+ rpcrdma_bc_receive_call(r_xprt, rep);
+ return;
+#endif
+
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e2d23ea..eb87d96e8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -353,6 +353,7 @@ struct rpcrdma_stats {
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
unsigned long nomsg_call_count;
+ unsigned long bcall_count;
};
/*
@@ -520,6 +521,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
On NFSv4.1 mount points, the Linux NFS client uses this transport
endpoint to receive backward direction calls and route replies back
to the NFSv4.1 server.
Signed-off-by: Chuck Lever <[email protected]>
Acked-by: "J. Bruce Fields" <[email protected]>
---
include/linux/sunrpc/svc_rdma.h | 6 +++
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/svc_rdma.c | 6 +++
net/sunrpc/xprtrdma/svc_rdma_transport.c | 58 ++++++++++++++++++++++++++++++
4 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 7ccc961..fb4013e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -228,9 +228,13 @@ extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
struct svc_rdma_fastreg_mr *);
extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *);
-extern struct svc_xprt_class svc_rdma_class;
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
+extern struct svc_xprt_class svc_rdma_class;
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+extern struct svc_xprt_class svc_rdma_bc_class;
+#endif
+
/* svc_rdma.c */
extern int svc_rdma_init(void);
extern void svc_rdma_cleanup(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 3f79c4a..82c0839 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -158,6 +158,7 @@ enum xprt_transports {
XPRT_TRANSPORT_TCP = IPPROTO_TCP,
XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC,
XPRT_TRANSPORT_RDMA = 256,
+ XPRT_TRANSPORT_BC_RDMA = XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
XPRT_TRANSPORT_LOCAL = 257,
};
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f..1b7051b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
unregister_sysctl_table(svcrdma_table_header);
svcrdma_table_header = NULL;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_unreg_xprt_class(&svc_rdma_bc_class);
+#endif
svc_unreg_xprt_class(&svc_rdma_class);
kmem_cache_destroy(svc_rdma_map_cachep);
kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
/* Register RDMA with the SVC transport switch */
svc_reg_xprt_class(&svc_rdma_class);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_reg_xprt_class(&svc_rdma_bc_class);
+#endif
return 0;
err1:
kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fcc3eb8..a133b1e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
struct net *net,
struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
.xcl_ident = XPRT_TRANSPORT_RDMA,
};
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+ struct sockaddr *, int, int);
+static void svc_rdma_bc_detach(struct svc_xprt *);
+static void svc_rdma_bc_free(struct svc_xprt *);
+
+static struct svc_xprt_ops svc_rdma_bc_ops = {
+ .xpo_create = svc_rdma_bc_create,
+ .xpo_detach = svc_rdma_bc_detach,
+ .xpo_free = svc_rdma_bc_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_bc_class = {
+ .xcl_name = "rdma-bc",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_bc_ops,
+ .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+};
+
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+ struct net *net,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_xprt *xprt;
+
+ cma_xprt = rdma_create_xprt(serv, 0);
+ if (!cma_xprt)
+ return ERR_PTR(-ENOMEM);
+ xprt = &cma_xprt->sc_xprt;
+
+ svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+ serv->sv_bc_xprt = xprt;
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ return xprt;
+}
+
+static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+{
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+}
+
+static void svc_rdma_bc_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ if (xprt)
+ kfree(rdma);
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt;
Allow the use of other transport classes when handling a backward
direction RPC call.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/svc.c | 5 -----
1 file changed, 5 deletions(-)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579d..bc5b7b5 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
/* reset result send buffer "put" position */
resv->iov_len = 0;
- if (rqstp->rq_prot != IPPROTO_TCP) {
- printk(KERN_ERR "No support for Non-TCP transports!\n");
- BUG();
- }
-
/*
* Skip the next two words because they've already been
* processed in the transport
Pass the correct backchannel transport class to svc_create_xprt()
when setting up an NFSv4.1 backchannel transport.
Signed-off-by: Chuck Lever <[email protected]>
---
fs/nfs/callback.c | 33 +++++++++++++++++++++------------
include/linux/sunrpc/xprt.h | 1 +
net/sunrpc/xprtrdma/transport.c | 1 +
net/sunrpc/xprtsock.c | 1 +
4 files changed, 24 insertions(+), 12 deletions(-)
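To illustrate the dispatch this enables, here is a toy model (not the
kernel code): each forechannel transport records the name of the
backchannel class it pairs with, and the NFSv4.1 callback setup simply
hands that name to svc_create_xprt() instead of hard-coding "tcp-bc".

#include <stdio.h>

/* Sketch only: invented types that mimic the bc_name dispatch */
struct toy_xprt {
	const char *name;	/* forechannel class */
	const char *bc_name;	/* paired backchannel class, may be NULL */
};

static int toy_callback_up_net(const struct toy_xprt *xprt)
{
	if (!xprt->bc_name)
		return -1;	/* models -EPROTONOSUPPORT */
	printf("svc_create_xprt(\"%s\")\n", xprt->bc_name);
	return 0;
}

int main(void)
{
	struct toy_xprt tcp = { "tcp", "tcp-bc" };
	struct toy_xprt rdma = { "rdma", "rdma-bc" };

	toy_callback_up_net(&tcp);	/* -> svc_create_xprt("tcp-bc") */
	toy_callback_up_net(&rdma);	/* -> svc_create_xprt("rdma-bc") */
	return 0;
}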
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 75f7c0a..46ed2c5 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -99,15 +99,22 @@ nfs4_callback_up(struct svc_serv *serv)
}
#if defined(CONFIG_NFS_V4_1)
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
+/*
+ * Create an svc_sock for the back channel service that shares the
+ * fore channel connection.
+ * Returns the input port (0) and sets the svc_serv bc_xprt on success
+ */
+static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
+ struct rpc_xprt *xprt)
{
- /*
- * Create an svc_sock for the back channel service that shares the
- * fore channel connection.
- * Returns the input port (0) and sets the svc_serv bc_xprt on success
- */
- return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
- SVC_SOCK_ANONYMOUS);
+ int ret = -EPROTONOSUPPORT;
+
+ if (xprt->bc_name)
+ ret = svc_create_xprt(serv, xprt->bc_name, net, PF_INET, 0,
+ SVC_SOCK_ANONYMOUS);
+ dprintk("NFS: svc_create_xprt(%s) returned %d\n",
+ xprt->bc_name, ret);
+ return ret;
}
/*
@@ -184,7 +191,8 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
xprt->bc_serv = serv;
}
#else
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
+static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
+ struct rpc_xprt *xprt)
{
return 0;
}
@@ -259,7 +267,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
svc_shutdown_net(serv, net);
}
-static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net)
+static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
+ struct net *net, struct rpc_xprt *xprt)
{
struct nfs_net *nn = net_generic(net, nfs_net_id);
int ret;
@@ -281,7 +290,7 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
break;
case 1:
case 2:
- ret = nfs41_callback_up_net(serv, net);
+ ret = nfs41_callback_up_net(serv, net, xprt);
break;
default:
printk(KERN_ERR "NFS: unknown callback version: %d\n",
@@ -364,7 +373,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt)
goto err_create;
}
- ret = nfs_callback_up_net(minorversion, serv, net);
+ ret = nfs_callback_up_net(minorversion, serv, net, xprt);
if (ret < 0)
goto err_net;
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 82c0839..88bae18 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -170,6 +170,7 @@ struct rpc_xprt {
struct sockaddr_storage addr; /* server address */
size_t addrlen; /* size of server address */
int prot; /* IP protocol */
+ char *bc_name; /* backchannel transport */
unsigned long cong; /* current congestion */
unsigned long cwnd; /* congestion window */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 845278e..c60bbc8 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -337,6 +337,7 @@ xprt_setup_rdma(struct xprt_create *args)
/* Ensure xprt->addr holds valid server TCP (not RDMA)
* address, for any side protocols which peek at it */
xprt->prot = IPPROTO_TCP;
+ xprt->bc_name = "rdma-bc";
xprt->addrlen = args->addrlen;
memcpy(&xprt->addr, sap, xprt->addrlen);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3084163..d82d00b 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2858,6 +2858,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
transport = container_of(xprt, struct sock_xprt, xprt);
xprt->prot = IPPROTO_TCP;
+ xprt->bc_name = "tcp-bc";
xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
Looks good!
Reviewed-By: Devesh Sharma <[email protected]>
On Tue, Oct 6, 2015 at 8:28 PM, Chuck Lever <[email protected]> wrote:
> Commit 8301a2c047cc ("xprtrdma: Limit work done by completion
> handler") was supposed to prevent xprtrdma's upcall handlers from
> starving other softIRQ work by letting them return to the provider
> before all CQEs have been polled.
>
> The logic assumes the provider will call the upcall handler again
> immediately if the CQ is re-armed while there are still queued CQEs.
>
> This assumption is invalid. The IBTA spec says that after a CQ is
> armed, the hardware must interrupt only when a new CQE is inserted.
> xprtrdma can't rely on the provider calling again, even though some
> providers do.
>
> Therefore, leaving CQEs on queue makes sense only when there is
> another mechanism that ensures all remaining CQEs are consumed in a
> timely fashion. xprtrdma does not have such a mechanism. If a CQE
> remains queued, the transport can wait forever to send the next RPC.
>
> Finally, move the wcs array back onto the stack to ensure that the
> poll array is always local to the CPU where the completion upcall is
> running.
>
> Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 74 ++++++++++++++++++++-------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 5 ---
> 2 files changed, 38 insertions(+), 41 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index c713909..e9599e9 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -158,25 +158,30 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
> }
> }
>
> -static int
> -rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> +/* The common case is a single send completion is waiting. By
> + * passing two WC entries to ib_poll_cq, a return code of 1
> + * means there is exactly one WC waiting and no more. We don't
> + * have to invoke ib_poll_cq again to know that the CQ has been
> + * properly drained.
> + */
> +static void
> +rpcrdma_sendcq_poll(struct ib_cq *cq)
> {
> - struct ib_wc *wcs;
> - int budget, count, rc;
> + struct ib_wc *pos, wcs[2];
> + int count, rc;
>
> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
> do {
> - wcs = ep->rep_send_wcs;
> + pos = wcs;
>
> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
> - if (rc <= 0)
> - return rc;
> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
> + if (rc < 0)
> + break;
>
> count = rc;
> while (count-- > 0)
> - rpcrdma_sendcq_process_wc(wcs++);
> - } while (rc == RPCRDMA_POLLSIZE && --budget);
> - return 0;
> + rpcrdma_sendcq_process_wc(pos++);
> + } while (rc == ARRAY_SIZE(wcs));
> + return;
> }
>
> /* Handle provider send completion upcalls.
> @@ -184,10 +189,8 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> static void
> rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
> {
> - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> -
> do {
> - rpcrdma_sendcq_poll(cq, ep);
> + rpcrdma_sendcq_poll(cq);
> } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> IB_CQ_REPORT_MISSED_EVENTS) > 0);
> }
> @@ -226,31 +229,32 @@ out_fail:
> goto out_schedule;
> }
>
> -static int
> -rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> +/* The wc array is on stack: automatic memory is always CPU-local.
> + *
> + * struct ib_wc is 64 bytes, making the poll array potentially
> + * large. But this is at the bottom of the call chain. Further
> + * substantial work is done in another thread.
> + */
> +static void
> +rpcrdma_recvcq_poll(struct ib_cq *cq)
> {
> - struct list_head sched_list;
> - struct ib_wc *wcs;
> - int budget, count, rc;
> + struct ib_wc *pos, wcs[4];
> + LIST_HEAD(sched_list);
> + int count, rc;
>
> - INIT_LIST_HEAD(&sched_list);
> - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
> do {
> - wcs = ep->rep_recv_wcs;
> + pos = wcs;
>
> - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
> - if (rc <= 0)
> - goto out_schedule;
> + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
> + if (rc < 0)
> + break;
>
> count = rc;
> while (count-- > 0)
> - rpcrdma_recvcq_process_wc(wcs++, &sched_list);
> - } while (rc == RPCRDMA_POLLSIZE && --budget);
> - rc = 0;
> + rpcrdma_recvcq_process_wc(pos++, &sched_list);
> + } while (rc == ARRAY_SIZE(wcs));
>
> -out_schedule:
> rpcrdma_schedule_tasklet(&sched_list);
> - return rc;
> }
>
> /* Handle provider receive completion upcalls.
> @@ -258,10 +262,8 @@ out_schedule:
> static void
> rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> {
> - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> -
> do {
> - rpcrdma_recvcq_poll(cq, ep);
> + rpcrdma_recvcq_poll(cq);
> } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> IB_CQ_REPORT_MISSED_EVENTS) > 0);
> }
> @@ -625,7 +627,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
>
> cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
> sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
> - rpcrdma_cq_async_error_upcall, ep, &cq_attr);
> + rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
> if (IS_ERR(sendcq)) {
> rc = PTR_ERR(sendcq);
> dprintk("RPC: %s: failed to create send CQ: %i\n",
> @@ -642,7 +644,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
>
> cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
> recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
> - rpcrdma_cq_async_error_upcall, ep, &cq_attr);
> + rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
> if (IS_ERR(recvcq)) {
> rc = PTR_ERR(recvcq);
> dprintk("RPC: %s: failed to create recv CQ: %i\n",
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index c09414e..42c8d44 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -77,9 +77,6 @@ struct rpcrdma_ia {
> * RDMA Endpoint -- one per transport instance
> */
>
> -#define RPCRDMA_WC_BUDGET (128)
> -#define RPCRDMA_POLLSIZE (16)
> -
> struct rpcrdma_ep {
> atomic_t rep_cqcount;
> int rep_cqinit;
> @@ -89,8 +86,6 @@ struct rpcrdma_ep {
> struct rdma_conn_param rep_remote_cma;
> struct sockaddr_storage rep_remote_addr;
> struct delayed_work rep_connect_worker;
> - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
> - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
> };
>
> /*
>
Looks good,
Reviewed-By: Devesh Sharma <[email protected]>
Will send a test report of this series with the ocrdma drivers.
On Tue, Oct 6, 2015 at 8:28 PM, Chuck Lever <[email protected]> wrote:
> ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns a positive
> value if WCs were added to a CQ after the last completion upcall
> but before the CQ has been re-armed.
>
> Commit 7f23f6f6e388 ("xprtrmda: Reduce lock contention in
> completion handlers") assumed that when ib_req_notify_cq() returned
> a positive RC, the CQ had also been successfully re-armed, making
> it safe to return control to the provider without losing any
> completion signals. That is an invalid assumption.
>
> Change both completion handlers to continue polling while
> ib_req_notify_cq() returns a positive value.
>
> Fixes: 7f23f6f6e388 ("xprtrmda: Reduce lock contention in ...")
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 66 +++++++------------------------------------
> 1 file changed, 10 insertions(+), 56 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 8a477e2..c713909 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -179,38 +179,17 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
> return 0;
> }
>
> -/*
> - * Handle send, fast_reg_mr, and local_inv completions.
> - *
> - * Send events are typically suppressed and thus do not result
> - * in an upcall. Occasionally one is signaled, however. This
> - * prevents the provider's completion queue from wrapping and
> - * losing a completion.
> +/* Handle provider send completion upcalls.
> */
> static void
> rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
> {
> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> - int rc;
> -
> - rc = rpcrdma_sendcq_poll(cq, ep);
> - if (rc) {
> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
>
> - rc = ib_req_notify_cq(cq,
> - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
> - if (rc == 0)
> - return;
> - if (rc < 0) {
> - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
> -
> - rpcrdma_sendcq_poll(cq, ep);
> + do {
> + rpcrdma_sendcq_poll(cq, ep);
> + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> + IB_CQ_REPORT_MISSED_EVENTS) > 0);
> }
>
> static void
> @@ -274,42 +253,17 @@ out_schedule:
> return rc;
> }
>
> -/*
> - * Handle receive completions.
> - *
> - * It is reentrant but processes single events in order to maintain
> - * ordering of receives to keep server credits.
> - *
> - * It is the responsibility of the scheduled tasklet to return
> - * recv buffers to the pool. NOTE: this affects synchronization of
> - * connection shutdown. That is, the structures required for
> - * the completion of the reply handler must remain intact until
> - * all memory has been reclaimed.
> +/* Handle provider receive completion upcalls.
> */
> static void
> rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
> {
> struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
> - int rc;
> -
> - rc = rpcrdma_recvcq_poll(cq, ep);
> - if (rc) {
> - dprintk("RPC: %s: ib_poll_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
>
> - rc = ib_req_notify_cq(cq,
> - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
> - if (rc == 0)
> - return;
> - if (rc < 0) {
> - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
> - __func__, rc);
> - return;
> - }
> -
> - rpcrdma_recvcq_poll(cq, ep);
> + do {
> + rpcrdma_recvcq_poll(cq, ep);
> + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
> + IB_CQ_REPORT_MISSED_EVENTS) > 0);
> }
>
> static void
>
Looks Good,
Reviewed-By: Devesh Sharma <[email protected]>
On Tue, Oct 6, 2015 at 8:29 PM, Chuck Lever <[email protected]> wrote:
> Clean up: The error cases in rpcrdma_reply_handler() almost never
> execute. Ensure the compiler places them out of the hot path.
>
> No behavior change expected.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 89 ++++++++++++++++++++++-----------------
> net/sunrpc/xprtrdma/verbs.c | 2 -
> net/sunrpc/xprtrdma/xprt_rdma.h | 2 +
> 3 files changed, 53 insertions(+), 40 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index bc8bd65..60ffa63 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> unsigned long cwnd;
> u32 credits;
>
> - /* Check status. If bad, signal disconnect and return rep to pool */
> - if (rep->rr_len == ~0U) {
> - rpcrdma_recv_buffer_put(rep);
> - if (r_xprt->rx_ep.rep_connected == 1) {
> - r_xprt->rx_ep.rep_connected = -EIO;
> - rpcrdma_conn_func(&r_xprt->rx_ep);
> - }
> - return;
> - }
> - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
> - dprintk("RPC: %s: short/invalid reply\n", __func__);
> - goto repost;
> - }
> + dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
> +
> + if (rep->rr_len == RPCRDMA_BAD_LEN)
> + goto out_badstatus;
> + if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
> + goto out_shortreply;
> +
> headerp = rdmab_to_msg(rep->rr_rdmabuf);
> - if (headerp->rm_vers != rpcrdma_version) {
> - dprintk("RPC: %s: invalid version %d\n",
> - __func__, be32_to_cpu(headerp->rm_vers));
> - goto repost;
> - }
> + if (headerp->rm_vers != rpcrdma_version)
> + goto out_badversion;
>
> /* Get XID and try for a match. */
> spin_lock(&xprt->transport_lock);
> rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
> - if (rqst == NULL) {
> - spin_unlock(&xprt->transport_lock);
> - dprintk("RPC: %s: reply 0x%p failed "
> - "to match any request xid 0x%08x len %d\n",
> - __func__, rep, be32_to_cpu(headerp->rm_xid),
> - rep->rr_len);
> -repost:
> - r_xprt->rx_stats.bad_reply_count++;
> - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> - rpcrdma_recv_buffer_put(rep);
> -
> - return;
> - }
> + if (!rqst)
> + goto out_nomatch;
>
> /* get request object */
> req = rpcr_to_rdmar(rqst);
> - if (req->rl_reply) {
> - spin_unlock(&xprt->transport_lock);
> - dprintk("RPC: %s: duplicate reply 0x%p to RPC "
> - "request 0x%p: xid 0x%08x\n", __func__, rep, req,
> - be32_to_cpu(headerp->rm_xid));
> - goto repost;
> - }
> + if (req->rl_reply)
> + goto out_duplicate;
>
> dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
> " RPC request 0x%p xid 0x%08x\n",
> @@ -883,8 +858,44 @@ badheader:
> if (xprt->cwnd > cwnd)
> xprt_release_rqst_cong(rqst->rq_task);
>
> + xprt_complete_rqst(rqst->rq_task, status);
> + spin_unlock(&xprt->transport_lock);
> dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
> __func__, xprt, rqst, status);
> - xprt_complete_rqst(rqst->rq_task, status);
> + return;
> +
> +out_badstatus:
> + rpcrdma_recv_buffer_put(rep);
> + if (r_xprt->rx_ep.rep_connected == 1) {
> + r_xprt->rx_ep.rep_connected = -EIO;
> + rpcrdma_conn_func(&r_xprt->rx_ep);
> + }
> + return;
> +
> +out_shortreply:
> + dprintk("RPC: %s: short/invalid reply\n", __func__);
> + goto repost;
> +
> +out_badversion:
> + dprintk("RPC: %s: invalid version %d\n",
> + __func__, be32_to_cpu(headerp->rm_vers));
> + goto repost;
> +
> +out_nomatch:
> + spin_unlock(&xprt->transport_lock);
> + dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
> + __func__, be32_to_cpu(headerp->rm_xid),
> + rep->rr_len);
> + goto repost;
> +
> +out_duplicate:
> spin_unlock(&xprt->transport_lock);
> + dprintk("RPC: %s: "
> + "duplicate reply %p to RPC request %p: xid 0x%08x\n",
> + __func__, rep, req, be32_to_cpu(headerp->rm_xid));
> +
> +repost:
> + r_xprt->rx_stats.bad_reply_count++;
> + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
> + rpcrdma_recv_buffer_put(rep);
> }
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index e9599e9..0076129 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -225,7 +225,7 @@ out_fail:
> if (wc->status != IB_WC_WR_FLUSH_ERR)
> pr_err("RPC: %s: rep %p: %s\n",
> __func__, rep, ib_wc_status_msg(wc->status));
> - rep->rr_len = ~0U;
> + rep->rr_len = RPCRDMA_BAD_LEN;
> goto out_schedule;
> }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 42c8d44..a13508b 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -168,6 +168,8 @@ struct rpcrdma_rep {
> struct rpcrdma_regbuf *rr_rdmabuf;
> };
>
> +#define RPCRDMA_BAD_LEN (~0U)
> +
> /*
> * struct rpcrdma_mw - external memory region metadata
> *
>
Looks good,
Reviewed-By: Devesh Sharma <[email protected]>
On Tue, Oct 6, 2015 at 8:29 PM, Chuck Lever <[email protected]> wrote:
> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
> structs. Replace those arrays with free lists.
>
> To allow more than 512 RPCs in-flight at once, each of these arrays
> would be larger than a page (assuming 8-byte addresses and 4KB
> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
> buffer array would have to be 128 pages. That's an order-6
> allocation. (Not that we're going there.)
>
> A list is easier to expand dynamically. Instead of allocating a
> larger array of pointers and copying the existing pointers to the
> new array, simply append more buffers to each list.
>
> This also makes it simpler to manage receive buffers that might
> catch backwards-direction calls, or to post receive buffers in
> bulk to amortize the overhead of ib_post_recv.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/verbs.c | 155 +++++++++++++++++----------------------
> net/sunrpc/xprtrdma/xprt_rdma.h | 9 +-
> 2 files changed, 73 insertions(+), 91 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 0076129..ab26392 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -928,44 +928,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> {
> struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
> struct rpcrdma_ia *ia = &r_xprt->rx_ia;
> - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
> - char *p;
> - size_t len;
> int i, rc;
>
> - buf->rb_max_requests = cdata->max_requests;
> + buf->rb_max_requests = r_xprt->rx_data.max_requests;
> spin_lock_init(&buf->rb_lock);
>
> - /* Need to allocate:
> - * 1. arrays for send and recv pointers
> - * 2. arrays of struct rpcrdma_req to fill in pointers
> - * 3. array of struct rpcrdma_rep for replies
> - * Send/recv buffers in req/rep need to be registered
> - */
> - len = buf->rb_max_requests *
> - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
> -
> - p = kzalloc(len, GFP_KERNEL);
> - if (p == NULL) {
> - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
> - __func__, len);
> - rc = -ENOMEM;
> - goto out;
> - }
> - buf->rb_pool = p; /* for freeing it later */
> -
> - buf->rb_send_bufs = (struct rpcrdma_req **) p;
> - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
> - buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
> - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
> -
> rc = ia->ri_ops->ro_init(r_xprt);
> if (rc)
> goto out;
>
> + INIT_LIST_HEAD(&buf->rb_send_bufs);
> for (i = 0; i < buf->rb_max_requests; i++) {
> struct rpcrdma_req *req;
> - struct rpcrdma_rep *rep;
>
> req = rpcrdma_create_req(r_xprt);
> if (IS_ERR(req)) {
> @@ -974,7 +948,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> rc = PTR_ERR(req);
> goto out;
> }
> - buf->rb_send_bufs[i] = req;
> + list_add(&req->rl_free, &buf->rb_send_bufs);
> + }
> +
> + INIT_LIST_HEAD(&buf->rb_recv_bufs);
> + for (i = 0; i < buf->rb_max_requests + 2; i++) {
> + struct rpcrdma_rep *rep;
>
> rep = rpcrdma_create_rep(r_xprt);
> if (IS_ERR(rep)) {
> @@ -983,7 +962,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
> rc = PTR_ERR(rep);
> goto out;
> }
> - buf->rb_recv_bufs[i] = rep;
> + list_add(&rep->rr_list, &buf->rb_recv_bufs);
> }
>
> return 0;
> @@ -992,6 +971,28 @@ out:
> return rc;
> }
>
> +static struct rpcrdma_req *
> +rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
> +{
> + struct rpcrdma_req *req;
> +
> + req = list_first_entry(&buf->rb_send_bufs,
> + struct rpcrdma_req, rl_free);
> + list_del(&req->rl_free);
> + return req;
> +}
> +
> +static struct rpcrdma_rep *
> +rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
> +{
> + struct rpcrdma_rep *rep;
> +
> + rep = list_first_entry(&buf->rb_recv_bufs,
> + struct rpcrdma_rep, rr_list);
> + list_del(&rep->rr_list);
> + return rep;
> +}
> +
> static void
> rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
> {
> @@ -1017,25 +1018,22 @@ void
> rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
> {
> struct rpcrdma_ia *ia = rdmab_to_ia(buf);
> - int i;
>
> - /* clean up in reverse order from create
> - * 1. recv mr memory (mr free, then kfree)
> - * 2. send mr memory (mr free, then kfree)
> - * 3. MWs
> - */
> - dprintk("RPC: %s: entering\n", __func__);
> + while (!list_empty(&buf->rb_recv_bufs)) {
> + struct rpcrdma_rep *rep;
>
> - for (i = 0; i < buf->rb_max_requests; i++) {
> - if (buf->rb_recv_bufs)
> - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
> - if (buf->rb_send_bufs)
> - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
> + rep = rpcrdma_buffer_get_rep_locked(buf);
> + rpcrdma_destroy_rep(ia, rep);
> }
>
> - ia->ri_ops->ro_destroy(buf);
> + while (!list_empty(&buf->rb_send_bufs)) {
> + struct rpcrdma_req *req;
>
> - kfree(buf->rb_pool);
> + req = rpcrdma_buffer_get_req_locked(buf);
> + rpcrdma_destroy_req(ia, req);
> + }
> +
> + ia->ri_ops->ro_destroy(buf);
> }
>
> struct rpcrdma_mw *
> @@ -1067,25 +1065,10 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
> spin_unlock(&buf->rb_mwlock);
> }
>
> -static void
> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
> -{
> - buf->rb_send_bufs[--buf->rb_send_index] = req;
> - req->rl_niovs = 0;
> - if (req->rl_reply) {
> - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
> - req->rl_reply = NULL;
> - }
> -}
> -
> /*
> * Get a set of request/reply buffers.
> *
> - * Reply buffer (if needed) is attached to send buffer upon return.
> - * Rule:
> - * rb_send_index and rb_recv_index MUST always be pointing to the
> - * *next* available buffer (non-NULL). They are incremented after
> - * removing buffers, and decremented *before* returning them.
> + * Reply buffer (if available) is attached to send buffer upon return.
> */
> struct rpcrdma_req *
> rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
> @@ -1094,26 +1077,23 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
> unsigned long flags;
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
> + if (list_empty(&buffers->rb_send_bufs))
> + goto out_reqbuf;
> + req = rpcrdma_buffer_get_req_locked(buffers);
> + if (list_empty(&buffers->rb_recv_bufs))
> + goto out_repbuf;
> + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
> + spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + return req;
>
> - if (buffers->rb_send_index == buffers->rb_max_requests) {
> - spin_unlock_irqrestore(&buffers->rb_lock, flags);
> - dprintk("RPC: %s: out of request buffers\n", __func__);
> - return ((struct rpcrdma_req *)NULL);
> - }
> -
> - req = buffers->rb_send_bufs[buffers->rb_send_index];
> - if (buffers->rb_send_index < buffers->rb_recv_index) {
> - dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
> - __func__,
> - buffers->rb_recv_index - buffers->rb_send_index);
> - req->rl_reply = NULL;
> - } else {
> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> - }
> - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
> -
> +out_reqbuf:
> + spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + pr_warn("RPC: %s: out of request buffers\n", __func__);
> + return NULL;
> +out_repbuf:
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> + pr_warn("RPC: %s: out of reply buffers\n", __func__);
> + req->rl_reply = NULL;
> return req;
> }
>
> @@ -1125,17 +1105,22 @@ void
> rpcrdma_buffer_put(struct rpcrdma_req *req)
> {
> struct rpcrdma_buffer *buffers = req->rl_buffer;
> + struct rpcrdma_rep *rep = req->rl_reply;
> unsigned long flags;
>
> + req->rl_niovs = 0;
> + req->rl_reply = NULL;
> +
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - rpcrdma_buffer_put_sendbuf(req, buffers);
> + list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
> + if (rep)
> + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> /*
> * Recover reply buffers from pool.
> - * This happens when recovering from error conditions.
> - * Post-increment counter/array index.
> + * This happens when recovering from disconnect.
> */
> void
> rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> @@ -1144,10 +1129,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
> unsigned long flags;
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - if (buffers->rb_recv_index < buffers->rb_max_requests) {
> - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
> - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
> - }
> + if (!list_empty(&buffers->rb_recv_bufs))
> + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> @@ -1162,7 +1145,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
> unsigned long flags;
>
> spin_lock_irqsave(&buffers->rb_lock, flags);
> - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
> + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
> spin_unlock_irqrestore(&buffers->rb_lock, flags);
> }
>
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index a13508b..e6a358f 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
> #define RPCRDMA_MAX_IOVS (2)
>
> struct rpcrdma_req {
> + struct list_head rl_free;
> unsigned int rl_niovs;
> unsigned int rl_nchunks;
> unsigned int rl_connect_cookie;
> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
> struct list_head rb_all;
> char *rb_pool;
>
> - spinlock_t rb_lock; /* protect buf arrays */
> + spinlock_t rb_lock; /* protect buf lists */
> + struct list_head rb_send_bufs;
> + struct list_head rb_recv_bufs;
> u32 rb_max_requests;
> - int rb_send_index;
> - int rb_recv_index;
> - struct rpcrdma_req **rb_send_bufs;
> - struct rpcrdma_rep **rb_recv_bufs;
> };
> #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>
>
Looks good! I will send a test report with the ocrdma driver.
Reviewed-By: Devesh Sharma <[email protected]>
On Tue, Oct 6, 2015 at 8:29 PM, Chuck Lever <[email protected]> wrote:
> The reply tasklet is fast, but it's single threaded. After reply
> traffic saturates a single CPU, there's no more reply processing
> capacity.
>
> Replace the tasklet with a workqueue to spread reply handling across
> all CPUs. This also moves RPC/RDMA reply handling out of the soft
> IRQ context and into a context that allows sleeps.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/rpc_rdma.c | 17 +++++++-----
> net/sunrpc/xprtrdma/transport.c | 8 ++++++
> net/sunrpc/xprtrdma/verbs.c | 54 ++++++++++++++++++++++++++++++++-------
> net/sunrpc/xprtrdma/xprt_rdma.h | 4 +++
> 4 files changed, 65 insertions(+), 18 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
> index 60ffa63..95774fc 100644
> --- a/net/sunrpc/xprtrdma/rpc_rdma.c
> +++ b/net/sunrpc/xprtrdma/rpc_rdma.c
> @@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
> schedule_delayed_work(&ep->rep_connect_worker, 0);
> }
>
> -/*
> - * Called as a tasklet to do req/reply match and complete a request
> +/* Process received RPC/RDMA messages.
> + *
> * Errors must result in the RPC task either being awakened, or
> * allowed to timeout, to discover the errors at that time.
> */
> @@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
> if (headerp->rm_vers != rpcrdma_version)
> goto out_badversion;
>
> - /* Get XID and try for a match. */
> - spin_lock(&xprt->transport_lock);
> + /* Match incoming rpcrdma_rep to an rpcrdma_req to
> + * get context for handling any incoming chunks.
> + */
> + spin_lock_bh(&xprt->transport_lock);
> rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
> if (!rqst)
> goto out_nomatch;
>
> - /* get request object */
> req = rpcr_to_rdmar(rqst);
> if (req->rl_reply)
> goto out_duplicate;
> @@ -859,7 +860,7 @@ badheader:
> xprt_release_rqst_cong(rqst->rq_task);
>
> xprt_complete_rqst(rqst->rq_task, status);
> - spin_unlock(&xprt->transport_lock);
> + spin_unlock_bh(&xprt->transport_lock);
> dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
> __func__, xprt, rqst, status);
> return;
> @@ -882,14 +883,14 @@ out_badversion:
> goto repost;
>
> out_nomatch:
> - spin_unlock(&xprt->transport_lock);
> + spin_unlock_bh(&xprt->transport_lock);
> dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
> __func__, be32_to_cpu(headerp->rm_xid),
> rep->rr_len);
> goto repost;
>
> out_duplicate:
> - spin_unlock(&xprt->transport_lock);
> + spin_unlock_bh(&xprt->transport_lock);
> dprintk("RPC: %s: "
> "duplicate reply %p to RPC request %p: xid 0x%08x\n",
> __func__, rep, req, be32_to_cpu(headerp->rm_xid));
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index e9e5ed7..897a2f3 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void)
> dprintk("RPC: %s: xprt_unregister returned %i\n",
> __func__, rc);
>
> + rpcrdma_destroy_wq();
> frwr_destroy_recovery_wq();
> }
>
> @@ -743,8 +744,15 @@ int xprt_rdma_init(void)
> if (rc)
> return rc;
>
> + rc = rpcrdma_alloc_wq();
> + if (rc) {
> + frwr_destroy_recovery_wq();
> + return rc;
> + }
> +
> rc = xprt_register_transport(&xprt_rdma);
> if (rc) {
> + rpcrdma_destroy_wq();
> frwr_destroy_recovery_wq();
> return rc;
> }
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index ab26392..cf2f5b3 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
>
> static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
>
> +static struct workqueue_struct *rpcrdma_receive_wq;
> +
> +int
> +rpcrdma_alloc_wq(void)
> +{
> + struct workqueue_struct *recv_wq;
> +
> + recv_wq = alloc_workqueue("xprtrdma_receive",
> + WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
> + 0);
> + if (!recv_wq)
> + return -ENOMEM;
> +
> + rpcrdma_receive_wq = recv_wq;
> + return 0;
> +}
> +
> +void
> +rpcrdma_destroy_wq(void)
> +{
> + struct workqueue_struct *wq;
> +
> + if (rpcrdma_receive_wq) {
> + wq = rpcrdma_receive_wq;
> + rpcrdma_receive_wq = NULL;
> + destroy_workqueue(wq);
> + }
> +}
> +
> static void
> rpcrdma_schedule_tasklet(struct list_head *sched_list)
> {
> @@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
> }
>
> static void
> -rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
> +rpcrdma_receive_worker(struct work_struct *work)
> +{
> + struct rpcrdma_rep *rep =
> + container_of(work, struct rpcrdma_rep, rr_work);
> +
> + rpcrdma_reply_handler(rep);
> +}
> +
> +static void
> +rpcrdma_recvcq_process_wc(struct ib_wc *wc)
> {
> struct rpcrdma_rep *rep =
> (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
> @@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
> prefetch(rdmab_to_msg(rep->rr_rdmabuf));
>
> out_schedule:
> - list_add_tail(&rep->rr_list, sched_list);
> + queue_work(rpcrdma_receive_wq, &rep->rr_work);
> return;
> +
> out_fail:
> if (wc->status != IB_WC_WR_FLUSH_ERR)
> pr_err("RPC: %s: rep %p: %s\n",
> @@ -239,7 +278,6 @@ static void
> rpcrdma_recvcq_poll(struct ib_cq *cq)
> {
> struct ib_wc *pos, wcs[4];
> - LIST_HEAD(sched_list);
> int count, rc;
>
> do {
> @@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
>
> count = rc;
> while (count-- > 0)
> - rpcrdma_recvcq_process_wc(pos++, &sched_list);
> + rpcrdma_recvcq_process_wc(pos++);
> } while (rc == ARRAY_SIZE(wcs));
> -
> - rpcrdma_schedule_tasklet(&sched_list);
> }
>
> /* Handle provider receive completion upcalls.
> @@ -272,12 +308,9 @@ static void
> rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
> {
> struct ib_wc wc;
> - LIST_HEAD(sched_list);
>
> while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
> - rpcrdma_recvcq_process_wc(&wc, &sched_list);
> - if (!list_empty(&sched_list))
> - rpcrdma_schedule_tasklet(&sched_list);
> + rpcrdma_recvcq_process_wc(&wc);
> while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
> rpcrdma_sendcq_process_wc(&wc);
> }
> @@ -915,6 +948,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
>
> rep->rr_device = ia->ri_device;
> rep->rr_rxprt = r_xprt;
> + INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
> return rep;
>
> out_free:
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index e6a358f..6ea1dbe 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -164,6 +164,7 @@ struct rpcrdma_rep {
> unsigned int rr_len;
> struct ib_device *rr_device;
> struct rpcrdma_xprt *rr_rxprt;
> + struct work_struct rr_work;
> struct list_head rr_list;
> struct rpcrdma_regbuf *rr_rdmabuf;
> };
> @@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
> int frwr_alloc_recovery_wq(void);
> void frwr_destroy_recovery_wq(void);
>
> +int rpcrdma_alloc_wq(void);
> +void rpcrdma_destroy_wq(void);
> +
> /*
> * Wrappers for chunk registration, shared by read/write chunk code.
> */
>
On Tue, Oct 6, 2015 at 6:00 PM, Chuck Lever <[email protected]> wrote:
> Pass the correct backchannel transport class to svc_create_xprt()
> when setting up an NFSv4.1 backchannel transport.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> fs/nfs/callback.c | 33 +++++++++++++++++++++------------
> include/linux/sunrpc/xprt.h | 1 +
> net/sunrpc/xprtrdma/transport.c | 1 +
> net/sunrpc/xprtsock.c | 1 +
> 4 files changed, 24 insertions(+), 12 deletions(-)
>
> diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
> index 75f7c0a..46ed2c5 100644
> --- a/fs/nfs/callback.c
> +++ b/fs/nfs/callback.c
> @@ -99,15 +99,22 @@ nfs4_callback_up(struct svc_serv *serv)
> }
>
> #if defined(CONFIG_NFS_V4_1)
> -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
> +/*
> + * Create an svc_sock for the back channel service that shares the
> + * fore channel connection.
> + * Returns the input port (0) and sets the svc_serv bc_xprt on success
> + */
> +static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
> + struct rpc_xprt *xprt)
> {
> - /*
> - * Create an svc_sock for the back channel service that shares the
> - * fore channel connection.
> - * Returns the input port (0) and sets the svc_serv bc_xprt on success
> - */
> - return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
> - SVC_SOCK_ANONYMOUS);
> + int ret = -EPROTONOSUPPORT;
> +
> + if (xprt->bc_name)
> + ret = svc_create_xprt(serv, xprt->bc_name, net, PF_INET, 0,
> + SVC_SOCK_ANONYMOUS);
> + dprintk("NFS: svc_create_xprt(%s) returned %d\n",
> + xprt->bc_name, ret);
Maybe we should convert this to the more general and common pr_debug()
interface [1], even though other parts of this code still use the dprintk macro?
> + return ret;
> }
>
> /*
> @@ -184,7 +191,8 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
> xprt->bc_serv = serv;
> }
> #else
> -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
> +static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
> + struct rpc_xprt *xprt)
> {
> return 0;
> }
> @@ -259,7 +267,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
> svc_shutdown_net(serv, net);
> }
>
> -static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net)
> +static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
> + struct net *net, struct rpc_xprt *xprt)
> {
> struct nfs_net *nn = net_generic(net, nfs_net_id);
> int ret;
> @@ -281,7 +290,7 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
> break;
> case 1:
> case 2:
> - ret = nfs41_callback_up_net(serv, net);
> + ret = nfs41_callback_up_net(serv, net, xprt);
> break;
> default:
> printk(KERN_ERR "NFS: unknown callback version: %d\n",
It can be pr_err(..) [1].
> @@ -364,7 +373,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt)
> goto err_create;
> }
>
> - ret = nfs_callback_up_net(minorversion, serv, net);
> + ret = nfs_callback_up_net(minorversion, serv, net, xprt);
> if (ret < 0)
> goto err_net;
>
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index 82c0839..88bae18 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -170,6 +170,7 @@ struct rpc_xprt {
> struct sockaddr_storage addr; /* server address */
> size_t addrlen; /* size of server address */
> int prot; /* IP protocol */
> + char *bc_name; /* backchannel transport */
>
> unsigned long cong; /* current congestion */
> unsigned long cwnd; /* congestion window */
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 845278e..c60bbc8 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -337,6 +337,7 @@ xprt_setup_rdma(struct xprt_create *args)
> /* Ensure xprt->addr holds valid server TCP (not RDMA)
> * address, for any side protocols which peek at it */
> xprt->prot = IPPROTO_TCP;
> + xprt->bc_name = "rdma-bc";
> xprt->addrlen = args->addrlen;
> memcpy(&xprt->addr, sap, xprt->addrlen);
>
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 3084163..d82d00b 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -2858,6 +2858,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
> transport = container_of(xprt, struct sock_xprt, xprt);
>
> xprt->prot = IPPROTO_TCP;
> + xprt->bc_name = "tcp-bc";
> xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
> xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
>
[1] https://lwn.net/Articles/487437/
> On Oct 7, 2015, at 5:14 AM, Leon Romanovsky <[email protected]> wrote:
>
> On Tue, Oct 6, 2015 at 6:00 PM, Chuck Lever <[email protected]> wrote:
>> Pass the correct backchannel transport class to svc_create_xprt()
>> when setting up an NFSv4.1 backchannel transport.
>>
>> Signed-off-by: Chuck Lever <[email protected]>
>> ---
>> fs/nfs/callback.c | 33 +++++++++++++++++++++------------
>> include/linux/sunrpc/xprt.h | 1 +
>> net/sunrpc/xprtrdma/transport.c | 1 +
>> net/sunrpc/xprtsock.c | 1 +
>> 4 files changed, 24 insertions(+), 12 deletions(-)
>>
>> diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
>> index 75f7c0a..46ed2c5 100644
>> --- a/fs/nfs/callback.c
>> +++ b/fs/nfs/callback.c
>> @@ -99,15 +99,22 @@ nfs4_callback_up(struct svc_serv *serv)
>> }
>>
>> #if defined(CONFIG_NFS_V4_1)
>> -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
>> +/*
>> + * Create an svc_sock for the back channel service that shares the
>> + * fore channel connection.
>> + * Returns the input port (0) and sets the svc_serv bc_xprt on success
>> + */
>> +static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
>> + struct rpc_xprt *xprt)
>> {
>> - /*
>> - * Create an svc_sock for the back channel service that shares the
>> - * fore channel connection.
>> - * Returns the input port (0) and sets the svc_serv bc_xprt on success
>> - */
>> - return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
>> - SVC_SOCK_ANONYMOUS);
>> + int ret = -EPROTONOSUPPORT;
>> +
>> + if (xprt->bc_name)
>> + ret = svc_create_xprt(serv, xprt->bc_name, net, PF_INET, 0,
>> + SVC_SOCK_ANONYMOUS);
>> + dprintk("NFS: svc_create_xprt(%s) returned %d\n",
>> + xprt->bc_name, ret);
>
> Maybe we should convert this to the more general and common pr_debug()
> interface [1], even though other parts of this code still use the dprintk macro?
dprintk in this code is deprecated, and I used it here out
of laziness. I guess the maintainer might prefer a trace point
instead, or even no output.
In any event, converting dprintk to pr_debug() is out of scope
for this patch series.
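For illustration only (not part of this series), the conversion Leon suggests
would be a mechanical swap of the macro, with output then gated by dynamic
debug rather than the sunrpc debug sysctls:

	pr_debug("NFS: svc_create_xprt(%s) returned %d\n",
		 xprt->bc_name, ret);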
>> + return ret;
>> }
>>
>> /*
>> @@ -184,7 +191,8 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
>> xprt->bc_serv = serv;
>> }
>> #else
>> -static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
>> +static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net,
>> + struct rpc_xprt *xprt)
>> {
>> return 0;
>> }
>> @@ -259,7 +267,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
>> svc_shutdown_net(serv, net);
>> }
>>
>> -static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net)
>> +static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
>> + struct net *net, struct rpc_xprt *xprt)
>> {
>> struct nfs_net *nn = net_generic(net, nfs_net_id);
>> int ret;
>> @@ -281,7 +290,7 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
>> break;
>> case 1:
>> case 2:
>> - ret = nfs41_callback_up_net(serv, net);
>> + ret = nfs41_callback_up_net(serv, net, xprt);
>> break;
>> default:
>> printk(KERN_ERR "NFS: unknown callback version: %d\n",
> It can be pr_err(..) [1].
It can be, but this patch isn’t changing this line, nor
is its purpose to clean up error messages.
It would be better to submit a separate patch to clean
these up, if the maintainer indicates that is something
he would accept.
> [1] https://lwn.net/Articles/487437/
—
Chuck Lever
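For reference, the mechanism settled on above is small: each rpc_xprt advertises
the name of the svc_xprt_class that implements its backchannel, and
nfs41_callback_up_net() hands that name to svc_create_xprt(); a transport that
leaves bc_name unset gets -EPROTONOSUPPORT. A sketch of how a hypothetical
third transport would opt in ("foo", foo_bc_class, and foo_bc_ops are made-up
names; the field names follow struct svc_xprt_class):

/* server side: register a backchannel transport class at module init */
static struct svc_xprt_class foo_bc_class = {
	.xcl_name	 = "foo-bc",
	.xcl_owner	 = THIS_MODULE,
	.xcl_ops	 = &foo_bc_ops,		/* backchannel svc_xprt_ops */
	.xcl_max_payload = RPC_MAX_FRAGMENT_SIZE,
};
/* ... svc_reg_xprt_class(&foo_bc_class); ... */

/* client side: mirror what xprt_setup_rdma() does in this patch */
xprt->bc_name = "foo-bc";

/* nfs41_callback_up_net() then resolves the class by name: */
ret = svc_create_xprt(serv, xprt->bc_name, net, PF_INET, 0,
		      SVC_SOCK_ANONYMOUS);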
On 10/6/2015 5:59 PM, Chuck Lever wrote:
> The reply tasklet is fast, but it's single threaded. After reply
> traffic saturates a single CPU, there's no more reply processing
> capacity.
>
> Replace the tasklet with a workqueue to spread reply handling across
> all CPUs. This also moves RPC/RDMA reply handling out of the soft
> IRQ context and into a context that allows sleeps.
Hi Chuck,
I'm probably missing something here, but do you ever schedule in
the workqueue context? Don't you need to explicitly schedule after
a jiffy or so, so that the code also works on a non-fully-preemptible kernel?
> On Oct 7, 2015, at 10:39 AM, Sagi Grimberg <[email protected]> wrote:
>
> On 10/6/2015 5:59 PM, Chuck Lever wrote:
>> The reply tasklet is fast, but it's single threaded. After reply
>> traffic saturates a single CPU, there's no more reply processing
>> capacity.
>>
>> Replace the tasklet with a workqueue to spread reply handling across
>> all CPUs. This also moves RPC/RDMA reply handling out of the soft
>> IRQ context and into a context that allows sleeps.
>
> Hi Chuck,
>
> I'm probably missing something here, but do you ever schedule in
> the workqueue context? Don't you need to explicitly schedule after
> a jiffy or so, so that the code also works on a non-fully-preemptible kernel?
Each RPC reply gets its own work item. This is unlike
the tasklet, which continues to run as long as there are
items on xprtrdma’s global tasklet queue.
I can’t think of anything in the current reply handler
that would take longer than a few microseconds to run,
unless there is lock contention on the transport_lock.
wake_up_bit can also be slow sometimes, but it schedules
internally.
Eventually the reply handler will also synchronously
perform R_key invalidation. In that case, I think there
will be an implicit schedule while waiting for the
invalidation to finish.
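To make that concrete, a sketch of the completion side (assuming the receive
WR's wr_id carries the rpcrdma_rep pointer, as elsewhere in this driver, and
using the workqueue name assumed earlier):

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
	struct rpcrdma_rep *rep =
		(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* ... check wc->status, record wc->byte_len ... */

	/* One self-contained work item per reply; there is no longer a
	 * global sched_list that a single tasklet has to drain. */
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
}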
—
Chuck Lever
On 10/7/2015 5:48 PM, Chuck Lever wrote:
>
>> On Oct 7, 2015, at 10:39 AM, Sagi Grimberg <[email protected]> wrote:
>>
>> On 10/6/2015 5:59 PM, Chuck Lever wrote:
>>> The reply tasklet is fast, but it's single threaded. After reply
>>> traffic saturates a single CPU, there's no more reply processing
>>> capacity.
>>>
>>> Replace the tasklet with a workqueue to spread reply handling across
>>> all CPUs. This also moves RPC/RDMA reply handling out of the soft
>>> IRQ context and into a context that allows sleeps.
>>
>> Hi Chuck,
>>
>> I'm probably missing something here, but do you ever schedule in
>> the workqueue context? Don't you need to explicitly schedule after
>> a jiffy or so, so that the code also works on a non-fully-preemptible kernel?
>
> Each RPC reply gets its own work item. This is unlike
> the tasklet, which continues to run as long as there are
> items on xprtrdma’s global tasklet queue.
OK, I understand now.
This and the rest of the series look good to me.
Reviewed-by: Sagi Grimberg <[email protected]>
Hi Chuck,
With the server crash fix in place, ocrdma is passing iozone on this series.
Series Tested-By: Devesh Sharma <[email protected]>
> On Oct 14, 2015, at 9:32 AM, Devesh Sharma <[email protected]> wrote:
>
> Hi Chuck,
>
> With the server crash fix in place, ocrdma is passing iozone on this series.
>
> Series Tested-By: Devesh Sharma <[email protected]>
Thanks to you and Sagi. A v3 is coming soon, which should be more
or less ready to merge.
—
Chuck Lever