This series implements the following:
- Removal of the insecure ALLPHYSICAL memory registration mode
- Fixes to FMR disconnect recovery
- Significant reductions in per-transport memory consumption
- Support for sec=krb5, sec=krb5i, and sec=krb5p with NFS/RDMA
(with no performance impact on sec=sys)
- More pre-requisites for device removal support
Kerberos with NFS/RDMA is useful mainly for secure authentication of
each RPC transaction (sec=krb5). The Kerberos integrity and privacy
services are also available, providing feature parity with TCP in
environments where the use of sec=krb5i or sec=krb5p are mandated by
IT policy.
Available in the "nfs-rdma-for-4.8" topic branch of this git repo:
git://git.linux-nfs.org/projects/cel/cel-2.6.git
Or for browsing:
http://git.linux-nfs.org/?p=cel/cel-2.6.git;a=log;h=refs/heads/nfs-rdma-for-4.8
---
Chuck Lever (20):
xprtrdma: Remove ALLPHYSICAL memory registration mode
xprtrdma: Refactor ->ro_init
xprtrdma: Create common scatterlist fields in rpcrdma_mw
xprtrdma: Use scatterlist for DMA mapping and unmapping under FMR
xprtrdma: Remove rpcrdma_map_one() and friends
xprtrdma: Refactor MR recovery work queues
xprtrdma: Place registered MWs on a per-req list
xprtrdma: Reply buffer exhaustion can be catastrophic
xprtrdma: Limit the number of rpcrdma_mws
xprtrdma: Chunk list encoders no longer share one rl_segments array
xprtrdma: rpcrdma_inline_fixup() overruns the receive page list
xprtrdma: Do not update {head,tail}.iov_len in rpcrdma_inline_fixup()
xprtrdma: Update only specific fields in private receive buffer
xprtrdma: Clean up fixup_copy_count accounting
xprtrdma: No direct data placement with krb5i and krb5p
svc: Avoid garbage replies when pc_func() returns rpc_drop_reply
NFS: Don't drop CB requests with invalid principals
xprtrdma: Eliminate rpcrdma_receive_worker()
xprtrdma: Eliminate INLINE_THRESHOLD macros
xprtrdma: Relocate connection helper functions
fs/nfs/callback_xdr.c | 6 +
include/linux/sunrpc/auth.h | 5 -
include/linux/sunrpc/gss_api.h | 2
net/sunrpc/auth_gss/auth_gss.c | 5 -
net/sunrpc/auth_gss/gss_krb5_mech.c | 2
net/sunrpc/auth_gss/gss_mech_switch.c | 12 +
net/sunrpc/svc.c | 8 +
net/sunrpc/xprtrdma/Makefile | 2
net/sunrpc/xprtrdma/backchannel.c | 4
net/sunrpc/xprtrdma/fmr_ops.c | 270 +++++++++++++-----------------
net/sunrpc/xprtrdma/frwr_ops.c | 231 +++++++++----------------
net/sunrpc/xprtrdma/physical_ops.c | 122 -------------
net/sunrpc/xprtrdma/rpc_rdma.c | 301 ++++++++++++++++-----------------
net/sunrpc/xprtrdma/transport.c | 48 ++++-
net/sunrpc/xprtrdma/verbs.c | 86 +++++----
net/sunrpc/xprtrdma/xprt_rdma.h | 134 +++++----------
16 files changed, 512 insertions(+), 726 deletions(-)
delete mode 100644 net/sunrpc/xprtrdma/physical_ops.c
--
Chuck Lever
No HCA or RNIC in the kernel tree requires the use of ALLPHYSICAL.
ALLPHYSICAL advertises in the clear on the network fabric an R_key
that is good for all of the client's memory. No known exploit
exists, but theoretically any user on the server can use that R_key
on the client's QP to read or update any part of the client's memory.
ALLPHYSICAL can't protect application memory regions from server
update after a local signal or soft timeout has terminated an RPC.
ALLPHYSICAL chunks are no larger than a page. Special cases to
handle small chunks and long chunk lists have been a source of
implementation complexity and bugs.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/Makefile | 2 -
net/sunrpc/xprtrdma/physical_ops.c | 122 ------------------------------------
net/sunrpc/xprtrdma/verbs.c | 15 ----
net/sunrpc/xprtrdma/xprt_rdma.h | 5 -
4 files changed, 2 insertions(+), 142 deletions(-)
delete mode 100644 net/sunrpc/xprtrdma/physical_ops.c
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index dc9f3b5..ef19fa4 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,7 +1,7 @@
obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
rpcrdma-y := transport.o rpc_rdma.o verbs.o \
- fmr_ops.o frwr_ops.o physical_ops.o \
+ fmr_ops.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
deleted file mode 100644
index 3750596..0000000
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2015 Oracle. All rights reserved.
- * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
- */
-
-/* No-op chunk preparation. All client memory is pre-registered.
- * Sometimes referred to as ALLPHYSICAL mode.
- *
- * Physical registration is simple because all client memory is
- * pre-registered and never deregistered. This mode is good for
- * adapter bring up, but is considered not safe: the server is
- * trusted not to abuse its access to client memory not involved
- * in RDMA I/O.
- */
-
-#include "xprt_rdma.h"
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY RPCDBG_TRANS
-#endif
-
-static int
-physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
-{
- struct ib_mr *mr;
-
- /* Obtain an rkey to use for RPC data payloads.
- */
- mr = ib_get_dma_mr(ia->ri_pd,
- IB_ACCESS_LOCAL_WRITE |
- IB_ACCESS_REMOTE_WRITE |
- IB_ACCESS_REMOTE_READ);
- if (IS_ERR(mr)) {
- pr_err("%s: ib_get_dma_mr for failed with %lX\n",
- __func__, PTR_ERR(mr));
- return -ENOMEM;
- }
- ia->ri_dma_mr = mr;
-
- rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
- RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS));
- return 0;
-}
-
-/* PHYSICAL memory registration conveys one page per chunk segment.
- */
-static size_t
-physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS);
-}
-
-static int
-physical_op_init(struct rpcrdma_xprt *r_xprt)
-{
- return 0;
-}
-
-/* The client's physical memory is already exposed for
- * remote access via RDMA READ or RDMA WRITE.
- */
-static int
-physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
- seg->mr_rkey = ia->ri_dma_mr->rkey;
- seg->mr_base = seg->mr_dma;
- return 1;
-}
-
-/* DMA unmap all memory regions that were mapped for "req".
- */
-static void
-physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
- struct ib_device *device = r_xprt->rx_ia.ri_device;
- unsigned int i;
-
- for (i = 0; req->rl_nchunks; --req->rl_nchunks)
- rpcrdma_unmap_one(device, &req->rl_segments[i++]);
-}
-
-/* Use a slow, safe mechanism to invalidate all memory regions
- * that were registered for "req".
- *
- * For physical memory registration, there is no good way to
- * fence a single MR that has been advertised to the server. The
- * client has already handed the server an R_key that cannot be
- * invalidated and is shared by all MRs on this connection.
- * Tearing down the PD might be the only safe choice, but it's
- * not clear that a freshly acquired DMA R_key would be different
- * than the one used by the PD that was just destroyed.
- * FIXME.
- */
-static void
-physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- bool sync)
-{
- physical_op_unmap_sync(r_xprt, req);
-}
-
-static void
-physical_op_destroy(struct rpcrdma_buffer *buf)
-{
-}
-
-const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
- .ro_map = physical_op_map,
- .ro_unmap_sync = physical_op_unmap_sync,
- .ro_unmap_safe = physical_op_unmap_safe,
- .ro_open = physical_op_open,
- .ro_maxpages = physical_op_maxpages,
- .ro_init = physical_op_init,
- .ro_destroy = physical_op_destroy,
- .ro_displayname = "physical",
-};
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b044d98a..7f09162 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -379,8 +379,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
struct rpcrdma_ia *ia = &xprt->rx_ia;
int rc;
- ia->ri_dma_mr = NULL;
-
ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
if (IS_ERR(ia->ri_id)) {
rc = PTR_ERR(ia->ri_id);
@@ -418,9 +416,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
case RPCRDMA_FRMR:
ia->ri_ops = &rpcrdma_frwr_memreg_ops;
break;
- case RPCRDMA_ALLPHYSICAL:
- ia->ri_ops = &rpcrdma_physical_memreg_ops;
- break;
case RPCRDMA_MTHCAFMR:
ia->ri_ops = &rpcrdma_fmr_memreg_ops;
break;
@@ -585,8 +580,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
out2:
ib_free_cq(sendcq);
out1:
- if (ia->ri_dma_mr)
- ib_dereg_mr(ia->ri_dma_mr);
return rc;
}
@@ -600,8 +593,6 @@ out1:
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
- int rc;
-
dprintk("RPC: %s: entering, connected is %d\n",
__func__, ep->rep_connected);
@@ -615,12 +606,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_free_cq(ep->rep_attr.recv_cq);
ib_free_cq(ep->rep_attr.send_cq);
-
- if (ia->ri_dma_mr) {
- rc = ib_dereg_mr(ia->ri_dma_mr);
- dprintk("RPC: %s: ib_dereg_mr returned %i\n",
- __func__, rc);
- }
}
/*
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 95cdc66..2421467 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -68,7 +68,6 @@ struct rpcrdma_ia {
struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
- struct ib_mr *ri_dma_mr;
struct completion ri_done;
int ri_async_rc;
unsigned int ri_max_frmr_depth;
@@ -270,8 +269,7 @@ struct rpcrdma_mw {
* NOTES:
* o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
* marshal. The number needed varies depending on the iov lists that
- * are passed to us, the memory registration mode we are in, and if
- * physical addressing is used, the layout.
+ * are passed to us and the memory registration mode we are in.
*/
struct rpcrdma_mr_seg { /* chunk descriptors */
@@ -411,7 +409,6 @@ struct rpcrdma_memreg_ops {
extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops;
/*
* RPCRDMA transport -- encapsulates the structures above for
Clean up: Now that ALLPHYSICAL is gone, there is some buffer
initialization code that is common to remaining memory registration
modes. Move it into rpcrdma_buffer_create.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 4 ----
net/sunrpc/xprtrdma/frwr_ops.c | 4 ----
net/sunrpc/xprtrdma/verbs.c | 3 +++
3 files changed, 3 insertions(+), 8 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 6326ebe..b8952c7 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -126,10 +126,6 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_mw *r;
int i, rc;
- spin_lock_init(&buf->rb_mwlock);
- INIT_LIST_HEAD(&buf->rb_mws);
- INIT_LIST_HEAD(&buf->rb_all);
-
i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
i += 2; /* head + tail */
i *= buf->rb_max_requests; /* one set for each RPC slot */
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index c094754..6e0e5e8 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -355,10 +355,6 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
int i;
- spin_lock_init(&buf->rb_mwlock);
- INIT_LIST_HEAD(&buf->rb_mws);
- INIT_LIST_HEAD(&buf->rb_all);
-
i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
i += 2; /* head + tail */
i *= buf->rb_max_requests; /* one set for each RPC slot */
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 7f09162..ecd494a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -822,6 +822,9 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
+ spin_lock_init(&buf->rb_mwlock);
+ INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
spin_lock_init(&buf->rb_lock);
atomic_set(&buf->rb_credits, 1);
Clean up: FMR is about to replace the rpcrdma_map_one code with
scatterlists. Move the scatterlist fields out of the FRWR-specific
union and into the generic part of rpcrdma_mw.
One minor change: -EIO is now returned if FRWR registration fails.
The RPC is terminated immediately, since the problem is likely due
to a software bug. Retrying likely won't help.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/frwr_ops.c | 85 +++++++++++++++++++--------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 8 ++--
2 files changed, 46 insertions(+), 47 deletions(-)
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 6e0e5e8..a476fa6 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -125,17 +125,16 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
}
static void
-__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_frmr *f = &mw->frmr;
int rc;
rc = __frwr_reset_mr(ia, mw);
- ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
+ ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
if (rc)
return;
-
rpcrdma_put_mw(r_xprt, mw);
}
@@ -152,8 +151,7 @@ __frwr_recovery_worker(struct work_struct *work)
struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
mw_work);
- __frwr_reset_and_unmap(r->mw_xprt, r);
- return;
+ __frwr_reset_and_unmap(r);
}
/* A broken MR was discovered in a context that can't sleep.
@@ -167,8 +165,7 @@ __frwr_queue_recovery(struct rpcrdma_mw *r)
}
static int
-__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
- unsigned int depth)
+__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
{
struct rpcrdma_frmr *f = &r->frmr;
int rc;
@@ -177,11 +174,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
if (IS_ERR(f->fr_mr))
goto out_mr_err;
- f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
- if (!f->fr_sg)
+ r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL);
+ if (!r->mw_sg)
goto out_list_err;
- sg_init_table(f->fr_sg, depth);
+ sg_init_table(r->mw_sg, depth);
init_completion(&f->fr_linv_done);
@@ -210,7 +207,7 @@ __frwr_release(struct rpcrdma_mw *r)
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
- kfree(r->frmr.fr_sg);
+ kfree(r->mw_sg);
}
static int
@@ -350,7 +347,6 @@ static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_device *device = r_xprt->rx_ia.ri_device;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
int i;
@@ -368,7 +364,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
return -ENOMEM;
- rc = __frwr_init(r, pd, device, depth);
+ rc = __frwr_init(r, pd, depth);
if (rc) {
kfree(r);
return rc;
@@ -382,7 +378,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
}
-/* Post a FAST_REG Work Request to register a memory region
+/* Post a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*/
static int
@@ -390,8 +386,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_device;
- enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_mw *mw;
struct rpcrdma_frmr *frmr;
@@ -417,15 +411,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (nsegs > ia->ri_max_frmr_depth)
nsegs = ia->ri_max_frmr_depth;
-
for (i = 0; i < nsegs;) {
if (seg->mr_page)
- sg_set_page(&frmr->fr_sg[i],
+ sg_set_page(&mw->mw_sg[i],
seg->mr_page,
seg->mr_len,
offset_in_page(seg->mr_offset));
else
- sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
+ sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
seg->mr_len);
++seg;
@@ -436,26 +429,20 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- frmr->fr_nents = i;
- frmr->fr_dir = direction;
-
- dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
- if (!dma_nents) {
- pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n",
- __func__, frmr->fr_sg, frmr->fr_nents);
- return -ENOMEM;
- }
+ mw->mw_nents = i;
+ mw->mw_dir = rpcrdma_data_dir(writing);
- n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
- if (unlikely(n != frmr->fr_nents)) {
- pr_err("RPC: %s: failed to map mr %p (%u/%u)\n",
- __func__, frmr->fr_mr, n, frmr->fr_nents);
- rc = n < 0 ? n : -EINVAL;
- goto out_senderr;
- }
+ dma_nents = ib_dma_map_sg(ia->ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ if (!dma_nents)
+ goto out_dmamap_err;
+
+ n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
+ if (unlikely(n != mw->mw_nents))
+ goto out_mapmr_err;
dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
- __func__, mw, frmr->fr_nents, mr->length);
+ __func__, mw, mw->mw_nents, mr->length);
key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
@@ -480,13 +467,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
seg1->rl_mw = mw;
seg1->mr_rkey = mr->rkey;
seg1->mr_base = mr->iova;
- seg1->mr_nsegs = frmr->fr_nents;
+ seg1->mr_nsegs = mw->mw_nents;
seg1->mr_len = mr->length;
- return frmr->fr_nents;
+ return mw->mw_nents;
+
+out_dmamap_err:
+ pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
+ mw->mw_sg, mw->mw_nents);
+ return -ENOMEM;
+
+out_mapmr_err:
+ pr_err("rpcrdma: failed to register mr %p (%u/%u)\n",
+ frmr->fr_mr, n, mw->mw_nents);
+ rc = n < 0 ? n : -EIO;
+ __frwr_queue_recovery(mw);
+ return rc;
out_senderr:
- dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
+ pr_err("rpcrdma: ib_post_send status %i\n", rc);
__frwr_queue_recovery(mw);
return rc;
}
@@ -578,8 +577,8 @@ unmap:
mw = seg->rl_mw;
seg->rl_mw = NULL;
- ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
- f->fr_dir);
+ ib_dma_unmap_sg(ia->ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
rpcrdma_put_mw(r_xprt, mw);
i += seg->mr_nsegs;
@@ -626,7 +625,7 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
mw = seg->rl_mw;
if (sync)
- __frwr_reset_and_unmap(r_xprt, mw);
+ __frwr_reset_and_unmap(mw);
else
__frwr_queue_recovery(mw);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2421467..8c979a4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -220,9 +220,6 @@ enum rpcrdma_frmr_state {
};
struct rpcrdma_frmr {
- struct scatterlist *fr_sg;
- int fr_nents;
- enum dma_data_direction fr_dir;
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
enum rpcrdma_frmr_state fr_state;
@@ -239,13 +236,16 @@ struct rpcrdma_fmr {
};
struct rpcrdma_mw {
+ struct list_head mw_list;
+ struct scatterlist *mw_sg;
+ int mw_nents;
+ enum dma_data_direction mw_dir;
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
struct work_struct mw_work;
struct rpcrdma_xprt *mw_xprt;
- struct list_head mw_list;
struct list_head mw_all;
};
The use of a scatterlist for handling DMA mapping and unmapping
was recently introduced in frwr_ops.c in commit 4143f34e01e9
("xprtrdma: Port to new memory registration API"). That commit did
not make a similar update to xprtrdma's FMR support because the
core ib_map_phys_fmr() and ib_unmap_fmr() APIs have not been changed
to take a scatterlist argument.
However, FMR still needs to do DMA mapping and unmapping. It appears
that RDS, for example, uses a scatterlist for this, then builds the
DMA addr array for the ib_map_phys_fmr call separately. We can do
something similar.
This modernization can be used immediately to properly defer DMA
unmapping during fmr_unmap_safe (a FIXME). It separates the DMA
unmapping coordinates from the rl_segments array. This array, being
part of an rpcrdma_req, is always re-used immediately when an RPC
exits. A scatterlist is allocated in memory independent of the
rl_segments array, so it can be preserved indefinitely (ie, until
the MR invalidation and DMA unmapping can actually be done by a
worker thread).
The FRWR and FMR DMA mapping code are slightly different from each
other now, and will diverge further when the "Check for holes" logic
can be removed from FRWR. So I chose not to create helpers for the
common-looking code.
Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Suggested-by: Sagi Grimberg <[email protected]>
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 160 ++++++++++++++++++++++++---------------
net/sunrpc/xprtrdma/xprt_rdma.h | 4 -
2 files changed, 100 insertions(+), 64 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b8952c7..b581a28 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -64,10 +64,35 @@ __fmr_unmap(struct rpcrdma_mw *mw)
{
LIST_HEAD(l);
- list_add(&mw->fmr.fmr->list, &l);
+ list_add(&mw->fmr.fm_mr->list, &l);
return ib_unmap_fmr(&l);
}
+static void
+__fmr_dma_unmap(struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
+}
+
+static void
+__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
+{
+ int rc;
+
+ /* ORDER */
+ rc = __fmr_unmap(mw);
+ if (rc) {
+ pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
+ rc, mw);
+ return;
+ }
+ __fmr_dma_unmap(mw);
+ }
+
/* Deferred reset of a single FMR. Generate a fresh rkey by
* replacing the MR. There's no recovery if this fails.
*/
@@ -75,11 +100,9 @@ static void
__fmr_recovery_worker(struct work_struct *work)
{
struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
- mw_work);
- struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ mw_work);
- __fmr_unmap(mw);
- rpcrdma_put_mw(r_xprt, mw);
+ __fmr_reset_and_unmap(mw);
return;
}
@@ -93,6 +116,22 @@ __fmr_queue_recovery(struct rpcrdma_mw *mw)
queue_work(fmr_recovery_wq, &mw->mw_work);
}
+static void
+__fmr_release(struct rpcrdma_mw *r)
+{
+ int rc;
+
+ kfree(r->fmr.fm_physaddrs);
+ kfree(r->mw_sg);
+
+ rc = ib_dealloc_fmr(r->fmr.fm_mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
+ __func__, rc);
+
+ kfree(r);
+}
+
static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
@@ -137,13 +176,20 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
goto out;
- r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
- sizeof(u64), GFP_KERNEL);
- if (!r->fmr.physaddrs)
+ r->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
+ sizeof(u64), GFP_KERNEL);
+ if (!r->fmr.fm_physaddrs)
goto out_free;
- r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->fmr.fmr))
+ r->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
+ sizeof(*r->mw_sg), GFP_KERNEL);
+ if (!r->mw_sg)
+ goto out_free;
+
+ sg_init_table(r->mw_sg, RPCRDMA_MAX_FMR_SGES);
+
+ r->fmr.fm_mr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->fmr.fm_mr))
goto out_fmr_err;
r->mw_xprt = r_xprt;
@@ -153,10 +199,11 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
out_fmr_err:
- rc = PTR_ERR(r->fmr.fmr);
+ rc = PTR_ERR(r->fmr.fm_mr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
- kfree(r->fmr.physaddrs);
out_free:
+ kfree(r->mw_sg);
+ kfree(r->fmr.fm_physaddrs);
kfree(r);
out:
return rc;
@@ -169,12 +216,10 @@ static int
fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_device;
- enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
int len, pageoff, i, rc;
struct rpcrdma_mw *mw;
+ u64 *dma_pages;
mw = seg1->rl_mw;
seg1->rl_mw = NULL;
@@ -196,8 +241,14 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (nsegs > RPCRDMA_MAX_FMR_SGES)
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
- rpcrdma_map_one(device, seg, direction);
- mw->fmr.physaddrs[i] = seg->mr_dma;
+ if (seg->mr_page)
+ sg_set_page(&mw->mw_sg[i],
+ seg->mr_page,
+ seg->mr_len,
+ offset_in_page(seg->mr_offset));
+ else
+ sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
+ seg->mr_len);
len += seg->mr_len;
++seg;
++i;
@@ -206,38 +257,40 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
+ mw->mw_nents = i;
+ mw->mw_dir = rpcrdma_data_dir(writing);
- rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
- i, seg1->mr_dma);
+ if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir))
+ goto out_dmamap_err;
+
+ for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++)
+ dma_pages[i] = sg_dma_address(&mw->mw_sg[i]);
+ rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents,
+ dma_pages[0]);
if (rc)
goto out_maperr;
seg1->rl_mw = mw;
- seg1->mr_rkey = mw->fmr.fmr->rkey;
- seg1->mr_base = seg1->mr_dma + pageoff;
- seg1->mr_nsegs = i;
+ seg1->mr_rkey = mw->fmr.fm_mr->rkey;
+ seg1->mr_base = dma_pages[0] + pageoff;
+ seg1->mr_nsegs = mw->mw_nents;
seg1->mr_len = len;
- return i;
+ return mw->mw_nents;
+
+out_dmamap_err:
+ pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
+ mw->mw_sg, mw->mw_nents);
+ return -ENOMEM;
out_maperr:
- dprintk("RPC: %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
- __func__, len, (unsigned long long)seg1->mr_dma,
- pageoff, i, rc);
- while (i--)
- rpcrdma_unmap_one(device, --seg);
+ pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
+ len, (unsigned long long)dma_pages[0],
+ pageoff, mw->mw_nents, rc);
+ __fmr_dma_unmap(mw);
return rc;
}
-static void
-__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
- struct ib_device *device = r_xprt->rx_ia.ri_device;
- int nsegs = seg->mr_nsegs;
-
- while (nsegs--)
- rpcrdma_unmap_one(device, seg++);
-}
-
/* Invalidate all memory regions that were registered for "req".
*
* Sleeps until it is safe for the host CPU to access the
@@ -263,7 +316,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- list_add(&mw->fmr.fmr->list, &unmap_list);
+ list_add(&mw->fmr.fm_mr->list, &unmap_list);
i += seg->mr_nsegs;
}
@@ -276,9 +329,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
- __fmr_dma_unmap(r_xprt, seg);
- rpcrdma_put_mw(r_xprt, seg->rl_mw);
+ __fmr_dma_unmap(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -290,11 +343,6 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Use a slow, safe mechanism to invalidate all memory regions
* that were registered for "req".
- *
- * In the asynchronous case, DMA unmapping occurs first here
- * because the rpcrdma_mr_seg is released immediately after this
- * call. It's contents won't be available in __fmr_dma_unmap later.
- * FIXME.
*/
static void
fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
@@ -308,15 +356,10 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- if (sync) {
- /* ORDER */
- __fmr_unmap(mw);
- __fmr_dma_unmap(r_xprt, seg);
- rpcrdma_put_mw(r_xprt, mw);
- } else {
- __fmr_dma_unmap(r_xprt, seg);
+ if (sync)
+ __fmr_reset_and_unmap(mw);
+ else
__fmr_queue_recovery(mw);
- }
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -328,19 +371,12 @@ static void
fmr_op_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
- int rc;
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- kfree(r->fmr.physaddrs);
-
- rc = ib_dealloc_fmr(r->fmr.fmr);
- if (rc)
- dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
- __func__, rc);
- kfree(r);
+ __fmr_release(r);
}
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 8c979a4..6378469 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -231,8 +231,8 @@ struct rpcrdma_frmr {
};
struct rpcrdma_fmr {
- struct ib_fmr *fmr;
- u64 *physaddrs;
+ struct ib_fmr *fm_mr;
+ u64 *fm_physaddrs;
};
struct rpcrdma_mw {
Clean up: ALLPHYSICAL is gone and FMR has been converted to use
scatterlists. There are no more users of these functions.
This patch shrinks the size of struct rpcrdma_req by about 3500
bytes on x86_64. There is one of these structs for each RPC credit
(128 credits per transport connection).
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 8 --------
net/sunrpc/xprtrdma/xprt_rdma.h | 36 ------------------------------------
2 files changed, 44 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ecd494a..4f7e1ba 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1048,14 +1048,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* Wrappers for internal-use kmalloc memory registration, used by buffer code.
*/
-void
-rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
-{
- dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
- seg->mr_offset,
- (unsigned long long)seg->mr_dma, seg->mr_dmalen);
-}
-
/**
* rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
* @ia: controlling rpcrdma_ia
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 6378469..3472844 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -278,9 +278,6 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
u32 mr_rkey; /* registration result */
u32 mr_len; /* length of chunk or segment */
int mr_nsegs; /* number of segments in chunk or 0 */
- enum dma_data_direction mr_dir; /* segment mapping direction */
- dma_addr_t mr_dma; /* segment mapping address */
- size_t mr_dmalen; /* segment mapping length */
struct page *mr_page; /* owning page, if any */
char *mr_offset; /* kva if no page, else offset */
};
@@ -491,45 +488,12 @@ void rpcrdma_destroy_wq(void);
* Wrappers for chunk registration, shared by read/write chunk code.
*/
-void rpcrdma_mapping_error(struct rpcrdma_mr_seg *);
-
static inline enum dma_data_direction
rpcrdma_data_dir(bool writing)
{
return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
}
-static inline void
-rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg,
- enum dma_data_direction direction)
-{
- seg->mr_dir = direction;
- seg->mr_dmalen = seg->mr_len;
-
- if (seg->mr_page)
- seg->mr_dma = ib_dma_map_page(device,
- seg->mr_page, offset_in_page(seg->mr_offset),
- seg->mr_dmalen, seg->mr_dir);
- else
- seg->mr_dma = ib_dma_map_single(device,
- seg->mr_offset,
- seg->mr_dmalen, seg->mr_dir);
-
- if (ib_dma_mapping_error(device, seg->mr_dma))
- rpcrdma_mapping_error(seg);
-}
-
-static inline void
-rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg)
-{
- if (seg->mr_page)
- ib_dma_unmap_page(device,
- seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
- else
- ib_dma_unmap_single(device,
- seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
-}
-
/*
* RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
*/
I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.
The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up separate individual work queues for this,
schedule a delayed worker to deal with them, since recovering MRs is
not performance-critical.
Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 85 +++++++++++----------------------------
net/sunrpc/xprtrdma/frwr_ops.c | 68 ++++++-------------------------
net/sunrpc/xprtrdma/transport.c | 11 -----
net/sunrpc/xprtrdma/verbs.c | 42 +++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 11 +++--
5 files changed, 87 insertions(+), 130 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b581a28..45800c9 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,30 +35,6 @@
/* Maximum scatter/gather per FMR */
#define RPCRDMA_MAX_FMR_SGES (64)
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
- fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
- return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!fmr_recovery_wq)
- return;
-
- wq = fmr_recovery_wq;
- fmr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__fmr_unmap(struct rpcrdma_mw *mw)
{
@@ -68,54 +44,30 @@ __fmr_unmap(struct rpcrdma_mw *mw)
return ib_unmap_fmr(&l);
}
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
- struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mw->mw_sg, mw->mw_nents, mw->mw_dir);
- rpcrdma_put_mw(r_xprt, mw);
-}
-
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
static void
__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
int rc;
/* ORDER */
rc = __fmr_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
if (rc) {
pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
rc, mw);
return;
}
- __fmr_dma_unmap(mw);
+ rpcrdma_put_mw(r_xprt, mw);
}
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
- */
-static void
-__fmr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
- mw_work);
-
- __fmr_reset_and_unmap(mw);
- return;
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
- INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
- queue_work(fmr_recovery_wq, &mw->mw_work);
-}
-
static void
__fmr_release(struct rpcrdma_mw *r)
{
@@ -142,6 +94,12 @@ fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
return 0;
}
+static void
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
+{
+ __fmr_reset_and_unmap(mw);
+}
+
/* FMR mode conveys up to 64 pages of payload per chunk segment.
*/
static size_t
@@ -287,7 +245,9 @@ out_maperr:
pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
len, (unsigned long long)dma_pages[0],
pageoff, mw->mw_nents, rc);
- __fmr_dma_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
return rc;
}
@@ -331,7 +291,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- __fmr_dma_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -359,7 +321,7 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (sync)
__fmr_reset_and_unmap(mw);
else
- __fmr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -384,6 +346,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
.ro_unmap_safe = fmr_op_unmap_safe,
+ .ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a476fa6..19bddfe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -73,31 +73,6 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
- frwr_recovery_wq = alloc_workqueue("frwr_recovery",
- FRWR_RECOVERY_WQ_FLAGS, 0);
- return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!frwr_recovery_wq)
- return;
-
- wq = frwr_recovery_wq;
- frwr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
{
@@ -124,6 +99,12 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
return 0;
}
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
static void
__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
{
@@ -138,30 +119,10 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
rpcrdma_put_mw(r_xprt, mw);
}
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- mw_work);
-
- __frwr_reset_and_unmap(r);
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
{
- INIT_WORK(&r->mw_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->mw_work);
+ __frwr_reset_and_unmap(mw);
}
static int
@@ -399,7 +360,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
seg1->rl_mw = NULL;
do {
if (mw)
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
@@ -481,12 +442,11 @@ out_mapmr_err:
pr_err("rpcrdma: failed to register mr %p (%u/%u)\n",
frmr->fr_mr, n, mw->mw_nents);
rc = n < 0 ? n : -EIO;
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
out_senderr:
- pr_err("rpcrdma: ib_post_send status %i\n", rc);
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
}
@@ -627,7 +587,7 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (sync)
__frwr_reset_and_unmap(mw);
else
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -640,9 +600,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
- /* Ensure stale MWs for "buf" are no longer in flight */
- flush_workqueue(frwr_recovery_wq);
-
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
@@ -655,6 +612,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
.ro_unmap_safe = frwr_op_unmap_safe,
+ .ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 99d2e5b..29c91bf 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -741,7 +741,6 @@ void xprt_rdma_cleanup(void)
__func__, rc);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
rc = xprt_unregister_transport(&xprt_rdma_bc);
if (rc)
@@ -753,20 +752,13 @@ int xprt_rdma_init(void)
{
int rc;
- rc = frwr_alloc_recovery_wq();
- if (rc)
- return rc;
-
rc = rpcrdma_alloc_wq();
- if (rc) {
- frwr_destroy_recovery_wq();
+ if (rc)
return rc;
- }
rc = xprt_register_transport(&xprt_rdma);
if (rc) {
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
@@ -774,7 +766,6 @@ int xprt_rdma_init(void)
if (rc) {
xprt_unregister_transport(&xprt_rdma);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4f7e1ba..e9f7f727 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -762,6 +762,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_drain_qp(ia->ri_id->qp);
}
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_recovery_worker.work);
+ struct rpcrdma_mw *mw;
+
+ spin_lock(&buf->rb_recovery_lock);
+ while (!list_empty(&buf->rb_stale_mrs)) {
+ mw = list_first_entry(&buf->rb_stale_mrs,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
+ mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+ spin_lock(&buf->rb_recovery_lock);
+ };
+ spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ spin_lock(&buf->rb_recovery_lock);
+ list_add(&mw->mw_list, &buf->rb_stale_mrs);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
@@ -863,6 +898,11 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
+ INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+ rpcrdma_mr_recovery_worker);
+ spin_lock_init(&buf->rb_recovery_lock);
+ INIT_LIST_HEAD(&buf->rb_stale_mrs);
+
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -911,6 +951,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3472844..8eb02ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -244,7 +244,6 @@ struct rpcrdma_mw {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
- struct work_struct mw_work;
struct rpcrdma_xprt *mw_xprt;
struct list_head mw_all;
};
@@ -336,6 +335,10 @@ struct rpcrdma_buffer {
struct list_head rb_allreqs;
u32 rb_bc_max_requests;
+
+ spinlock_t rb_recovery_lock;
+ struct list_head rb_stale_mrs;
+ struct delayed_work rb_recovery_worker;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -395,6 +398,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_req *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool);
+ void (*ro_recover_mr)(struct rpcrdma_mw *);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
struct rpcrdma_create_data_internal *);
@@ -471,6 +475,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
size_t, gfp_t);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -478,9 +484,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
int rpcrdma_alloc_wq(void);
void rpcrdma_destroy_wq(void);
Instead of placing registered MWs sparsely into the rl_segments,
array, place these MWs on a per-req list.
ro_unmap_{sync,safe} can then simply pull those MWs off the list
instead of walking through the array.
This change significantly reduces the size of struct rpcrdma_req
by removing nsegs and rl_mw from every array element.
As an additional clean-up, chunk co-ordinates are returned in the
"*mw" output argument so they are no longer needed in every
array element.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 69 ++++++++++++-----------------------
net/sunrpc/xprtrdma/frwr_ops.c | 76 +++++++++++++++------------------------
net/sunrpc/xprtrdma/rpc_rdma.c | 76 +++++++++++++++++++--------------------
net/sunrpc/xprtrdma/transport.c | 3 ++
net/sunrpc/xprtrdma/verbs.c | 1 +
net/sunrpc/xprtrdma/xprt_rdma.h | 11 +++---
6 files changed, 100 insertions(+), 136 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 45800c9..4632d25 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -73,6 +73,10 @@ __fmr_release(struct rpcrdma_mw *r)
{
int rc;
+ /* Ensure MW is not on any rl_registered list */
+ if (!list_empty(&r->mw_list))
+ list_del(&r->mw_list);
+
kfree(r->fmr.fm_physaddrs);
kfree(r->mw_sg);
@@ -172,25 +176,16 @@ out:
*/
static int
fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing)
+ int nsegs, bool writing, struct rpcrdma_mw **out)
{
struct rpcrdma_mr_seg *seg1 = seg;
int len, pageoff, i, rc;
struct rpcrdma_mw *mw;
u64 *dma_pages;
- mw = seg1->rl_mw;
- seg1->rl_mw = NULL;
- if (!mw) {
- mw = rpcrdma_get_mw(r_xprt);
- if (!mw)
- return -ENOMEM;
- } else {
- /* this is a retransmit; generate a fresh rkey */
- rc = __fmr_unmap(mw);
- if (rc)
- return rc;
- }
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
@@ -229,11 +224,11 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (rc)
goto out_maperr;
- seg1->rl_mw = mw;
- seg1->mr_rkey = mw->fmr.fm_mr->rkey;
- seg1->mr_base = dma_pages[0] + pageoff;
- seg1->mr_nsegs = mw->mw_nents;
- seg1->mr_len = len;
+ mw->mw_handle = mw->fmr.fm_mr->rkey;
+ mw->mw_length = len;
+ mw->mw_offset = dma_pages[0] + pageoff;
+
+ *out = mw;
return mw->mw_nents;
out_dmamap_err:
@@ -255,12 +250,12 @@ out_maperr:
*
* Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions.
+ *
+ * Caller ensures that req->rl_registered is not empty.
*/
static void
fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
- struct rpcrdma_mr_seg *seg;
- unsigned int i, nchunks;
struct rpcrdma_mw *mw;
LIST_HEAD(unmap_list);
int rc;
@@ -272,14 +267,8 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* ib_unmap_fmr() is slow, so use a single call instead
* of one call per mapped MR.
*/
- for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
-
+ list_for_each_entry(mw, &req->rl_registered, mw_list)
list_add(&mw->fmr.fm_mr->list, &unmap_list);
-
- i += seg->mr_nsegs;
- }
rc = ib_unmap_fmr(&unmap_list);
if (rc)
pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
@@ -287,20 +276,15 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
*/
- for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
+ while (!list_empty(&req->rl_registered)) {
+ mw = list_first_entry(&req->rl_registered,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
mw->mw_sg, mw->mw_nents, mw->mw_dir);
rpcrdma_put_mw(r_xprt, mw);
-
- i += seg->mr_nsegs;
- seg->mr_nsegs = 0;
- seg->rl_mw = NULL;
}
-
- req->rl_nchunks = 0;
}
/* Use a slow, safe mechanism to invalidate all memory regions
@@ -310,22 +294,17 @@ static void
fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
bool sync)
{
- struct rpcrdma_mr_seg *seg;
struct rpcrdma_mw *mw;
- unsigned int i;
- for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
+ while (!list_empty(&req->rl_registered)) {
+ mw = list_first_entry(&req->rl_registered,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
if (sync)
__fmr_reset_and_unmap(mw);
else
rpcrdma_defer_mr_recovery(mw);
-
- i += seg->mr_nsegs;
- seg->mr_nsegs = 0;
- seg->rl_mw = NULL;
}
}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 19bddfe..e0f610c 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -140,7 +140,6 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
goto out_list_err;
sg_init_table(r->mw_sg, depth);
-
init_completion(&f->fr_linv_done);
return 0;
@@ -164,11 +163,17 @@ __frwr_release(struct rpcrdma_mw *r)
{
int rc;
+ /* Ensure MW is not on any rl_registered list */
+ if (!list_empty(&r->mw_list))
+ list_del(&r->mw_list);
+
rc = ib_dereg_mr(r->frmr.fr_mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
+
kfree(r->mw_sg);
+ kfree(r);
}
static int
@@ -344,10 +349,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
*/
static int
frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing)
+ int nsegs, bool writing, struct rpcrdma_mw **out)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_mw *mw;
struct rpcrdma_frmr *frmr;
struct ib_mr *mr;
@@ -356,8 +360,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int rc, i, n, dma_nents;
u8 key;
- mw = seg1->rl_mw;
- seg1->rl_mw = NULL;
+ mw = NULL;
do {
if (mw)
rpcrdma_defer_mr_recovery(mw);
@@ -425,12 +428,11 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (rc)
goto out_senderr;
- seg1->rl_mw = mw;
- seg1->mr_rkey = mr->rkey;
- seg1->mr_base = mr->iova;
- seg1->mr_nsegs = mw->mw_nents;
- seg1->mr_len = mr->length;
+ mw->mw_handle = mr->rkey;
+ mw->mw_length = mr->length;
+ mw->mw_offset = mr->iova;
+ *out = mw;
return mw->mw_nents;
out_dmamap_err:
@@ -451,9 +453,8 @@ out_senderr:
}
static struct ib_send_wr *
-__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
+__frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
{
- struct rpcrdma_mw *mw = seg->rl_mw;
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;
@@ -473,14 +474,14 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
*
* Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions.
+ *
+ * Caller ensures that req->rl_registered is not empty.
*/
static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_mr_seg *seg;
- unsigned int i, nchunks;
struct rpcrdma_frmr *f;
struct rpcrdma_mw *mw;
int rc;
@@ -492,22 +493,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* Chain the LOCAL_INV Work Requests and post them with
* a single ib_post_send() call.
*/
+ f = NULL;
invalidate_wrs = pos = prev = NULL;
- seg = NULL;
- for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
- seg = &req->rl_segments[i];
-
- pos = __frwr_prepare_linv_wr(seg);
+ list_for_each_entry(mw, &req->rl_registered, mw_list) {
+ pos = __frwr_prepare_linv_wr(mw);
if (!invalidate_wrs)
invalidate_wrs = pos;
else
prev->next = pos;
prev = pos;
-
- i += seg->mr_nsegs;
+ f = &mw->frmr;
}
- f = &seg->rl_mw->frmr;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
@@ -532,20 +529,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* them to the free MW list.
*/
unmap:
- for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
- seg->rl_mw = NULL;
+ while (!list_empty(&req->rl_registered)) {
+ mw = list_first_entry(&req->rl_registered,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
ib_dma_unmap_sg(ia->ri_device,
mw->mw_sg, mw->mw_nents, mw->mw_dir);
rpcrdma_put_mw(r_xprt, mw);
-
- i += seg->mr_nsegs;
- seg->mr_nsegs = 0;
}
-
- req->rl_nchunks = 0;
return;
reset_mrs:
@@ -554,17 +546,12 @@ reset_mrs:
/* Find and reset the MRs in the LOCAL_INV WRs that did not
* get posted. This is synchronous, and slow.
*/
- for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
+ list_for_each_entry(mw, &req->rl_registered, mw_list) {
f = &mw->frmr;
-
if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
__frwr_reset_mr(ia, mw);
bad_wr = bad_wr->next;
}
-
- i += seg->mr_nsegs;
}
goto unmap;
}
@@ -576,22 +563,17 @@ static void
frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
bool sync)
{
- struct rpcrdma_mr_seg *seg;
struct rpcrdma_mw *mw;
- unsigned int i;
- for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
- seg = &req->rl_segments[i];
- mw = seg->rl_mw;
+ while (!list_empty(&req->rl_registered)) {
+ mw = list_first_entry(&req->rl_registered,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
if (sync)
__frwr_reset_and_unmap(mw);
else
rpcrdma_defer_mr_recovery(mw);
-
- i += seg->mr_nsegs;
- seg->mr_nsegs = 0;
- seg->rl_mw = NULL;
}
}
@@ -603,8 +585,8 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
+
__frwr_release(r);
- kfree(r);
}
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 35a8109..e7e86d7 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -286,11 +286,11 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
}
static inline __be32 *
-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
- *iptr++ = cpu_to_be32(seg->mr_rkey);
- *iptr++ = cpu_to_be32(seg->mr_len);
- return xdr_encode_hyper(iptr, seg->mr_base);
+ *iptr++ = cpu_to_be32(mw->mw_handle);
+ *iptr++ = cpu_to_be32(mw->mw_length);
+ return xdr_encode_hyper(iptr, mw->mw_offset);
}
/* XDR-encode the Read list. Supports encoding a list of read
@@ -311,6 +311,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
__be32 *iptr, enum rpcrdma_chunktype rtype)
{
struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mw *mw;
unsigned int pos;
int n, nsegs;
@@ -328,9 +329,11 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
return ERR_PTR(nsegs);
do {
- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+ false, &mw);
if (n <= 0)
return ERR_PTR(n);
+ list_add(&mw->mw_list, &req->rl_registered);
*iptr++ = xdr_one; /* item present */
@@ -338,13 +341,12 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
* have the same "position".
*/
*iptr++ = cpu_to_be32(pos);
- iptr = xdr_encode_rdma_segment(iptr, seg);
+ iptr = xdr_encode_rdma_segment(iptr, mw);
- dprintk("RPC: %5u %s: read segment pos %u "
- "%d@0x%016llx:0x%08x (%s)\n",
+ dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
rqst->rq_task->tk_pid, __func__, pos,
- seg->mr_len, (unsigned long long)seg->mr_base,
- seg->mr_rkey, n < nsegs ? "more" : "last");
+ mw->mw_length, (unsigned long long)mw->mw_offset,
+ mw->mw_handle, n < nsegs ? "more" : "last");
r_xprt->rx_stats.read_chunk_count++;
req->rl_nchunks++;
@@ -376,6 +378,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
enum rpcrdma_chunktype wtype)
{
struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mw *mw;
int n, nsegs, nchunks;
__be32 *segcount;
@@ -396,17 +399,18 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+ true, &mw);
if (n <= 0)
return ERR_PTR(n);
+ list_add(&mw->mw_list, &req->rl_registered);
- iptr = xdr_encode_rdma_segment(iptr, seg);
+ iptr = xdr_encode_rdma_segment(iptr, mw);
- dprintk("RPC: %5u %s: write segment "
- "%d@0x016%llx:0x%08x (%s)\n",
+ dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n",
rqst->rq_task->tk_pid, __func__,
- seg->mr_len, (unsigned long long)seg->mr_base,
- seg->mr_rkey, n < nsegs ? "more" : "last");
+ mw->mw_length, (unsigned long long)mw->mw_offset,
+ mw->mw_handle, n < nsegs ? "more" : "last");
r_xprt->rx_stats.write_chunk_count++;
r_xprt->rx_stats.total_rdma_request += seg->mr_len;
@@ -443,6 +447,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
__be32 *iptr, enum rpcrdma_chunktype wtype)
{
struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mw *mw;
int n, nsegs, nchunks;
__be32 *segcount;
@@ -461,17 +466,18 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
nchunks = 0;
do {
- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+ true, &mw);
if (n <= 0)
return ERR_PTR(n);
+ list_add(&mw->mw_list, &req->rl_registered);
- iptr = xdr_encode_rdma_segment(iptr, seg);
+ iptr = xdr_encode_rdma_segment(iptr, mw);
- dprintk("RPC: %5u %s: reply segment "
- "%d@0x%016llx:0x%08x (%s)\n",
+ dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
rqst->rq_task->tk_pid, __func__,
- seg->mr_len, (unsigned long long)seg->mr_base,
- seg->mr_rkey, n < nsegs ? "more" : "last");
+ mw->mw_length, (unsigned long long)mw->mw_offset,
+ mw->mw_handle, n < nsegs ? "more" : "last");
r_xprt->rx_stats.reply_chunk_count++;
r_xprt->rx_stats.total_rdma_request += seg->mr_len;
@@ -705,15 +711,13 @@ out_unmap:
* RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
*/
static int
-rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
{
unsigned int i, total_len;
struct rpcrdma_write_chunk *cur_wchunk;
char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
i = be32_to_cpu(**iptrp);
- if (i > max)
- return -1;
cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
total_len = 0;
while (i--) {
@@ -960,14 +964,13 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
(headerp->rm_body.rm_chunks[1] == xdr_zero &&
headerp->rm_body.rm_chunks[2] != xdr_zero) ||
(headerp->rm_body.rm_chunks[1] != xdr_zero &&
- req->rl_nchunks == 0))
+ list_empty(&req->rl_registered)))
goto badheader;
if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
/* count any expected write chunks in read reply */
/* start at write chunk array count */
iptr = &headerp->rm_body.rm_chunks[2];
- rdmalen = rpcrdma_count_chunks(rep,
- req->rl_nchunks, 1, &iptr);
+ rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
/* check for validity, and no reply chunk after */
if (rdmalen < 0 || *iptr++ != xdr_zero)
goto badheader;
@@ -997,11 +1000,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero ||
headerp->rm_body.rm_chunks[2] != xdr_one ||
- req->rl_nchunks == 0)
+ list_empty(&req->rl_registered))
goto badheader;
iptr = (__be32 *)((unsigned char *)headerp +
RPCRDMA_HDRLEN_MIN);
- rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+ rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
if (rdmalen < 0)
goto badheader;
r_xprt->rx_stats.total_rdma_reply += rdmalen;
@@ -1014,14 +1017,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
badheader:
default:
- dprintk("%s: invalid rpcrdma reply header (type %d):"
- " chunks[012] == %d %d %d"
- " expected chunks <= %d\n",
- __func__, be32_to_cpu(headerp->rm_type),
- headerp->rm_body.rm_chunks[0],
- headerp->rm_body.rm_chunks[1],
- headerp->rm_body.rm_chunks[2],
- req->rl_nchunks);
+ dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+ rqst->rq_task->tk_pid, __func__,
+ be32_to_cpu(headerp->rm_type));
status = -EIO;
r_xprt->rx_stats.bad_reply_count++;
break;
@@ -1035,7 +1033,7 @@ out:
* control: waking the next RPC waits until this RPC has
* relinquished all its Send Queue entries.
*/
- if (req->rl_nchunks)
+ if (!list_empty(&req->rl_registered))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
spin_lock_bh(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 29c91bf..549486d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -610,6 +610,9 @@ xprt_rdma_send_request(struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
int rc = 0;
+ /* On retransmit, remove any previously registered chunks */
+ r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+
rc = rpcrdma_marshal_req(rqst);
if (rc < 0)
goto failed_marshal;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e9f7f727..4867923 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -813,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_unlock(&buffer->rb_reqslock);
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
+ INIT_LIST_HEAD(&req->rl_registered);
return req;
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 8eb02ab..ea8d4f8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -245,6 +245,9 @@ struct rpcrdma_mw {
struct rpcrdma_frmr frmr;
};
struct rpcrdma_xprt *mw_xprt;
+ u32 mw_handle;
+ u32 mw_length;
+ u64 mw_offset;
struct list_head mw_all;
};
@@ -272,11 +275,7 @@ struct rpcrdma_mw {
*/
struct rpcrdma_mr_seg { /* chunk descriptors */
- struct rpcrdma_mw *rl_mw; /* registered MR */
- u64 mr_base; /* registration result */
- u32 mr_rkey; /* registration result */
u32 mr_len; /* length of chunk or segment */
- int mr_nsegs; /* number of segments in chunk or 0 */
struct page *mr_page; /* owning page, if any */
char *mr_offset; /* kva if no page, else offset */
};
@@ -294,6 +293,7 @@ struct rpcrdma_req {
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
+ struct list_head rl_registered; /* registered segments */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
struct rpcrdma_mr_seg *rl_nextseg;
@@ -393,7 +393,8 @@ struct rpcrdma_stats {
struct rpcrdma_xprt;
struct rpcrdma_memreg_ops {
int (*ro_map)(struct rpcrdma_xprt *,
- struct rpcrdma_mr_seg *, int, bool);
+ struct rpcrdma_mr_seg *, int, bool,
+ struct rpcrdma_mw **);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,
struct rpcrdma_req *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
Not having an rpcrdma_rep at call_allocate time can be a problem.
It means that send_request can't post a receive buffer to catch
the RPC's reply. Possible consequences are deadlock or RPC
timeouts.
Instead of allowing an RPC to proceed if an rpcrdma_rep is
not available, return NULL to force call_allocate to wait and
try again.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/verbs.c | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4867923..e016b9c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1009,8 +1009,6 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
/*
* Get a set of request/reply buffers.
- *
- * Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -1032,10 +1030,10 @@ out_reqbuf:
pr_warn("RPC: %s: out of request buffers\n", __func__);
return NULL;
out_repbuf:
+ list_add(&req->rl_free, &buffers->rb_send_bufs);
spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of reply buffers\n", __func__);
- req->rl_reply = NULL;
- return req;
+ return NULL;
}
/*
The fixed part of an R_key is 24 bits, but the variant part is only
8 bits, allowing 256 unique R_keys concurrenly in flight on one QP.
Thus an rpcrdma_xprt cannot use more than 256 rpcrdma_mws at a time.
Capping the number of rpcrdma_mws should prevent ro_map from
re-using an R_key, which could change the MR co-ordinates or access
settings while an RPC is still in progress. It also reduces the
number of pre-allocated FRMRs per transport, allowing more
transports per device.
To get more R_keys in flight at once, additional QPs will be
necessary.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/fmr_ops.c | 6 +-----
net/sunrpc/xprtrdma/frwr_ops.c | 6 +-----
net/sunrpc/xprtrdma/xprt_rdma.h | 8 ++++++++
3 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 4632d25..33c4661 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -127,12 +127,8 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_mw *r;
int i, rc;
- i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
- i += 2; /* head + tail */
- i *= buf->rb_max_requests; /* one set for each RPC slot */
- dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i);
-
rc = -ENOMEM;
+ i = RPCRDMA_RKEYS_PER_QP;
while (i--) {
r = kzalloc(sizeof(*r), GFP_KERNEL);
if (!r)
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index e0f610c..04b8ab2e 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -317,11 +317,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
int i;
- i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
- i += 2; /* head + tail */
- i *= buf->rb_max_requests; /* one set for each RPC slot */
- dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i);
-
+ i = RPCRDMA_RKEYS_PER_QP;
while (i--) {
struct rpcrdma_mw *r;
int rc;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index ea8d4f8..8e53057 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -164,6 +164,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
*/
#define RPCRDMA_MAX_HDR_SEGS (8)
+/* An R_key is 32 bits wide. 24 bits are fixed per QP, and
+ * 8 bits vary for each registered MR. Thus only 256 R_keys
+ * can be registered at one time per QP.
+ */
+enum {
+ RPCRDMA_RKEYS_PER_QP = 256
+};
+
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asychronously. It needs several pieces of
Currently, all three chunk list encoders use a portion of the one
rl_segments array in rpcrdma_req. This is because the MWs for each
chunk list were preserved in rl_segments so that ro_unmap could find
and invalidate them after the RPC was complete.
However, now that MWs are placed on a per-req linked list as they
are registered, there is no longer any information in rpcrdma_mr_seg
that is shared between ro_map and ro_unmap_{sync,safe}, and thus
nothing in rl_segments needs to be preserved after
rpcrdma_marshal_req is complete.
Thus the rl_segments array can be used now just for the needs of
each rpcrdma_convert_iovs call. Once each chunk list is encoded, the
next chunk list encoder is free to re-use all of rl_segments.
This means all three chunk lists in one RPC request can now encode
a full size data payload with no increase in the size of
rl_segments.
This is a key requirement for Kerberos support, since both the Call
and Reply for a single RPC transaction are conveyed via Long
messages (RDMA Read/Write). Both can be large.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++++++++---------------------
net/sunrpc/xprtrdma/xprt_rdma.h | 36 ++++++++++-------------
2 files changed, 44 insertions(+), 53 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index e7e86d7..b3d5a72 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
* MR when they can.
*/
static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
- int n, int nsegs)
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
{
size_t page_offset;
u32 remaining;
@@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
base = vec->iov_base;
page_offset = offset_in_page(base);
remaining = vec->iov_len;
- while (remaining && n < nsegs) {
+ while (remaining && n < RPCRDMA_MAX_SEGS) {
seg[n].mr_page = NULL;
seg[n].mr_offset = base;
seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
@@ -230,23 +229,23 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
- enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
{
- int len, n = 0, p;
- int page_base;
+ int len, n, p, page_base;
struct page **ppages;
+ n = 0;
if (pos == 0) {
- n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
- if (n == nsegs)
- return -EIO;
+ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
+ if (n == RPCRDMA_MAX_SEGS)
+ goto out_overflow;
}
len = xdrbuf->page_len;
ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
page_base = xdrbuf->page_base & ~PAGE_MASK;
p = 0;
- while (len && n < nsegs) {
+ while (len && n < RPCRDMA_MAX_SEGS) {
if (!ppages[p]) {
/* alloc the pagelist for receiving buffer */
ppages[p] = alloc_page(GFP_ATOMIC);
@@ -257,7 +256,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
seg[n].mr_offset = (void *)(unsigned long) page_base;
seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
if (seg[n].mr_len > PAGE_SIZE)
- return -EIO;
+ goto out_overflow;
len -= seg[n].mr_len;
++n;
++p;
@@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
}
/* Message overflows the seg array */
- if (len && n == nsegs)
- return -EIO;
+ if (len && n == RPCRDMA_MAX_SEGS)
+ goto out_overflow;
/* When encoding the read list, the tail is always sent inline */
if (type == rpcrdma_readch)
@@ -277,12 +276,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
- n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
- if (n == nsegs)
- return -EIO;
+ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
+ if (n == RPCRDMA_MAX_SEGS)
+ goto out_overflow;
}
return n;
+
+out_overflow:
+ pr_err("rpcrdma: segment array overflow\n");
+ return -EIO;
}
static inline __be32 *
@@ -310,7 +313,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, struct rpc_rqst *rqst,
__be32 *iptr, enum rpcrdma_chunktype rtype)
{
- struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mr_seg *seg;
struct rpcrdma_mw *mw;
unsigned int pos;
int n, nsegs;
@@ -323,8 +326,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
- nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
- RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ seg = req->rl_segments;
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
if (nsegs < 0)
return ERR_PTR(nsegs);
@@ -349,11 +352,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
mw->mw_handle, n < nsegs ? "more" : "last");
r_xprt->rx_stats.read_chunk_count++;
- req->rl_nchunks++;
seg += n;
nsegs -= n;
} while (nsegs);
- req->rl_nextseg = seg;
/* Finish Read list */
*iptr++ = xdr_zero; /* Next item not present */
@@ -377,7 +378,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
struct rpc_rqst *rqst, __be32 *iptr,
enum rpcrdma_chunktype wtype)
{
- struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mr_seg *seg;
struct rpcrdma_mw *mw;
int n, nsegs, nchunks;
__be32 *segcount;
@@ -387,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return iptr;
}
+ seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
- wtype, seg,
- RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ wtype, seg);
if (nsegs < 0)
return ERR_PTR(nsegs);
@@ -414,12 +415,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
r_xprt->rx_stats.write_chunk_count++;
r_xprt->rx_stats.total_rdma_request += seg->mr_len;
- req->rl_nchunks++;
nchunks++;
seg += n;
nsegs -= n;
} while (nsegs);
- req->rl_nextseg = seg;
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
@@ -446,7 +445,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, struct rpc_rqst *rqst,
__be32 *iptr, enum rpcrdma_chunktype wtype)
{
- struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ struct rpcrdma_mr_seg *seg;
struct rpcrdma_mw *mw;
int n, nsegs, nchunks;
__be32 *segcount;
@@ -456,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
return iptr;
}
- nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
- RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ seg = req->rl_segments;
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
return ERR_PTR(nsegs);
@@ -481,12 +480,10 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
r_xprt->rx_stats.reply_chunk_count++;
r_xprt->rx_stats.total_rdma_request += seg->mr_len;
- req->rl_nchunks++;
nchunks++;
seg += n;
nsegs -= n;
} while (nsegs);
- req->rl_nextseg = seg;
/* Update count of segments in the Reply chunk */
*segcount = cpu_to_be32(nchunks);
@@ -656,8 +653,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- req->rl_nchunks = 0;
- req->rl_nextseg = req->rl_segments;
iptr = headerp->rm_body.rm_chunks;
iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
if (IS_ERR(iptr))
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 8e53057..33a1b48 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -179,23 +179,14 @@ enum {
* o recv buffer (posted to provider)
* o ib_sge (also donated to provider)
* o status of reply (length, success or not)
- * o bookkeeping state to get run by tasklet (list, etc)
+ * o bookkeeping state to get run by reply handler (list, etc)
*
- * These are allocated during initialization, per-transport instance;
- * however, the tasklet execution list itself is global, as it should
- * always be pretty short.
+ * These are allocated during initialization, per-transport instance.
*
* N of these are associated with a transport instance, and stored in
* struct rpcrdma_buffer. N is the max number of outstanding requests.
*/
-#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE)
-
-/* data segments + head/tail for Call + head/tail for Reply */
-#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4)
-
-struct rpcrdma_buffer;
-
struct rpcrdma_rep {
struct ib_cqe rr_cqe;
unsigned int rr_len;
@@ -275,13 +266,18 @@ struct rpcrdma_mw {
* of iovs for send operations. The reason is that the iovs passed to
* ib_post_{send,recv} must not be modified until the work request
* completes.
- *
- * NOTES:
- * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
- * marshal. The number needed varies depending on the iov lists that
- * are passed to us and the memory registration mode we are in.
*/
+/* Maximum number of page-sized "segments" per chunk list to be
+ * registered or invalidated. Must handle a Reply chunk:
+ */
+enum {
+ RPCRDMA_MAX_IOV_SEGS = 3,
+ RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1,
+ RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS +
+ RPCRDMA_MAX_IOV_SEGS,
+};
+
struct rpcrdma_mr_seg { /* chunk descriptors */
u32 mr_len; /* length of chunk or segment */
struct page *mr_page; /* owning page, if any */
@@ -290,10 +286,10 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
#define RPCRDMA_MAX_IOVS (2)
+struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_free;
unsigned int rl_niovs;
- unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
struct rpc_task *rl_task;
struct rpcrdma_buffer *rl_buffer;
@@ -301,13 +297,13 @@ struct rpcrdma_req {
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
- struct list_head rl_registered; /* registered segments */
- struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
- struct rpcrdma_mr_seg *rl_nextseg;
struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
+
+ struct list_head rl_registered; /* registered segments */
+ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
static inline struct rpcrdma_req *
When the remaining length of an incoming reply is longer than the
XDR buf's page_len, switch over to the tail iovec instead of
copying more than page_len bytes into the page list.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index b3d5a72..67cd3bc 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -776,12 +776,17 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
page_base &= ~PAGE_MASK;
if (copy_len && rqst->rq_rcv_buf.page_len) {
- npages = PAGE_ALIGN(page_base +
- rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
+ int pagelist_len;
+
+ pagelist_len = rqst->rq_rcv_buf.page_len;
+ if (pagelist_len > copy_len)
+ pagelist_len = copy_len;
+ npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
for (; i < npages; i++) {
curlen = PAGE_SIZE - page_base;
- if (curlen > copy_len)
- curlen = copy_len;
+ if (curlen > pagelist_len)
+ curlen = pagelist_len;
+
dprintk("RPC: %s: page %d"
" srcp 0x%p len %d curlen %d\n",
__func__, i, srcp, copy_len, curlen);
@@ -791,7 +796,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
kunmap_atomic(destp);
srcp += curlen;
copy_len -= curlen;
- if (copy_len == 0)
+ pagelist_len -= curlen;
+ if (!pagelist_len)
break;
page_base = 0;
}
While trying NFSv4.0/RDMA with sec=krb5p, I noticed small NFS READ
operations failed. After the client unwrapped the NFS READ reply
message, the NFS READ XDR decoder was not able to decode the reply.
The message was "Server cheating in reply", with the reported
number of received payload bytes being zero. Applications reported
a read(2) that returned -1/EIO.
The problem is rpcrdma_inline_fixup() sets the tail.iov_len to zero
when the incoming reply fits entirely in the head iovec. The zero
tail.iov_len confused xdr_buf_trim(), which then mangled the actual
reply data instead of simply removing the trailing GSS checksum.
As near as I can tell, RPC transports are not supposed to update the
head.iov_len, page_len, or tail.iov_len fields in the receive XDR
buffer when handling an incoming RPC reply message. These fields
contain the length of each component of the XDR buffer, and hence
the maximum number of bytes of reply data that can be stored in each
XDR buffer component. I've concluded this because:
- This is how xdr_partial_copy_from_skb() appears to behave
- rpcrdma_inline_fixup() already does not alter page_len
- call_decode() compares rq_private_buf and rq_rcv_buf and WARNs
if they are not exactly the same
Unfortunately, as soon as I tried the simple fix to just remove the
line that sets tail.iov_len to zero, I saw that the logic that
appends the implicit Write chunk pad inline depends on inline_fixup
setting tail.iov_len to zero.
To address this, re-organize the tail iovec handling logic to use
the same approach as with the head iovec: simply point tail.iov_base
to the correct bytes in the receive buffer.
While I remember all this, write down the conclusion in documenting
comments.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++++++++++++------------------
1 file changed, 33 insertions(+), 28 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 67cd3bc..be70f22a 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -743,8 +743,16 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
return total_len;
}
-/*
- * Scatter inline received data back into provided iov's.
+/**
+ * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
+ * @rqst: controlling RPC request
+ * @srcp: points to RPC message payload in receive buffer
+ * @copy_len: remaining length of receive buffer content
+ * @pad: Write chunk pad bytes needed (zero for pure inline)
+ *
+ * The upper layer has set the maximum number of bytes it can
+ * receive in each component of rq_rcv_buf. These values are set in
+ * the head.iov_len, page_len, tail.iov_len, and buflen fields.
*/
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
@@ -754,17 +762,19 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
struct page **ppages;
int page_base;
+ /* The head iovec is redirected to the RPC reply message
+ * in the receive buffer, to avoid a memcopy.
+ */
+ rqst->rq_rcv_buf.head[0].iov_base = srcp;
+
+ /* The contents of the receive buffer that follow
+ * head.iov_len bytes are copied into the page list.
+ */
curlen = rqst->rq_rcv_buf.head[0].iov_len;
- if (curlen > copy_len) { /* write chunk header fixup */
+ if (curlen > copy_len)
curlen = copy_len;
- rqst->rq_rcv_buf.head[0].iov_len = curlen;
- }
-
dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
__func__, srcp, copy_len, curlen);
-
- /* Shift pointer for first receive segment only */
- rqst->rq_rcv_buf.head[0].iov_base = srcp;
srcp += curlen;
copy_len -= curlen;
@@ -801,28 +811,23 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
break;
page_base = 0;
}
- }
- if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
- curlen = copy_len;
- if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
- curlen = rqst->rq_rcv_buf.tail[0].iov_len;
- if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
- memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
- dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
- __func__, srcp, copy_len, curlen);
- rqst->rq_rcv_buf.tail[0].iov_len = curlen;
- copy_len -= curlen; ++i;
- } else
- rqst->rq_rcv_buf.tail[0].iov_len = 0;
-
- if (pad) {
- /* implicit padding on terminal chunk */
- unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
- while (pad--)
- p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
+ /* Implicit padding for the last segment in a Write
+ * chunk is inserted inline at the front of the tail
+ * iovec. The upper layer ignores the content of
+ * the pad. Simply ensure inline content in the tail
+ * that follows the Write chunk is properly aligned.
+ */
+ if (pad)
+ srcp -= pad;
}
+ /* The tail iovec is redirected to the remaining data
+ * in the receive buffer, to avoid a memcopy.
+ */
+ if (copy_len || pad)
+ rqst->rq_rcv_buf.tail[0].iov_base = srcp;
+
if (copy_len)
dprintk("RPC: %s: %d bytes in"
" %d extra segments (%d lost)\n",
Now that rpcrdma_inline_fixup() updates only two fields in
rq_rcv_buf, a full memcpy of that structure to rq_private_buf is
unwarranted. Updating rq_private_buf fields only where needed also
better documents what is going on.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index be70f22a..b763f49 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -753,6 +753,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
* The upper layer has set the maximum number of bytes it can
* receive in each component of rq_rcv_buf. These values are set in
* the head.iov_len, page_len, tail.iov_len, and buflen fields.
+ *
+ * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
+ * many cases this function simply updates iov_base pointers in
+ * rq_rcv_buf to point directly to the received reply data, to
+ * avoid copying reply data.
*/
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
@@ -766,6 +771,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
* in the receive buffer, to avoid a memcopy.
*/
rqst->rq_rcv_buf.head[0].iov_base = srcp;
+ rqst->rq_private_buf.head[0].iov_base = srcp;
/* The contents of the receive buffer that follow
* head.iov_len bytes are copied into the page list.
@@ -825,16 +831,15 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
/* The tail iovec is redirected to the remaining data
* in the receive buffer, to avoid a memcopy.
*/
- if (copy_len || pad)
+ if (copy_len || pad) {
rqst->rq_rcv_buf.tail[0].iov_base = srcp;
+ rqst->rq_private_buf.tail[0].iov_base = srcp;
+ }
if (copy_len)
dprintk("RPC: %s: %d bytes in"
" %d extra segments (%d lost)\n",
__func__, olen, i, copy_len);
-
- /* TBD avoid a warning from call_decode() */
- rqst->rq_private_buf = rqst->rq_rcv_buf;
}
void
fixup_copy_count should count only the number of bytes copied to the
page list. The head and tail are now always handled without a data
copy.
And the debugging at the end of rpcrdma_inline_fixup() is also no
longer necessary, since copy_len will be non-zero when there is reply
data in the tail (a normal and valid case).
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index b763f49..34a5df9 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -758,11 +758,14 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
* many cases this function simply updates iov_base pointers in
* rq_rcv_buf to point directly to the received reply data, to
* avoid copying reply data.
+ *
+ * Returns the count of bytes which had to be memcopied.
*/
-static void
+static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
- int i, npages, curlen, olen;
+ unsigned long fixup_copy_count;
+ int i, npages, curlen;
char *destp;
struct page **ppages;
int page_base;
@@ -784,13 +787,10 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
srcp += curlen;
copy_len -= curlen;
- olen = copy_len;
- i = 0;
- rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
page_base = rqst->rq_rcv_buf.page_base;
ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
page_base &= ~PAGE_MASK;
-
+ fixup_copy_count = 0;
if (copy_len && rqst->rq_rcv_buf.page_len) {
int pagelist_len;
@@ -798,7 +798,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
if (pagelist_len > copy_len)
pagelist_len = copy_len;
npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
- for (; i < npages; i++) {
+ for (i = 0; i < npages; i++) {
curlen = PAGE_SIZE - page_base;
if (curlen > pagelist_len)
curlen = pagelist_len;
@@ -812,6 +812,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
kunmap_atomic(destp);
srcp += curlen;
copy_len -= curlen;
+ fixup_copy_count += curlen;
pagelist_len -= curlen;
if (!pagelist_len)
break;
@@ -836,10 +837,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
rqst->rq_private_buf.tail[0].iov_base = srcp;
}
- if (copy_len)
- dprintk("RPC: %s: %d bytes in"
- " %d extra segments (%d lost)\n",
- __func__, olen, i, copy_len);
+ return fixup_copy_count;
}
void
@@ -1002,8 +1000,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
rep->rr_len -= RPCRDMA_HDRLEN_MIN;
status = rep->rr_len;
}
- /* Fix up the rpc results for upper layer */
- rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
+
+ r_xprt->rx_stats.fixup_copy_count +=
+ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
+ rdmalen);
break;
case rdma_nomsg:
Direct data placement is not allowed when using flavors that
guarantee integrity or privacy. When such security flavors are in
effect, don't allow the use of Read and Write chunks for moving
individual data items. All messages larger than the inline threshold
are sent via Long Call or Long Reply.
On my systems (CX-3 Pro on FDR), for small I/O operations, the use
of Long messages adds only around 5 usecs of latency in each
direction.
Note that when integrity or encryption is used, the host CPU touches
every byte in these messages. Even if it could be used, data
movement offload doesn't buy much in this case.
Signed-off-by: Chuck Lever <[email protected]>
---
include/linux/sunrpc/auth.h | 5 ++++-
include/linux/sunrpc/gss_api.h | 2 ++
net/sunrpc/auth_gss/auth_gss.c | 5 ++++-
net/sunrpc/auth_gss/gss_krb5_mech.c | 2 ++
net/sunrpc/auth_gss/gss_mech_switch.c | 12 ++++++++++++
net/sunrpc/xprtrdma/rpc_rdma.c | 12 ++++++++++--
6 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 8997915..a7d52eb 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -95,7 +95,7 @@ struct rpc_auth {
/* for gss, used to calculate au_rslack: */
unsigned int au_verfsize;
- unsigned int au_flags; /* various flags */
+ unsigned long au_flags; /* various flags */
const struct rpc_authops *au_ops; /* operations */
rpc_authflavor_t au_flavor; /* pseudoflavor (note may
* differ from the flavor in
@@ -107,6 +107,9 @@ struct rpc_auth {
/* per-flavor data */
};
+/* au_flags */
+#define RPCAUTH_AUTH_DATATOUCH 0
+
struct rpc_auth_create_args {
rpc_authflavor_t pseudoflavor;
const char *target_name;
diff --git a/include/linux/sunrpc/gss_api.h b/include/linux/sunrpc/gss_api.h
index 1f911cc..68ec78c 100644
--- a/include/linux/sunrpc/gss_api.h
+++ b/include/linux/sunrpc/gss_api.h
@@ -73,6 +73,7 @@ u32 gss_delete_sec_context(
rpc_authflavor_t gss_svc_to_pseudoflavor(struct gss_api_mech *, u32 qop,
u32 service);
u32 gss_pseudoflavor_to_service(struct gss_api_mech *, u32 pseudoflavor);
+bool gss_pseudoflavor_to_datatouch(struct gss_api_mech *, u32 pseudoflavor);
char *gss_service_to_auth_domain_name(struct gss_api_mech *, u32 service);
struct pf_desc {
@@ -81,6 +82,7 @@ struct pf_desc {
u32 service;
char *name;
char *auth_domain_name;
+ bool datatouch;
};
/* Different mechanisms (e.g., krb5 or spkm3) may implement gss-api, and
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index e64ae93..aa2c69e 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -990,7 +990,8 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
if (!try_module_get(THIS_MODULE))
return ERR_PTR(err);
- if (!(gss_auth = kmalloc(sizeof(*gss_auth), GFP_KERNEL)))
+ gss_auth = kzalloc(sizeof(*gss_auth), GFP_KERNEL);
+ if (!gss_auth)
goto out_dec;
INIT_HLIST_NODE(&gss_auth->hash);
gss_auth->target_name = NULL;
@@ -1017,6 +1018,8 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
auth->au_rslack = GSS_VERF_SLACK >> 2;
auth->au_ops = &authgss_ops;
auth->au_flavor = flavor;
+ if (gss_pseudoflavor_to_datatouch(gss_auth->mech, flavor))
+ set_bit(RPCAUTH_AUTH_DATATOUCH, &auth->au_flags);
atomic_set(&auth->au_count, 1);
kref_init(&gss_auth->kref);
diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c
index 6542749..6059583 100644
--- a/net/sunrpc/auth_gss/gss_krb5_mech.c
+++ b/net/sunrpc/auth_gss/gss_krb5_mech.c
@@ -745,12 +745,14 @@ static struct pf_desc gss_kerberos_pfs[] = {
.qop = GSS_C_QOP_DEFAULT,
.service = RPC_GSS_SVC_INTEGRITY,
.name = "krb5i",
+ .datatouch = true,
},
[2] = {
.pseudoflavor = RPC_AUTH_GSS_KRB5P,
.qop = GSS_C_QOP_DEFAULT,
.service = RPC_GSS_SVC_PRIVACY,
.name = "krb5p",
+ .datatouch = true,
},
};
diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c
index 7063d85..5fec3ab 100644
--- a/net/sunrpc/auth_gss/gss_mech_switch.c
+++ b/net/sunrpc/auth_gss/gss_mech_switch.c
@@ -361,6 +361,18 @@ gss_pseudoflavor_to_service(struct gss_api_mech *gm, u32 pseudoflavor)
}
EXPORT_SYMBOL(gss_pseudoflavor_to_service);
+bool
+gss_pseudoflavor_to_datatouch(struct gss_api_mech *gm, u32 pseudoflavor)
+{
+ int i;
+
+ for (i = 0; i < gm->gm_pf_num; i++) {
+ if (gm->gm_pfs[i].pseudoflavor == pseudoflavor)
+ return gm->gm_pfs[i].datatouch;
+ }
+ return false;
+}
+
char *
gss_service_to_auth_domain_name(struct gss_api_mech *gm, u32 service)
{
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 34a5df9..aa7951a 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -570,6 +570,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+ bool ddp_allowed;
ssize_t hdrlen;
size_t rpclen;
__be32 *iptr;
@@ -586,6 +587,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
headerp->rm_type = rdma_msg;
+ /* When the ULP employs a GSS flavor that guarantees integrity
+ * or privacy, direct data placement of individual data items
+ * is not allowed.
+ */
+ ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
+ &rqst->rq_cred->cr_auth->au_flags);
+
/*
* Chunks needed for results?
*
@@ -597,7 +605,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rpcrdma_results_inline(r_xprt, rqst))
wtype = rpcrdma_noch;
- else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
wtype = rpcrdma_writech;
else
wtype = rpcrdma_replych;
@@ -620,7 +628,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
rtype = rpcrdma_noch;
rpcrdma_inline_pullup(rqst);
rpclen = rqst->rq_svec[0].iov_len;
- } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+ } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
rpclen = rqst->rq_svec[0].iov_len;
rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
If an RPC program does not set vs_dispatch and pc_func() returns
rpc_drop_reply, the server sends a reply anyway containing a single
word containing the value RPC_DROP_REPLY (in network byte-order, of
course). This is a nonsense RPC message.
Fixes: 9e701c610923 ("svcrpc: simpler request dropping")
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/svc.c | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index cc98528..87290a5 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1188,7 +1188,8 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
/* Encode reply */
- if (test_bit(RQ_DROPME, &rqstp->rq_flags)) {
+ if (*statp == rpc_drop_reply ||
+ test_bit(RQ_DROPME, &rqstp->rq_flags)) {
if (procp->pc_release)
procp->pc_release(rqstp, NULL, rqstp->rq_resp);
goto dropit;
Before commit 778be232a207 ("NFS do not find client in NFSv4
pg_authenticate"), the Linux callback server replied with
RPC_AUTH_ERROR / RPC_AUTH_BADCRED, instead of dropping the CB
request. Let's restore that behavior so the server has a chance to
do something useful about it, and provide a warning that helps
admins correct the problem.
Fixes: 778be232a207 ("NFS do not find client in NFSv4 ...")
Signed-off-by: Chuck Lever <[email protected]>
---
fs/nfs/callback_xdr.c | 6 +++++-
net/sunrpc/svc.c | 5 +++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index d81f96a..656f68f 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -925,7 +925,7 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
if (hdr_arg.minorversion == 0) {
cps.clp = nfs4_find_client_ident(SVC_NET(rqstp), hdr_arg.cb_ident);
if (!cps.clp || !check_gss_callback_principal(cps.clp, rqstp))
- return rpc_drop_reply;
+ goto out_invalidcred;
}
cps.minorversion = hdr_arg.minorversion;
@@ -953,6 +953,10 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
nfs_put_client(cps.clp);
dprintk("%s: done, status = %u\n", __func__, ntohl(status));
return rpc_success;
+
+out_invalidcred:
+ pr_warn_ratelimited("NFS: NFSv4 callback contains invalid cred\n");
+ return rpc_autherr_badcred;
}
/*
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 87290a5..c5b0cb4 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1194,6 +1194,11 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
procp->pc_release(rqstp, NULL, rqstp->rq_resp);
goto dropit;
}
+ if (*statp == rpc_autherr_badcred) {
+ if (procp->pc_release)
+ procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+ goto err_bad_auth;
+ }
if (*statp == rpc_success &&
(xdr = procp->pc_encode) &&
!xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
Clean up: the extra layer of indirection doesn't add value.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 4 +++-
net/sunrpc/xprtrdma/verbs.c | 11 +----------
net/sunrpc/xprtrdma/xprt_rdma.h | 2 +-
3 files changed, 5 insertions(+), 12 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index aa7951a..41f7bf7 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -919,8 +919,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
* allowed to timeout, to discover the errors at that time.
*/
void
-rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+rpcrdma_reply_handler(struct work_struct *work)
{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e016b9c..d9d8aa0 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -128,15 +128,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
wc->status, wc->vendor_err);
}
-static void
-rpcrdma_receive_worker(struct work_struct *work)
-{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
-
- rpcrdma_reply_handler(rep);
-}
-
/* Perform basic sanity checking to avoid using garbage
* to update the credit grant value.
*/
@@ -840,7 +831,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
- INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+ INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
return rep;
out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 33a1b48..3bd47f1 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -507,7 +507,7 @@ rpcrdma_data_dir(bool writing)
*/
void rpcrdma_connect_worker(struct work_struct *);
void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct rpcrdma_rep *);
+void rpcrdma_reply_handler(struct work_struct *);
/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
Clean up: Use a more efficient way to get the inline threshold
values.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/backchannel.c | 4 ++--
net/sunrpc/xprtrdma/rpc_rdma.c | 2 +-
net/sunrpc/xprtrdma/transport.c | 6 +++---
net/sunrpc/xprtrdma/xprt_rdma.h | 9 ---------
4 files changed, 6 insertions(+), 15 deletions(-)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 87762d9..5f60ab2 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -46,13 +46,13 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
return PTR_ERR(req);
req->rl_backchannel = true;
- size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;
- size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ size += r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 41f7bf7..48a8dc9 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -673,7 +673,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
- if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
goto out_overflow;
dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 549486d..970f102 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -518,7 +518,7 @@ out:
return req->rl_sendbuf->rg_base;
out_rdmabuf:
- min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_wsize;
rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
if (IS_ERR(rb))
goto out_fail;
@@ -541,8 +541,8 @@ out_sendbuf:
* reply will be large, but slush is provided here to allow
* flexibility when marshaling.
*/
- min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
- min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ min_size = r_xprt->rx_data.inline_rsize;
+ min_size += r_xprt->rx_data.inline_wsize;
if (size < min_size)
size = min_size;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3bd47f1..63d59f1 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -362,15 +362,6 @@ struct rpcrdma_create_data_internal {
unsigned int padding; /* non-rdma write header padding */
};
-#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
- (rpcx_to_rdmad(rq->rq_xprt).inline_rsize)
-
-#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
- (rpcx_to_rdmad(rq->rq_xprt).inline_wsize)
-
-#define RPCRDMA_INLINE_PAD_VALUE(rq)\
- rpcx_to_rdmad(rq->rq_xprt).padding
-
/*
* Statistics for RPCRDMA
*/
Clean up: Disentangle connection helpers from RPC-over-RDMA reply
decoding functions.
Signed-off-by: Chuck Lever <[email protected]>
---
net/sunrpc/xprtrdma/rpc_rdma.c | 34 ----------------------------------
net/sunrpc/xprtrdma/transport.c | 28 ++++++++++++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 10 +++-------
3 files changed, 31 insertions(+), 41 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 48a8dc9..f07a77c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -848,28 +848,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
return fixup_copy_count;
}
-void
-rpcrdma_connect_worker(struct work_struct *work)
-{
- struct rpcrdma_ep *ep =
- container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
- struct rpcrdma_xprt *r_xprt =
- container_of(ep, struct rpcrdma_xprt, rx_ep);
- struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-
- spin_lock_bh(&xprt->transport_lock);
- if (++xprt->connect_cookie == 0) /* maintain a reserved value */
- ++xprt->connect_cookie;
- if (ep->rep_connected > 0) {
- if (!xprt_test_and_set_connected(xprt))
- xprt_wake_pending_tasks(xprt, 0);
- } else {
- if (xprt_test_and_clear_connected(xprt))
- xprt_wake_pending_tasks(xprt, -ENOTCONN);
- }
- spin_unlock_bh(&xprt->transport_lock);
-}
-
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
* messages, and never populate the chunk lists. This makes
@@ -901,18 +879,6 @@ rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
-/*
- * This function is called when an async event is posted to
- * the connection which changes the connection state. All it
- * does at this point is mark the connection up/down, the rpc
- * timers do the rest.
- */
-void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
-{
- schedule_delayed_work(&ep->rep_connect_worker, 0);
-}
-
/* Process received RPC/RDMA messages.
*
* Errors must result in the RPC task either being awakened, or
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 970f102..8b9bb6e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -219,6 +219,34 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
}
}
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+ schedule_delayed_work(&ep->rep_connect_worker, 0);
+}
+
+void
+rpcrdma_connect_worker(struct work_struct *work)
+{
+ struct rpcrdma_ep *ep =
+ container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+ spin_lock_bh(&xprt->transport_lock);
+ if (++xprt->connect_cookie == 0) /* maintain a reserved value */
+ ++xprt->connect_cookie;
+ if (ep->rep_connected > 0) {
+ if (!xprt_test_and_set_connected(xprt))
+ xprt_wake_pending_tasks(xprt, 0);
+ } else {
+ if (xprt_test_and_clear_connected(xprt))
+ xprt_wake_pending_tasks(xprt, -ENOTCONN);
+ }
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 63d59f1..aea309b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -447,6 +447,7 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
struct rpcrdma_create_data_internal *);
void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_conn_func(struct rpcrdma_ep *);
int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
@@ -494,25 +495,20 @@ rpcrdma_data_dir(bool writing)
}
/*
- * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
- */
-void rpcrdma_connect_worker(struct work_struct *);
-void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct work_struct *);
-
-/*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
struct rpcrdma_create_data_internal *,
unsigned int);
+void rpcrdma_reply_handler(struct work_struct *);
/* RPC/RDMA module init - xprtrdma/transport.c
*/
extern unsigned int xprt_rdma_max_inline_read;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
+void rpcrdma_connect_worker(struct work_struct *);
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
On Tue, Jun 07, 2016 at 03:47:32PM -0400, Chuck Lever wrote:
> The fixed part of an R_key is 24 bits, but the variant part is only
> 8 bits, allowing 256 unique R_keys concurrenly in flight on one QP.
>
> Thus an rpcrdma_xprt cannot use more than 256 rpcrdma_mws at a time.
>
> Capping the number of rpcrdma_mws should prevent ro_map from
> re-using an R_key, which could change the MR co-ordinates or access
> settings while an RPC is still in progress. It also reduces the
> number of pre-allocated FRMRs per transport, allowing more
> transports per device.
>
> To get more R_keys in flight at once, additional QPs will be
> necessary.
??
The 24 bit / 8 bit thing doesn't create new MRs, it just disambiguates
the life cycle of a single MR. Further, the MR is connected to a PD,
not a QP, there is no such thing as '256 unique R_keys concurrenly in
flight on one QP' ???
This is why no ULP has ever needed this constant before:
> +enum {
> + RPCRDMA_RKEYS_PER_QP = 256
> +};
Because you never need to care when working with FRMRs.
Either the number in queue is limited by the invalidation
synchronization point to 2, or in the case of iwarp read, it doesn't
matter since everything executes serially in order and the value can
safely wrap.
Jason
> On Jun 7, 2016, at 4:49 PM, Jason Gunthorpe <[email protected]> wrote:
>
> On Tue, Jun 07, 2016 at 03:47:32PM -0400, Chuck Lever wrote:
>> The fixed part of an R_key is 24 bits, but the variant part is only
>> 8 bits, allowing 256 unique R_keys concurrenly in flight on one QP.
>>
>> Thus an rpcrdma_xprt cannot use more than 256 rpcrdma_mws at a time.
>>
>> Capping the number of rpcrdma_mws should prevent ro_map from
>> re-using an R_key, which could change the MR co-ordinates or access
>> settings while an RPC is still in progress. It also reduces the
>> number of pre-allocated FRMRs per transport, allowing more
>> transports per device.
>>
>> To get more R_keys in flight at once, additional QPs will be
>> necessary.
>
> ??
>
> The 24 bit / 8 bit thing doesn't create new MRs, it just disambiguates
> the life cycle of a single MR.
Right.
> Further, the MR is connected to a PD,
> not a QP, there is no such thing as '256 unique R_keys concurrenly in
> flight on one QP' ???
OK, fair enough, but...
> This is why no ULP has ever needed this constant before:
>
>> +enum {
>> + RPCRDMA_RKEYS_PER_QP = 256
>> +};
>
> Because you never need to care when working with FRMRs.
>
> Either the number in queue is limited by the invalidation
> synchronization point to 2, or in the case of iwarp read, it doesn't
> matter since everything executes serially in order and the value can
> safely wrap.
I don't understand this. How does invalidation prevent
the ULP from registering more R_keys?
And, I'd like to limit the number of pre-allocated MRs
to no more than each transport endpoint can use. For
xprtrdma, a transport is one QP and PD. What is that
number?
--
Chuck Lever
On Tue, Jun 07, 2016 at 05:09:47PM -0400, Chuck Lever wrote:
> > Either the number in queue is limited by the invalidation
> > synchronization point to 2, or in the case of iwarp read, it doesn't
> > matter since everything executes serially in order and the value can
> > safely wrap.
>
> I don't understand this. How does invalidation prevent
> the ULP from registering more R_keys?
The ULP doesn't 'register' rkeys, it only changes the active RKey assigned
to a MR. The number of active rkeys is strictly limited to the number
of available MRs.
Further changing the rkey can only be done as part of an invalidate
cycle.
Invaliding a MR/RKey can only happen after synchronization with the
remote.
So the work queue looks broadly like this:
WR setup rkey 1
WR send rkey 1 to remote
<synchronize>
WR invalidate rkey 1
WR setup rkey 2
WR send rkey 2 to remote
<synchronize>
Thus the WR queue can never have more than 2 rkey's per MR in it at
any time, and there is no need to care about the 24/8 bit split.
> And, I'd like to limit the number of pre-allocated MRs
> to no more than each transport endpoint can use. For
> xprtrdma, a transport is one QP and PD. What is that
> number?
I'm not sure I understand this question. AFAIK there is no limit on
MRs that can be used with a QP. As long as the MR is allocated you can
use it.
Typically you'd allocate enough MRs to handle your typical case
concurrancy level (pipeline depth).
The concurrancy level is limited by lots of things, for instance the
max number of posted recv's typically places a hard upper limit.
Jason
> On Jun 7, 2016, at 5:28 PM, Jason Gunthorpe <[email protected]> wrote:
>
> On Tue, Jun 07, 2016 at 05:09:47PM -0400, Chuck Lever wrote:
>
>>> Either the number in queue is limited by the invalidation
>>> synchronization point to 2, or in the case of iwarp read, it doesn't
>>> matter since everything executes serially in order and the value can
>>> safely wrap.
>>
>> I don't understand this. How does invalidation prevent
>> the ULP from registering more R_keys?
>
> The ULP doesn't 'register' rkeys, it only changes the active RKey assigned
> to a MR. The number of active rkeys is strictly limited to the number
> of available MRs.
I'm sure I'm using the language poorly.
> Further changing the rkey can only be done as part of an invalidate
> cycle.
OK, I'm not remembering something very clearly, and
I've drawn an incorrect conclusion. Yes, there always
has to be an invalidation before an R_key can be
reused to register a new MR.
> Invaliding a MR/RKey can only happen after synchronization with the
> remote.
>
> So the work queue looks broadly like this:
>
> WR setup rkey 1
> WR send rkey 1 to remote
> <synchronize>
> WR invalidate rkey 1
> WR setup rkey 2
> WR send rkey 2 to remote
> <synchronize>
>
> Thus the WR queue can never have more than 2 rkey's per MR in it at
> any time, and there is no need to care about the 24/8 bit split.
Agreed, I see that the split has nothing to do with it.
I will drop this patch.
>> And, I'd like to limit the number of pre-allocated MRs
>> to no more than each transport endpoint can use. For
>> xprtrdma, a transport is one QP and PD. What is that
>> number?
>
> I'm not sure I understand this question. AFAIK there is no limit on
> MRs that can be used with a QP. As long as the MR is allocated you can
> use it.
There is a practical number of MRs that can be allocated
per device, I thought. And each MR consumes some amount
of host memory.
xprtrdma is happy to allocate thousands of MRs per QP/PD
pair, but that isn't practical as you start adding more
transports/connections.
The question has to do with scaling the number of xprt
connections over available device resources.
> Typically you'd allocate enough MRs to handle your typical case
> concurrancy level (pipeline depth).
>
> The concurrancy level is limited by lots of things, for instance the
> max number of posted recv's typically places a hard upper limit.
xprtrdma already makes an estimate. I'm wondering if its
still a valid one. Fewer MRs means better scaling in number
of transports.
--
Chuck Lever
On Tue, Jun 07, 2016 at 05:51:04PM -0400, Chuck Lever wrote:
> There is a practical number of MRs that can be allocated
> per device, I thought. And each MR consumes some amount
> of host memory.
> xprtrdma is happy to allocate thousands of MRs per QP/PD
> pair, but that isn't practical as you start adding more
> transports/connections.
Yes, this is all sane and valid.
Typically you'd target a concurrency limit per QP and allocate
everything (# MRs, sq,rq,cq depth, etc) in accordance with that goal,
I'd also suggest that target concurrency is probably a user tunable..
Jason
On 6/7/2016 6:01 PM, Jason Gunthorpe wrote:
> On Tue, Jun 07, 2016 at 05:51:04PM -0400, Chuck Lever wrote:
>
>> There is a practical number of MRs that can be allocated
>> per device, I thought. And each MR consumes some amount
>> of host memory.
>
>> xprtrdma is happy to allocate thousands of MRs per QP/PD
>> pair, but that isn't practical as you start adding more
>> transports/connections.
>
> Yes, this is all sane and valid.
>
> Typically you'd target a concurrency limit per QP and allocate
> everything (# MRs, sq,rq,cq depth, etc) in accordance with that goal,
> I'd also suggest that target concurrency is probably a user tunable..
This is already present - it's the RPC slot table depth. It would be
great, IMO, to make this a mount parameter since it is transport- and
also server-dependent, but currently I believe it is just a kernel
RPC module parameter.
Tom.
DQoNCk9uIDYvOC8xNiwgMTA6NTQsICJsaW51eC1uZnMtb3duZXJAdmdlci5rZXJuZWwub3JnIG9u
IGJlaGFsZiBvZiBUb20gVGFscGV5IiA8bGludXgtbmZzLW93bmVyQHZnZXIua2VybmVsLm9yZyBv
biBiZWhhbGYgb2YgdG9tQHRhbHBleS5jb20+IHdyb3RlOg0KDQo+T24gNi83LzIwMTYgNjowMSBQ
TSwgSmFzb24gR3VudGhvcnBlIHdyb3RlOg0KPj4gT24gVHVlLCBKdW4gMDcsIDIwMTYgYXQgMDU6
NTE6MDRQTSAtMDQwMCwgQ2h1Y2sgTGV2ZXIgd3JvdGU6DQo+Pg0KPj4+IFRoZXJlIGlzIGEgcHJh
Y3RpY2FsIG51bWJlciBvZiBNUnMgdGhhdCBjYW4gYmUgYWxsb2NhdGVkDQo+Pj4gcGVyIGRldmlj
ZSwgSSB0aG91Z2h0LiBBbmQgZWFjaCBNUiBjb25zdW1lcyBzb21lIGFtb3VudA0KPj4+IG9mIGhv
c3QgbWVtb3J5Lg0KPj4NCj4+PiB4cHJ0cmRtYSBpcyBoYXBweSB0byBhbGxvY2F0ZSB0aG91c2Fu
ZHMgb2YgTVJzIHBlciBRUC9QRA0KPj4+IHBhaXIsIGJ1dCB0aGF0IGlzbid0IHByYWN0aWNhbCBh
cyB5b3Ugc3RhcnQgYWRkaW5nIG1vcmUNCj4+PiB0cmFuc3BvcnRzL2Nvbm5lY3Rpb25zLg0KPj4N
Cj4+IFllcywgdGhpcyBpcyBhbGwgc2FuZSBhbmQgdmFsaWQuDQo+Pg0KPj4gVHlwaWNhbGx5IHlv
dSdkIHRhcmdldCBhIGNvbmN1cnJlbmN5IGxpbWl0IHBlciBRUCBhbmQgYWxsb2NhdGUNCj4+IGV2
ZXJ5dGhpbmcgKCMgTVJzLCBzcSxycSxjcSBkZXB0aCwgZXRjKSBpbiBhY2NvcmRhbmNlIHdpdGgg
dGhhdCBnb2FsLA0KPj4gSSdkIGFsc28gc3VnZ2VzdCB0aGF0IHRhcmdldCBjb25jdXJyZW5jeSBp
cyBwcm9iYWJseSBhIHVzZXIgdHVuYWJsZS4uDQo+DQo+VGhpcyBpcyBhbHJlYWR5IHByZXNlbnQg
LSBpdCdzIHRoZSBSUEMgc2xvdCB0YWJsZSBkZXB0aC4gSXQgd291bGQgYmUNCj5ncmVhdCwgSU1P
LCB0byBtYWtlIHRoaXMgYSBtb3VudCBwYXJhbWV0ZXIgc2luY2UgaXQgaXMgdHJhbnNwb3J0LSBh
bmQNCj5hbHNvIHNlcnZlci1kZXBlbmRlbnQsIGJ1dCBjdXJyZW50bHkgSSBiZWxpZXZlIGl0IGlz
IGp1c3QgYSBrZXJuZWwNCj5SUEMgbW9kdWxlIHBhcmFtZXRlci4NCg0KVGhhdCB3b3VsZCBiZSBh
IG1vdW50IHBhcmFtZXRlciB3aGljaCB3b3VsZCBvbmx5IGFwcGx5IHRvIFJETUEgc2luY2UgZXZl
cnl0aGluZyBlbHNlIHVzZXMgZHluYW1pYyBzbG90IGFsbG9jYXRpb24gdGhlc2UgZGF5cy4gSeKA
mWQgcHJlZmVyIHRvIGF2b2lkIHRoYXQuDQoNClRyb25kDQoNCg==
On Wed, Jun 08, 2016 at 03:06:56PM +0000, Trond Myklebust wrote:
> That would be a mount parameter which would only apply to RDMA since
> everything else uses dynamic slot allocation these days. I?d prefer to
> avoid that.
The only reason RDMA cannot allocate MRs on the fly is
performance.. Maybe a dynamic scheme is the right answer..
Jason
DQoNCk9uIDYvOC8xNiwgMTM6NDAsICJKYXNvbiBHdW50aG9ycGUiIDxqZ3VudGhvcnBlQG9ic2lk
aWFucmVzZWFyY2guY29tPiB3cm90ZToNCg0KPk9uIFdlZCwgSnVuIDA4LCAyMDE2IGF0IDAzOjA2
OjU2UE0gKzAwMDAsIFRyb25kIE15a2xlYnVzdCB3cm90ZToNCj4NCj4+ICAgIFRoYXQgd291bGQg
YmUgYSBtb3VudCBwYXJhbWV0ZXIgd2hpY2ggd291bGQgb25seSBhcHBseSB0byBSRE1BIHNpbmNl
DQo+PiAgICBldmVyeXRoaW5nIGVsc2UgdXNlcyBkeW5hbWljIHNsb3QgYWxsb2NhdGlvbiB0aGVz
ZSBkYXlzLiBJw6JkIHByZWZlciB0bw0KPj4gICAgYXZvaWQgdGhhdC4NCj4NCj5UaGUgb25seSBy
ZWFzb24gUkRNQSBjYW5ub3QgYWxsb2NhdGUgTVJzIG9uIHRoZSBmbHkgaXMNCj5wZXJmb3JtYW5j
ZS4uIE1heWJlIGEgZHluYW1pYyBzY2hlbWUgaXMgdGhlIHJpZ2h0IGFuc3dlci4uDQo+DQoNCkl0
IHNob3VsZCBiZSBlYXN5IHRvIGRvIGZvciBORlN2NC54ICh4PjApLCB3aGVyZSB0aGUgTkZTIHBy
b3RvY29sIGFscmVhZHkgbWFuYWdlcyB0aGUgbnVtYmVyIG9mIHNsb3RzLiBUaGUgUlBDIGxheWVy
IGNvdWxkIGJlIG5vdGlmaWVkIHdoZW4gaXQgaXMgT0sgdG8gcmV0aXJlIHNsb3RzIHRoYXQgYXJl
IG5vIGxvbmdlciBpbiB1c2UuDQoNCg0KDQo=
Hi Chuck,
One nitpicky comment below:
On 06/07/2016 03:46 PM, Chuck Lever wrote:
> Clean up: Now that ALLPHYSICAL is gone, there is some buffer
> initialization code that is common to remaining memory registration
> modes. Move it into rpcrdma_buffer_create.
>
> Signed-off-by: Chuck Lever <[email protected]>
> ---
> net/sunrpc/xprtrdma/fmr_ops.c | 4 ----
> net/sunrpc/xprtrdma/frwr_ops.c | 4 ----
> net/sunrpc/xprtrdma/verbs.c | 3 +++
> 3 files changed, 3 insertions(+), 8 deletions(-)
>
> diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
> index 6326ebe..b8952c7 100644
> --- a/net/sunrpc/xprtrdma/fmr_ops.c
> +++ b/net/sunrpc/xprtrdma/fmr_ops.c
> @@ -126,10 +126,6 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
> struct rpcrdma_mw *r;
> int i, rc;
>
> - spin_lock_init(&buf->rb_mwlock);
> - INIT_LIST_HEAD(&buf->rb_mws);
> - INIT_LIST_HEAD(&buf->rb_all);
> -
> i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
> i += 2; /* head + tail */
> i *= buf->rb_max_requests; /* one set for each RPC slot */
> diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
> index c094754..6e0e5e8 100644
> --- a/net/sunrpc/xprtrdma/frwr_ops.c
> +++ b/net/sunrpc/xprtrdma/frwr_ops.c
> @@ -355,10 +355,6 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
> struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
> int i;
>
> - spin_lock_init(&buf->rb_mwlock);
> - INIT_LIST_HEAD(&buf->rb_mws);
> - INIT_LIST_HEAD(&buf->rb_all);
> -
> i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
> i += 2; /* head + tail */
> i *= buf->rb_max_requests; /* one set for each RPC slot */
> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
> index 7f09162..ecd494a 100644
> --- a/net/sunrpc/xprtrdma/verbs.c
> +++ b/net/sunrpc/xprtrdma/verbs.c
> @@ -822,6 +822,9 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>
> buf->rb_max_requests = r_xprt->rx_data.max_requests;
> buf->rb_bc_srv_max_requests = 0;
> + spin_lock_init(&buf->rb_mwlock);
> + INIT_LIST_HEAD(&buf->rb_mws);
> + INIT_LIST_HEAD(&buf->rb_all);
> spin_lock_init(&buf->rb_lock);
Can you rearrange this so the two spin_lock_inits() are next to each other?
Thanks,
Anna
> atomic_set(&buf->rb_credits, 1);
>
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to [email protected]
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
> On Jun 8, 2016, at 1:40 PM, Jason Gunthorpe <[email protected]> wrote:
>
> On Wed, Jun 08, 2016 at 03:06:56PM +0000, Trond Myklebust wrote:
>
>> That would be a mount parameter which would only apply to RDMA since
>> everything else uses dynamic slot allocation these days. Iād prefer to
>> avoid that.
>
> The only reason RDMA cannot allocate MRs on the fly is
> performance.. Maybe a dynamic scheme is the right answer..
Dynamically managing the MR free list should be straightforward:
1. Allocate a minimum number of MRs at transport creation time
2. If workload causes the transport to exhaust its supply of
MRs, kick a worker thread to create some more
3. Maybe add a shrinker that can trim the MR list when the
system is under memory pressure
--
Chuck Lever
On 6/8/2016 1:53 PM, Chuck Lever wrote:
>
>> On Jun 8, 2016, at 1:40 PM, Jason Gunthorpe <[email protected]> wrote:
>>
>> On Wed, Jun 08, 2016 at 03:06:56PM +0000, Trond Myklebust wrote:
>>
>>> That would be a mount parameter which would only apply to RDMA since
>>> everything else uses dynamic slot allocation these days. Iād prefer to
>>> avoid that.
>>
>> The only reason RDMA cannot allocate MRs on the fly is
>> performance.. Maybe a dynamic scheme is the right answer..
>
> Dynamically managing the MR free list should be straightforward:
>
> 1. Allocate a minimum number of MRs at transport creation time
>
> 2. If workload causes the transport to exhaust its supply of
> MRs, kick a worker thread to create some more
>
> 3. Maybe add a shrinker that can trim the MR list when the
> system is under memory pressure
SMB Direct in Windows Server 2012R2 does this, and it works
extremely well. We reduced MR demand by approximately a factor
of 50, since it's rare that many connections are active at
high queue depths.
Tom.