2007-07-01 23:12:25

by Tom Tucker

Subject: [RFC,PATCH 00/10] knfsd: RDMA Transport Driver v2

This patchset implements the knfsd (NFS server) transport driver for
RDMA. It is a follow-up to the earlier patchset and incorporates
recommendations from various reviewers. The most notable improvements
are as follows:

- Moved all WR processing into a tasklet, reducing the number of irqsave
locks to 1.

- Added support for RDMA_READ completion processing that avoids waiting
in an NFSD thread.

- Integrated with the transport switch patchset changes from SGI.

- Added support for dynamically adding listeners via writes to the
portlist file.

I looked at making the context allocation fixed. This would require 161
pages of contexts for 32k wsize/rsize and the default RQ and SQ depths.
Big data would drive this number up to 5152 pages. This seemed
untenable to me. Maybe a pool shared across all mounts?
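
The two page counts above are in a ratio of exactly 32 (5152 / 161), which is what a fixed pool would show if its footprint grew linearly from a 32 KiB payload to a 1 MiB payload. Reading "big data" as 1 MiB is an assumption, not something stated in this message. The sketch below shows that scaling, plus a generic pool-size estimate; the function names and the per-context size passed to `ctxt_pool_pages` are hypothetical and do not come from the patchset.

```c
#include <assert.h>
#include <stddef.h>

/* Pages needed to hold n_ctxt contexts of ctxt_bytes each, rounded up.
 * ctxt_bytes is a caller-supplied guess; the message does not state it. */
static size_t ctxt_pool_pages(size_t n_ctxt, size_t ctxt_bytes,
                              size_t page_bytes)
{
    assert(page_bytes != 0);
    return (n_ctxt * ctxt_bytes + page_bytes - 1) / page_bytes;
}

/* Scale a known page count linearly with the payload size
 * (assumption: the pool footprint is proportional to wsize/rsize). */
static size_t scale_pages(size_t base_pages, size_t base_payload,
                          size_t new_payload)
{
    assert(base_payload != 0);
    return base_pages * new_payload / base_payload;
}
```

Under that linear assumption, `scale_pages(161, 32 * 1024, 1024 * 1024)` evaluates to 5152, matching the second figure quoted above.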

--
Signed-off-by: Tom Tucker <[email protected]>

-------------------------------------------------------------------------
This SF.net email is sponsored by DB2 Express
Download DB2 Express C - the FREE version of DB2 express and take
control of your XML. No limits. Just data. Click to get it now.
http://sourceforge.net/powerbar/db2/
_______________________________________________
NFS maillist - [email protected]
https://lists.sourceforge.net/lists/listinfo/nfs


2007-07-11 04:12:24

by Tom Tucker

Subject: Re: [RFC,PATCH 10/10] rdma: Kconfig

Agreed, this help text needs updating.

Tom

On Jul 10, 2007, at 10:04 AM, James Lentini <[email protected]> wrote:

>
>
> On Sun, 1 Jul 2007, Tom Tucker wrote:
>
>>
>> Add NFS_RDMA as an option to the Kconfig file.
>>
>> Signed-off-by: Tom Tucker <[email protected]>
>> ---
>>
>> fs/Kconfig | 7 +++++++
>> 1 files changed, 7 insertions(+), 0 deletions(-)
>>
>> diff --git a/fs/Kconfig b/fs/Kconfig
>> index 0fa0c11..718006c 100644
>> --- a/fs/Kconfig
>> +++ b/fs/Kconfig
>> @@ -1652,6 +1652,13 @@ config NFSD
>> To compile the NFS server support as a module, choose M here:
>> the
>> module will be called nfsd. If unsure, say N.
>>
>> +config NFSD_RDMA
>> + tristate "Provide NFS server over RDMA support (EXPERIMENTAL)"
>> + depends on SUNRPC && INFINIBAND && EXPERIMENTAL
>> + help
>> + If you want your NFS server to support RDMA connections, say
>> Y here
>> + If unsure, say N.
>> +
>
> I suggest rewording this text now that the svcrdma code can be built
> as a module (Y, N, and m are now valid choices).
>
>> config NFSD_V2_ACL
>> bool
>> depends on NFSD

2007-07-10 15:12:34

by James Lentini

Subject: Re: [RFC,PATCH 10/10] rdma: Kconfig



On Sun, 1 Jul 2007, Tom Tucker wrote:

>
> Add NFS_RDMA as an option to the Kconfig file.
>
> Signed-off-by: Tom Tucker <[email protected]>
> ---
>
> fs/Kconfig | 7 +++++++
> 1 files changed, 7 insertions(+), 0 deletions(-)
>
> diff --git a/fs/Kconfig b/fs/Kconfig
> index 0fa0c11..718006c 100644
> --- a/fs/Kconfig
> +++ b/fs/Kconfig
> @@ -1652,6 +1652,13 @@ config NFSD
> To compile the NFS server support as a module, choose M here: the
> module will be called nfsd. If unsure, say N.
>
> +config NFSD_RDMA
> + tristate "Provide NFS server over RDMA support (EXPERIMENTAL)"
> + depends on SUNRPC && INFINIBAND && EXPERIMENTAL
> + help
> + If you want your NFS server to support RDMA connections, say Y here
> + If unsure, say N.
> +

I suggest rewording this text now that the svcrdma code can be built
as a module (Y, N, and m are now valid choices).

> config NFSD_V2_ACL
> bool
> depends on NFSD

2007-07-01 23:12:27

by Tom Tucker

Subject: [RFC,PATCH 01/10] rdma: ONCRPC RDMA Header File


These are the core data types that are used to process the ONCRPC protocol
in the NFS-RDMA client and server.

Signed-off-by: Tom Talpey <[email protected]>
---

include/linux/sunrpc/rpc_rdma.h | 116 +++++++++++++++++++++++++++++++++++++++
1 files changed, 116 insertions(+), 0 deletions(-)

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
new file mode 100644
index 0000000..0013a0d
--- /dev/null
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _LINUX_SUNRPC_RPC_RDMA_H
+#define _LINUX_SUNRPC_RPC_RDMA_H
+
+struct rpcrdma_segment {
+ uint32_t rs_handle; /* Registered memory handle */
+ uint32_t rs_length; /* Length of the chunk in bytes */
+ uint64_t rs_offset; /* Chunk virtual address or offset */
+};
+
+/*
+ * read chunk(s), encoded as a linked list.
+ */
+struct rpcrdma_read_chunk {
+ uint32_t rc_discrim; /* 1 indicates presence */
+ uint32_t rc_position; /* Position in XDR stream */
+ struct rpcrdma_segment rc_target;
+};
+
+/*
+ * write chunk, and reply chunk.
+ */
+struct rpcrdma_write_chunk {
+ struct rpcrdma_segment wc_target;
+};
+
+/*
+ * write chunk(s), encoded as a counted array.
+ */
+struct rpcrdma_write_array {
+ uint32_t wc_discrim; /* 1 indicates presence */
+ uint32_t wc_nchunks; /* Array count */
+ struct rpcrdma_write_chunk wc_array[0];
+};
+
+struct rpcrdma_msg {
+ uint32_t rm_xid; /* Mirrors the RPC header xid */
+ uint32_t rm_vers; /* Version of this protocol */
+ uint32_t rm_credit; /* Buffers requested/granted */
+ uint32_t rm_type; /* Type of message (enum rpcrdma_proc) */
+ union {
+
+ struct { /* no chunks */
+ uint32_t rm_empty[3]; /* 3 empty chunk lists */
+ } rm_nochunks;
+
+ struct { /* no chunks and padded */
+ uint32_t rm_align; /* Padding alignment */
+ uint32_t rm_thresh; /* Padding threshold */
+ uint32_t rm_pempty[3]; /* 3 empty chunk lists */
+ } rm_padded;
+
+ uint32_t rm_chunks[0]; /* read, write and reply chunks */
+
+ } rm_body;
+};
+
+#define RPCRDMA_HDRLEN_MIN 28
+
+enum rpcrdma_errcode {
+ ERR_VERS = 1,
+ ERR_CHUNK = 2
+};
+
+struct rpcrdma_err_vers {
+ uint32_t rdma_vers_low; /* Version range supported by peer */
+ uint32_t rdma_vers_high;
+};
+
+enum rpcrdma_proc {
+ RDMA_MSG = 0, /* An RPC call or reply msg */
+ RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
+ RDMA_MSGP = 2, /* An RPC call or reply msg with padding */
+ RDMA_DONE = 3, /* Client signals reply completion */
+ RDMA_ERROR = 4 /* An RPC RDMA encoding error */
+};
+
+#endif /* _LINUX_SUNRPC_RPC_RDMA_H */
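
As a quick check on RPCRDMA_HDRLEN_MIN, the four fixed words of struct rpcrdma_msg plus the three empty chunk-list words of rm_nochunks add up to 7 * 4 = 28 bytes. The following is a minimal userspace sketch of that layout, not code from the patch: the struct name `rpcrdma_hdr_sketch` and the helper `build_min_header` are made up for illustration, and the fields are left in host byte order, whereas a real sender would convert each word to network byte order.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Userspace copy of the fixed header plus the rm_nochunks arm. */
struct rpcrdma_hdr_sketch {
    uint32_t rm_xid;      /* mirrors the RPC header xid */
    uint32_t rm_vers;     /* protocol version */
    uint32_t rm_credit;   /* buffers requested/granted */
    uint32_t rm_type;     /* enum rpcrdma_proc value */
    uint32_t rm_empty[3]; /* read list, write list, reply chunk: absent */
};

/* Fill a minimal RDMA_MSG header and return its length in bytes. */
static size_t build_min_header(struct rpcrdma_hdr_sketch *h,
                               uint32_t xid, uint32_t credits)
{
    assert(h != NULL);
    h->rm_xid = xid;
    h->rm_vers = 1;    /* RPCRDMA_VERSION in svc_rdma.h */
    h->rm_credit = credits;
    h->rm_type = 0;    /* RDMA_MSG */
    h->rm_empty[0] = h->rm_empty[1] = h->rm_empty[2] = 0;
    return sizeof(*h);
}
```

The returned size is 28, and the chunk lists begin at byte offset 16, which is where rm_body starts in the patch's structure.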

2007-07-01 23:12:28

by Tom Tucker

Subject: [RFC,PATCH 02/10] rdma: sysctl for SVCRDMA transport module


Add sysctl flags for SVCRDMA transport module debug messages.

Signed-off-by: Tom Tucker <[email protected]>
---

include/linux/sunrpc/debug.h | 15 ++++++++++++++-
1 files changed, 14 insertions(+), 1 deletions(-)

diff --git a/include/linux/sunrpc/debug.h b/include/linux/sunrpc/debug.h
index 4c847d4..4ab8372 100644
--- a/include/linux/sunrpc/debug.h
+++ b/include/linux/sunrpc/debug.h
@@ -52,7 +52,7 @@ #endif
#define dprintk(args...) dfprintk(FACILITY, ## args)

#undef ifdebug
-#ifdef RPC_DEBUG
+#ifdef RPC_DEBUG
# define ifdebug(fac) if (unlikely(rpc_debug & RPCDBG_##fac))
# define dfprintk(fac, args...) do { ifdebug(fac) printk(args); } while(0)
# define RPC_IFDEBUG(x) x
@@ -89,6 +89,19 @@ enum {
CTL_MIN_RESVPORT,
CTL_MAX_RESVPORT,
CTL_TRANSPORTS,
+ CTL_SVCRDMA,
+ CTL_RDMA_MAX_REQUESTS,
+ CTL_RDMA_MAX_REQ_SIZE,
+ CTL_RDMA_ORD,
+ CTL_RDMA_STAT_RECV,
+ CTL_RDMA_STAT_READ,
+ CTL_RDMA_STAT_WRITE,
+ CTL_RDMA_STAT_SQ_STARVE,
+ CTL_RDMA_STAT_RQ_STARVE,
+ CTL_RDMA_STAT_RQ_POLL,
+ CTL_RDMA_STAT_RQ_PROD,
+ CTL_RDMA_STAT_SQ_POLL,
+ CTL_RDMA_STAT_SQ_PROD
};

#endif /* _LINUX_SUNRPC_DEBUG_H_ */

2007-07-01 23:12:31

by Tom Tucker

Subject: [RFC,PATCH 03/10] rdma: SVCRDMA Header File


This file defines the data types used by the SVCRDMA transport module.
The principal data structure is the transport-specific extension to
the svcxprt structure.

Signed-off-by: Tom Tucker <[email protected]>
---

include/linux/sunrpc/svc_rdma.h | 261 +++++++++++++++++++++++++++++++++++++++
1 files changed, 261 insertions(+), 0 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
new file mode 100644
index 0000000..0bad94b
--- /dev/null
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#ifndef SVC_RDMA_H
+#define SVC_RDMA_H
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#define SVCRDMA_DEBUG
+
+/* RPC/RDMA parameters */
+extern unsigned int svcrdma_ord;
+extern unsigned int svcrdma_max_requests;
+extern unsigned int svcrdma_max_req_size;
+extern unsigned int rdma_stat_recv;
+extern unsigned int rdma_stat_read;
+extern unsigned int rdma_stat_write;
+extern unsigned int rdma_stat_sq_starve;
+extern unsigned int rdma_stat_rq_starve;
+extern unsigned int rdma_stat_rq_poll;
+extern unsigned int rdma_stat_rq_prod;
+extern unsigned int rdma_stat_sq_poll;
+extern unsigned int rdma_stat_sq_prod;
+
+#define RPCRDMA_VERSION 1
+
+/*
+ * Contexts are built when an RDMA request is created and are a
+ * record of the resources that can be recovered when the request
+ * completes.
+ */
+struct svc_rdma_op_ctxt {
+ struct svc_rdma_op_ctxt *next;
+ struct xdr_buf arg;
+ struct list_head dto_q;
+ enum ib_wr_opcode wr_op;
+ enum ib_wc_status wc_status;
+ u32 byte_len;
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+ enum dma_data_direction direction;
+ int count;
+ struct ib_sge sge[RPCSVC_MAXPAGES];
+ struct page *pages[RPCSVC_MAXPAGES];
+};
+
+#define RDMACTXT_F_READ_DONE 1
+#define RDMACTXT_F_LAST_CTXT 2
+
+struct svc_rdma_deferred_req {
+ struct svc_deferred_req req;
+ struct page *arg_page;
+ int arg_len;
+};
+
+struct svcxprt_rdma {
+ struct svc_sock sc_xprt; /* SVC transport structure */
+ struct rdma_cm_id *sc_cm_id; /* RDMA connection id */
+ struct list_head sc_accept_q; /* Conn. waiting accept */
+ int sc_ord; /* RDMA read limit */
+ wait_queue_head_t sc_read_wait;
+ int sc_max_sge;
+
+ int sc_sq_depth; /* Depth of SQ */
+ atomic_t sc_sq_count; /* Number of SQ WR on queue */
+
+ int sc_max_requests; /* Depth of RQ */
+ int sc_max_req_size; /* Size of each RQ WR buf */
+
+ struct ib_pd *sc_pd;
+
+ struct svc_rdma_op_ctxt *sc_ctxt_head;
+ int sc_ctxt_cnt;
+ int sc_ctxt_bump;
+ int sc_ctxt_max;
+ spinlock_t sc_ctxt_lock;
+ struct list_head sc_rq_dto_q;
+ spinlock_t sc_rq_dto_lock;
+ struct ib_qp *sc_qp;
+ struct ib_cq *sc_rq_cq;
+ struct ib_cq *sc_sq_cq;
+ struct ib_mr *sc_phys_mr; /* MR for server memory */
+
+ spinlock_t sc_lock;
+
+ wait_queue_head_t sc_send_wait; /* SQ exhaustion waitlist */
+ int sc_flags;
+ struct list_head sc_dto_q; /* DTO tasklet I/O pending */
+ struct list_head sc_read_complete_q;
+ spinlock_t sc_read_complete_lock;
+};
+/* sc_flags */
+#define RDMAXPRT_RQ_PENDING 1
+#define RDMAXPRT_SQ_PENDING 2
+
+#define RPCRDMA_LISTEN_BACKLOG 10
+/* The default ORD value is based on two outstanding full-size writes with a
+ * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ. */
+#define RPCRDMA_ORD (64/4)
+#define RPCRDMA_SQ_DEPTH_MULT 8
+#define RPCRDMA_MAX_THREADS 16
+#define RPCRDMA_MAX_REQUESTS 16
+#define RPCRDMA_MAX_REQ_SIZE 4096
+
+/* svc_rdma_marshal.c */
+extern void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *, int *, int *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg**, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *);
+extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma*, struct rpcrdma_msg*,
+ enum rpcrdma_errcode, u32*);
+extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
+extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
+extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
+ u32, u64, u32);
+extern void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *,
+ struct rpcrdma_msg *,
+ struct rpcrdma_msg *,
+ enum rpcrdma_proc);
+extern int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *);
+
+/* svc_rdma_recvfrom.c */
+extern int svc_rdma_recvfrom(struct svc_rqst*);
+
+/* svc_rdma_sendto.c */
+extern int svc_rdma_sendto(struct svc_rqst*);
+
+/* svc_rdma_transport.c */
+extern int svc_rdma_send(struct svcxprt_rdma*, struct ib_send_wr*);
+extern int svc_rdma_send_error(struct svcxprt_rdma*, struct rpcrdma_msg*,
+ enum rpcrdma_errcode);
+struct page* svc_rdma_get_page(void);
+extern int svc_rdma_post_recv(struct svcxprt_rdma*);
+extern int svc_rdma_create_listen(struct svc_serv*, int, struct sockaddr*);
+extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
+extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
+extern void svc_sq_reap(struct svcxprt_rdma*);
+extern void svc_rq_reap(struct svcxprt_rdma*);
+
+/* svc_rdma.c */
+extern int svc_rdma_init(void);
+extern void svc_rdma_cleanup(void);
+
+/*
+ * Returns the address of the first read chunk or <nul> if no read chunk is
+ * present
+ */
+static inline struct rpcrdma_read_chunk *
+svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_read_chunk *ch =
+ (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+
+ if (ch->rc_discrim == 0)
+ return NULL;
+
+ return ch;
+}
+
+/*
+ * Returns the address of the first read write array element or <nul> if no
+ * write array list is present
+ */
+static inline struct rpcrdma_write_array *
+svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
+{
+ if (rmsgp->rm_body.rm_chunks[0] != 0
+ || rmsgp->rm_body.rm_chunks[1] == 0)
+ return NULL;
+
+ return (struct rpcrdma_write_array *)&rmsgp->rm_body.rm_chunks[1];
+}
+
+/*
+ * Returns the address of the first reply array element or <nul> if no
+ * reply array is present
+ */
+static inline struct rpcrdma_write_array *
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_read_chunk *rch;
+ struct rpcrdma_write_array *wr_ary;
+ struct rpcrdma_write_array *rp_ary;
+
+ /* XXX: Need to fix when reply list may occur with read-list and/or
+ * write list */
+ if (rmsgp->rm_body.rm_chunks[0] != 0 ||
+ rmsgp->rm_body.rm_chunks[1] != 0)
+ return NULL;
+
+ rch = svc_rdma_get_read_chunk(rmsgp);
+ if (rch) {
+ while (rch->rc_discrim)
+ rch ++;
+
+ /* The reply list follows an empty write array located
+ * at 'rc_position' here. The reply array is at rc_target.
+ */
+ rp_ary = (struct rpcrdma_write_array*)&rch->rc_target;
+
+ goto found_it;
+ }
+
+ wr_ary = svc_rdma_get_write_array(rmsgp);
+ if (wr_ary) {
+ rp_ary = (struct rpcrdma_write_array*)
+ &wr_ary->wc_array[wr_ary->wc_nchunks].wc_target.rs_length;
+
+ goto found_it;
+ }
+
+ /* No read list, no write list */
+ rp_ary = (struct rpcrdma_write_array*)
+ &rmsgp->rm_body.rm_chunks[2];
+
+ found_it:
+ if (rp_ary->wc_discrim == 0)
+ return NULL;
+
+ return rp_ary;
+}
+#endif
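
The inline helpers above locate the read list, write array and reply array by looking at fixed positions in rm_chunks, and the XXX comment notes that the combined cases still need work. As an illustration only, the sketch below walks the three lists in order instead. It is not the patch's method. It assumes the words are already in host byte order, that a read chunk occupies six 32-bit words (discriminator, position, handle, length, 64-bit offset), that a write chunk is a counted array of four-word segments, and that several counted arrays may be chained before the terminating zero. All function and macro names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define READ_CHUNK_WORDS 6 /* discrim, position, handle, length, offset */
#define SEGMENT_WORDS    4 /* handle, length, 64-bit offset */

/* Skip the read list at w[i]; return the index just past its
 * terminating zero, or -1 if the list runs past the n words given. */
static long skip_read_list(const uint32_t *w, size_t n, size_t i)
{
    assert(w != NULL);
    while (i < n && w[i] != 0) {
        if (n - i < READ_CHUNK_WORDS)
            return -1;
        i += READ_CHUNK_WORDS;
    }
    return i < n ? (long)(i + 1) : -1;
}

/* Skip one counted array whose non-zero discriminator is at w[i]. */
static long skip_counted_array(const uint32_t *w, size_t n, size_t i)
{
    if (n - i < 2)
        return -1;
    uint32_t nchunks = w[i + 1];
    if (nchunks > (n - i - 2) / SEGMENT_WORDS)
        return -1; /* segments would overrun the buffer */
    return (long)(i + 2 + (size_t)nchunks * SEGMENT_WORDS);
}

/* Skip the write list at w[i]; same return convention as above. */
static long skip_write_list(const uint32_t *w, size_t n, size_t i)
{
    while (i < n && w[i] != 0) {
        long next = skip_counted_array(w, n, i);
        if (next < 0)
            return -1;
        i = (size_t)next;
    }
    return i < n ? (long)(i + 1) : -1;
}

/* Return the index of the reply chunk discriminator, or -1 if the
 * chunk words are malformed or truncated. */
static long find_reply_chunk(const uint32_t *w, size_t n)
{
    long i = skip_read_list(w, n, 0);
    if (i < 0)
        return -1;
    i = skip_write_list(w, n, (size_t)i);
    if (i < 0 || (size_t)i >= n)
        return -1;
    return i;
}
```

With no read list and no write list, `find_reply_chunk` returns index 2, which agrees with the rm_chunks[2] case handled at the end of svc_rdma_get_reply_array.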

2007-07-01 23:12:34

by Tom Tucker

Subject: [RFC,PATCH 04/10] rdma: SVCRDMA Transport Module


This file implements the RDMA transport module initialization and
termination logic and registers the transport sysctl variables.

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma.c | 270 +++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 270 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma.c b/net/sunrpc/svc_rdma.c
new file mode 100644
index 0000000..56168a2
--- /dev/null
+++ b/net/sunrpc/svc_rdma.c
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCTRANS
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+static unsigned int zero = 0;
+static unsigned int one = 1;
+
+unsigned int rdma_stat_recv = 0;
+unsigned int rdma_stat_read = 0;
+unsigned int rdma_stat_write = 0;
+unsigned int rdma_stat_sq_starve = 0;
+unsigned int rdma_stat_rq_starve = 0;
+unsigned int rdma_stat_rq_poll = 0;
+unsigned int rdma_stat_rq_prod = 0;
+unsigned int rdma_stat_sq_poll = 0;
+unsigned int rdma_stat_sq_prod = 0;
+
+extern struct svc_sock_ops svc_rdma_ops;
+
+static struct ctl_table_header *svcrdma_table_header;
+static ctl_table svcrdma_parm_table[] = {
+ {
+ .ctl_name = CTL_RDMA_MAX_REQUESTS,
+ .procname = "max_requests",
+ .data = &svcrdma_max_requests,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_requests,
+ .extra2 = &max_max_requests
+ },
+ {
+ .ctl_name = CTL_RDMA_MAX_REQ_SIZE,
+ .procname = "max_req_size",
+ .data = &svcrdma_max_req_size,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_max_inline,
+ .extra2 = &max_max_inline
+ },
+ {
+ .ctl_name = CTL_RDMA_ORD,
+ .procname = "max_outbound_read_requests",
+ .data = &svcrdma_ord,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &min_ord,
+ .extra2 = &max_ord,
+ },
+
+ {
+ .ctl_name = CTL_RDMA_STAT_READ,
+ .procname = "rdma_stat_read",
+ .data = &rdma_stat_read,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_RECV,
+ .procname = "rdma_stat_recv",
+ .data = &rdma_stat_recv,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_WRITE,
+ .procname = "rdma_stat_write",
+ .data = &rdma_stat_write,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_SQ_STARVE,
+ .procname = "rdma_stat_sq_starve",
+ .data = &rdma_stat_sq_starve,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_RQ_STARVE,
+ .procname = "rdma_stat_rq_starve",
+ .data = &rdma_stat_rq_starve,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_RQ_POLL,
+ .procname = "rdma_stat_rq_poll",
+ .data = &rdma_stat_rq_poll,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_RQ_PROD,
+ .procname = "rdma_stat_rq_prod",
+ .data = &rdma_stat_rq_prod,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_SQ_POLL,
+ .procname = "rdma_stat_sq_poll",
+ .data = &rdma_stat_sq_poll,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = CTL_RDMA_STAT_SQ_PROD,
+ .procname = "rdma_stat_sq_prod",
+ .data = &rdma_stat_sq_prod,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .strategy = &sysctl_intvec,
+ .extra1 = &zero,
+ .extra2 = &one,
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_table[] = {
+ {
+ .ctl_name = CTL_SVCRDMA,
+ .procname = "svc_rdma",
+ .mode = 0555,
+ .child = svcrdma_parm_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+static ctl_table svcrdma_root_table[] = {
+ {
+ .ctl_name = CTL_SUNRPC,
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = svcrdma_table
+ },
+ {
+ .ctl_name = 0,
+ },
+};
+
+void svc_rdma_cleanup(void)
+{
+ dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+ if (svcrdma_table_header) {
+ unregister_sysctl_table(svcrdma_table_header);
+ svcrdma_table_header = NULL;
+ }
+ svc_unregister_transport(&svc_rdma_ops);
+}
+
+int svc_rdma_init(void)
+{
+ dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+ dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord);
+ dprintk("\tmax_requests : %d\n", svcrdma_max_requests);
+ dprintk("\tsq_depth : %d\n",
+ svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+ dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
+ if (!svcrdma_table_header) {
+ svcrdma_table_header =
+ register_sysctl_table(svcrdma_root_table);
+ }
+ /* Register RDMA with the SVC transport switch */
+ svc_register_transport(&svc_rdma_ops);
+ return 0;
+}
+MODULE_AUTHOR("Tom Tucker <[email protected]>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);
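
Each tunable in the table above pairs a default with a lower and upper bound through extra1 and extra2, and svc_rdma_init reports the send queue depth as max_requests multiplied by RPCRDMA_SQ_DEPTH_MULT. The sketch below is a userspace model of that arrangement using the values from the patch. It is not the kernel's proc_dointvec_minmax implementation, and the names `tunable_sketch`, `tunable_set` and `sq_depth` are hypothetical; in particular, the -1 return for an out-of-range value is a choice made here and says nothing about what the kernel handler returns.

```c
#include <assert.h>
#include <stddef.h>

struct tunable_sketch {
    unsigned int value; /* current setting */
    unsigned int min;   /* role of .extra1 */
    unsigned int max;   /* role of .extra2 */
};

/* Defaults and limits copied from the patch. */
struct tunable_sketch sketch_ord = {16, 1, 4096};
struct tunable_sketch sketch_max_requests = {16, 4, 16384};
struct tunable_sketch sketch_max_req_size = {4096, 4096, 65536};

#define SQ_DEPTH_MULT 8 /* RPCRDMA_SQ_DEPTH_MULT */

/* Store v only if it lies within [min, max]; 0 on success, -1 if not. */
static int tunable_set(struct tunable_sketch *t, unsigned int v)
{
    assert(t != NULL && t->min <= t->max);
    if (v < t->min || v > t->max)
        return -1;
    t->value = v;
    return 0;
}

/* Send queue depth as computed for the init-time debug message. */
static unsigned int sq_depth(const struct tunable_sketch *reqs)
{
    assert(reqs != NULL);
    return reqs->value * SQ_DEPTH_MULT;
}
```

With the defaults, the send queue depth is 16 * 8 = 128. Note that the rdma_stat_* entries in the table use the bounds zero and one, so under this model a write to a counter would only accept 0 or 1.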

2007-07-01 23:12:42

by Tom Tucker

Subject: [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling


This file implements the ONCRPC RDMA protocol marshalling and
unmarshalling logic.
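
For readers unfamiliar with the wire format, the unmarshalling of a single read chunk can be sketched in userspace as follows. This is an illustration under stated assumptions, not the code in this patch: it copies into a separate structure rather than converting in place, it assumes every field arrives big-endian with the 64-bit offset sent as two 32-bit words, high word first, and the names `read_chunk_sketch`, `be32` and `decode_read_chunk` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

struct read_chunk_sketch {
    uint32_t discrim;  /* non-zero: a chunk follows; zero: end of list */
    uint32_t position; /* position in the XDR stream */
    uint32_t handle;   /* registered memory handle */
    uint32_t length;   /* chunk length in bytes */
    uint64_t offset;   /* chunk virtual address or offset */
};

#define READ_CHUNK_BYTES 24

/* Read one big-endian 32-bit word without alignment requirements. */
static uint32_t be32(const unsigned char *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Decode one list entry from buf. Returns the bytes consumed: 4 for
 * the end-of-list marker, 24 for a full chunk, 0 if buf is too short. */
static size_t decode_read_chunk(const unsigned char *buf, size_t len,
                                struct read_chunk_sketch *out)
{
    assert(buf != NULL && out != NULL);
    if (len < 4)
        return 0;
    out->discrim = be32(buf);
    if (out->discrim == 0)
        return 4;
    if (len < READ_CHUNK_BYTES)
        return 0;
    out->position = be32(buf + 4);
    out->handle = be32(buf + 8);
    out->length = be32(buf + 12);
    out->offset = ((uint64_t)be32(buf + 16) << 32) | be32(buf + 20);
    return READ_CHUNK_BYTES;
}
```

A caller would invoke `decode_read_chunk` repeatedly, advancing by the returned byte count, until it returns 4 (end of list) or 0 (truncated input).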

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma_marshal.c | 424 +++++++++++++++++++++++++++++++++++++++++
1 files changed, 424 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_marshal.c b/net/sunrpc/svc_rdma_marshal.c
new file mode 100644
index 0000000..0a34efb
--- /dev/null
+++ b/net/sunrpc/svc_rdma_marshal.c
@@ -0,0 +1,424 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/time.h>
+
+#include <rdma/rdma_cm.h>
+
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <asm/io.h>
+#include <asm/unaligned.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib_verbs.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCTRANS
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ * discrim : xdr_one
+ * position : u32 offset into XDR stream
+ * handle : u32 RKEY
+ * . . .
+ * end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+ struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk*)va;
+
+ while (ch->rc_discrim != xdr_zero) {
+ u64 ch_offset;
+
+ if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+ return NULL;
+ }
+
+ ch->rc_discrim = ntohl(ch->rc_discrim);
+ ch->rc_position = ntohl(ch->rc_position);
+ ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
+ ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
+ va = (u32*)&ch->rc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64*)va);
+ ch++;
+ }
+ return (u32*)&ch->rc_position;
+}
+
+/*
+ * Determine number of chunks and total bytes in chunk list. The chunk
+ * list has already been verified to fit within the RPCRDMA header.
+ */
+void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
+ int *ch_count, int *byte_count)
+{
+ /* compute the number of bytes represented by read chunks */
+ *byte_count = 0;
+ *ch_count = 0;
+ for (; ch->rc_discrim != 0; ch++) {
+ *byte_count = *byte_count + ch->rc_target.rs_length;
+ *ch_count = *ch_count + 1;
+ }
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ * discrim : xdr_one
+ * nchunks : <count>
+ * handle : u32 RKEY ---+
+ * length : u32 <len of segment> |
+ * offset : remote va + <count>
+ * . . . |
+ * ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array*)va;
+
+ /* Check for no write-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32*)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32*)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64*)va);
+ }
+
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ return (u32*)&ary->wc_array[ch_no].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+ int ch_no;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array*)va;
+
+ /* Check for no reply-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32*)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ ary->wc_discrim = ntohl(ary->wc_discrim);
+ ary->wc_nchunks = ntohl(ary->wc_nchunks);
+ if (((unsigned long)&ary->wc_array[0] +
+ (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, ary->wc_nchunks, vaend);
+ return NULL;
+ }
+ for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+ u64 ch_offset;
+
+ ary->wc_array[ch_no].wc_target.rs_handle =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+ ary->wc_array[ch_no].wc_target.rs_length =
+ ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+ va = (u32*)&ary->wc_array[ch_no].wc_target.rs_offset;
+ xdr_decode_hyper(va, &ch_offset);
+ put_unaligned(ch_offset, (u64*)va);
+ }
+
+ return (u32*)&ary->wc_array[ch_no];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req, struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ u32 *va;
+ u32 *vaend;
+ u32 hdr_len;
+
+ rmsgp = (struct rpcrdma_msg*)rqstp->rq_arg.head[0].iov_base;
+
+ /* Verify that there are enough bytes for header + something */
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ dprintk("svcrdma: header too short = %d\n",
+ rqstp->rq_arg.len);
+ return -EINVAL;
+ }
+
+ /* Decode the header */
+ rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+ rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+ rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+ rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+ if (rmsgp->rm_vers != RPCRDMA_VERSION)
+ return -ENOSYS;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ int hdrlen;
+ rmsgp->rm_body.rm_padded.rm_align =
+ ntohl(rmsgp->rm_body.rm_padded.rm_align);
+ rmsgp->rm_body.rm_padded.rm_thresh =
+ ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ if (hdrlen > rqstp->rq_arg.len)
+ return -EINVAL;
+ return hdrlen;
+ }
+
+ /* The chunk list may contain either a read chunk list or a write
+ * chunk list and a reply chunk list.
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+ vaend = (u32*)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ va = decode_read_list(va,vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_write_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_reply_array(va,vaend);
+ if (!va)
+ return -EINVAL;
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+ *rdma_req = rmsgp;
+ return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ struct rpcrdma_read_chunk *ch;
+ struct rpcrdma_write_array *ary;
+ u32 *va;
+ u32 hdrlen;
+
+ dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+ rqstp);
+ rmsgp = (struct rpcrdma_msg*)rqstp->rq_arg.head[0].iov_base;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ return hdrlen;
+ }
+
+ /*
+ * Skip all chunks to find RPC msg. These were previously processed
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+
+ /* Skip read-list */
+ for (ch = (struct rpcrdma_read_chunk*)va;
+ ch->rc_discrim != xdr_zero; ch++);
+ va = (u32*)&ch->rc_position;
+
+ /* Skip write-list */
+ ary = (struct rpcrdma_write_array*)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32*)&ary->wc_nchunks;
+ else
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ va = (u32*)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+ /* Skip reply-array */
+ ary = (struct rpcrdma_write_array*)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32*)&ary->wc_nchunks;
+ else
+ va = (u32*)&ary->wc_array[ary->wc_nchunks];
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+ return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err, u32 *va)
+{
+ u32* startp = va;
+
+ *va++ = htonl(rmsgp->rm_xid);
+ *va++ = htonl(rmsgp->rm_vers);
+ *va++ = htonl(xprt->sc_max_requests);
+ *va++ = htonl(RDMA_ERROR);
+ *va++ = htonl(err);
+ if (err == ERR_VERS) {
+ *va++ = htonl(RPCRDMA_VERSION);
+ *va++ = htonl(RPCRDMA_VERSION);
+ }
+
+ return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_write_array *wr_ary;
+
+ /* There is no read-list in a reply */
+
+ /* skip write list */
+ wr_ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+ wc_target.rs_length;
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ /* skip reply array */
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+ struct rpcrdma_write_array *ary;
+
+ /* no read-list */
+ rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+ /* write-array discrim */
+ ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+
+ /* write-list terminator */
+ ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+ /* reply-array discriminator */
+ ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+ int chunks)
+{
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary, int chunk_no,
+ u32 rs_handle, u64 rs_offset, u32 write_len)
+{
+ struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+ seg->rs_handle = htonl(rs_handle);
+ seg->rs_length = htonl(write_len);
+ xdr_encode_hyper((u32*) &seg->rs_offset, rs_offset);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ enum rpcrdma_proc rdma_type)
+{
+ rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+ rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+ rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+ rdma_resp->rm_type = htonl(rdma_type);
+
+ /* Encode <nul> chunks lists */
+ rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}
+
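
The read chunk list walk in decode_read_list above can be sketched in
userspace as follows. This is only an illustration under stated
assumptions, not code from the patch: walk_read_list and
READ_CHUNK_WORDS are hypothetical names, and the sketch assumes each
entry is six 32-bit words on the wire (discriminator, position, handle,
length, and a 64-bit offset), terminated by a zero discriminator. Unlike
the kernel function, it checks the bounds before reading each
discriminator and does not byte-swap the buffer in place.

```c
#include <arpa/inet.h>  /* ntohl */
#include <stddef.h>
#include <stdint.h>

/* discrim, position, handle, length, offset (high word), offset (low word) */
#define READ_CHUNK_WORDS 6

/*
 * Hypothetical userspace helper: walk a read chunk list stored in
 * network byte order in words[0..nwords). Returns the index of the
 * first word after the zero terminator, or -1 if the list runs past
 * the end of the buffer. On success, *nchunks and *nbytes hold the
 * number of chunks and the sum of their lengths.
 */
static long walk_read_list(const uint32_t *words, size_t nwords,
                           unsigned int *nchunks, uint64_t *nbytes)
{
    size_t i = 0;

    *nchunks = 0;
    *nbytes = 0;
    for (;;) {
        if (i >= nwords)
            return -1;              /* no room for a discriminator */
        if (words[i] == 0)          /* zero is zero in any byte order */
            return (long)(i + 1);   /* step over the terminator */
        if (nwords - i < READ_CHUNK_WORDS)
            return -1;              /* truncated chunk */
        *nbytes += ntohl(words[i + 3]);
        *nchunks += 1;
        i += READ_CHUNK_WORDS;
    }
}
```

The returned index plays the role of the pointer that decode_read_list
hands to the write-list decoder.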


2007-07-01 23:12:42

by Tom Tucker

Subject: [RFC,PATCH 07/10] rdma: SVCRDMA sendto


This file implements the RDMA transport sendto function. An RPC reply
on an RDMA transport consists of some number of RDMA_WRITE requests followed
by an RDMA_SEND request. The sendto function parses the ONCRPC RDMA reply
header to determine how to send the reply back to the client. The send
queue is sized so as to be able to send complete replies for requests in
most cases. In the event that there are not enough SQ WR slots to reply, e.g.
for large data transfers, the send will block the NFSD thread. The I/O callback functions
in svc_rdma_transport.c that reap WR completions wake any waiters blocked
on the SQ.
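
As a rough illustration of the SQ sizing concern above, the number of
work requests a reply consumes can be estimated as below. This is a
sketch under stated assumptions, not code from the patch:
reply_wr_count is a hypothetical helper, max_write stands in for
sc_max_sge * PAGE_SIZE and must be non-zero, and the figures in the
usage note (32 SGEs, 4096-byte pages) are illustrative only.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: count the SQ work requests needed for one reply.
 * Each client-supplied chunk receives up to chunk_len[i] bytes, split
 * into RDMA_WRITEs of at most max_write bytes each, and the reply ends
 * with a single RDMA_SEND. Assumes max_write != 0.
 */
static unsigned int reply_wr_count(const uint32_t *chunk_len, size_t nchunks,
                                   uint32_t xfer_len, uint32_t max_write)
{
    unsigned int wrs = 0;
    size_t i;

    for (i = 0; i < nchunks && xfer_len != 0; i++) {
        uint32_t write_len = chunk_len[i] < xfer_len ? chunk_len[i] : xfer_len;

        /* ceil(write_len / max_write) without overflowing 32 bits */
        wrs += write_len / max_write + (write_len % max_write != 0);
        xfer_len -= write_len;
    }
    return wrs + 1;     /* the trailing RDMA_SEND */
}
```

With max_write = 32 * 4096, a reply of 1 MiB plus 100 bytes spread over
a 1 MiB chunk and a 4 KiB chunk needs 8 + 1 RDMA_WRITEs and one
RDMA_SEND, so 10 SQ slots in total.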

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma_sendto.c | 515 ++++++++++++++++++++++++++++++++++++++++++
1 files changed, 515 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_sendto.c b/net/sunrpc/svc_rdma_sendto.c
new file mode 100644
index 0000000..a15b456
--- /dev/null
+++ b/net/sunrpc/svc_rdma_sendto.c
@@ -0,0 +1,515 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/time.h>
+
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/mm.h> /* num_physpages */
+#include <linux/spinlock.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <asm/io.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCTRANS
+
+/* Encode an XDR as an array of IB SGE
+ *
+ * Assumptions:
+ * - head[0] is physically contiguous.
+ * - tail[0] is physically contiguous.
+ * - pages[] is not physically or virtually contiguous and consists of
+ * PAGE_SIZE elements.
+ *
+ * Output:
+ * SGE[0] reserved for RPCRDMA header
+ * SGE[1] data from xdr->head[]
+ * SGE[2..sge_count-2] data from xdr->pages[]
+ * SGE[sge_count-1] data from xdr->tail.
+ *
+ */
+static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
+ struct xdr_buf *xdr,
+ struct ib_sge *sge,
+ int *sge_count)
+{
+ /* Max we need is the length of the XDR / pagesize + one for
+ * head + one for tail + one for RPCRDMA header
+ */
+ int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
+ int sge_no;
+ u32 byte_count = xdr->len;
+ u32 sge_bytes;
+ u32 page_bytes;
+ int page_off;
+ int page_no;
+
+ /* Skip the first sge, this is for the RPCRDMA header */
+ sge_no = 1;
+
+ /* Head SGE */
+ sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->head[0].iov_base,
+ xdr->head[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no ++;
+
+ /* pages SGE */
+ page_no = 0;
+ page_bytes = xdr->page_len;
+ page_off = xdr->page_base;
+ while (byte_count && page_bytes) {
+ sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ xdr->pages[page_no], page_off, sge_bytes,
+ DMA_TO_DEVICE);
+ sge_bytes = min(sge_bytes, page_bytes);
+ byte_count -= sge_bytes;
+ page_bytes -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+
+ sge_no ++;
+ page_no ++;
+ page_off = 0; /* reset for next time through loop */
+ }
+
+ /* Tail SGE */
+ if (byte_count && xdr->tail[0].iov_len) {
+ sge[sge_no].addr =
+ ib_dma_map_single(xprt->sc_cm_id->device,
+ xdr->tail[0].iov_base,
+ xdr->tail[0].iov_len,
+ DMA_TO_DEVICE);
+ sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
+ byte_count -= sge_bytes;
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ sge_no ++;
+ }
+
+ BUG_ON(sge_no > sge_max);
+ BUG_ON(byte_count != 0);
+
+ *sge_count = sge_no;
+ return sge;
+}
+
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+ u32 rmr, u64 to,
+ u32 xdr_off, int write_len,
+ struct ib_sge *xdr_sge, int sge_count)
+{
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct ib_send_wr write_wr;
+ struct ib_sge *sge;
+ int xdr_sge_no;
+ int sge_no;
+ int sge_bytes;
+ int sge_off;
+ int bc;
+ struct svc_rdma_op_ctxt *ctxt;
+ int ret = 0;
+
+ BUG_ON(sge_count >= 32);
+ dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, write_len=%d, "
+ "xdr_sge=%p, sge_count=%d\n",
+ rmr, to, xdr_off, write_len, xdr_sge, sge_count);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 0;
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+
+ /* Find the SGE associated with xdr_off */
+ for (bc = xdr_off, xdr_sge_no=1; bc && xdr_sge_no < sge_count;
+ xdr_sge_no++) {
+ if (xdr_sge[xdr_sge_no].length > bc)
+ break;
+ bc -= xdr_sge[xdr_sge_no].length;
+ }
+
+ sge_off = bc;
+ bc = write_len;
+ sge_no = 0;
+
+ /* Copy the remaining SGE */
+ while (bc != 0 && xdr_sge_no < sge_count) {
+ sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
+ sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+ sge_bytes = min((size_t)bc,
+ (size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+ sge[sge_no].length = sge_bytes;
+
+ sge_off = 0;
+ sge_no ++;
+ xdr_sge_no ++;
+ bc -= sge_bytes;
+ }
+
+ BUG_ON(bc != 0);
+ BUG_ON(xdr_sge_no > sge_count);
+
+ /* Prepare WRITE WR */
+ memset(&write_wr, 0, sizeof write_wr);
+ ctxt->wr_op = IB_WR_RDMA_WRITE;
+ write_wr.wr_id = (unsigned long)ctxt;
+ write_wr.sg_list = &sge[0];
+ write_wr.num_sge = sge_no;
+ write_wr.opcode = IB_WR_RDMA_WRITE;
+ write_wr.send_flags = IB_SEND_SIGNALED;
+ write_wr.wr.rdma.rkey = rmr;
+ write_wr.wr.rdma.remote_addr = to;
+
+ /* Post It */
+ rdma_stat_write ++;
+ if (svc_rdma_send(xprt, &write_wr)) {
+ svc_rdma_put_context(ctxt,1);
+ /* Fatal error, close transport */
+ ret = -EIO;
+ }
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ return ret;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_off;
+ int chunk_no;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_write_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ res_ary = (struct rpcrdma_write_array*)
+ &rdma_resp->rm_body.rm_chunks[1];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* Write chunks start at the pagelist */
+ for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ struct rpcrdma_segment *arg_ch;
+ u64 rs_offset;
+
+ arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, arg_ch->rs_length);
+
+ /* Prepare the response chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(arg_ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ arg_ch->rs_handle,
+ rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ arg_ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n", ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+ return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct ib_sge *sge,
+ int sge_count)
+{
+ u32 xfer_len = rqstp->rq_res.len;
+ int write_len;
+ int max_write;
+ u32 xdr_off;
+ int chunk_no;
+ int chunk_off;
+ struct rpcrdma_segment *ch;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ /* XXX: need to fix when reply lists occur with read-list and/or
+ * write-list */
+ res_ary = (struct rpcrdma_write_array*)
+ &rdma_resp->rm_body.rm_chunks[2];
+
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+ /* xdr offset starts at RPC message */
+ for (xdr_off = 0, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ u64 rs_offset;
+ ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, ch->rs_length);
+
+
+ /* Prepare the reply chunk given the length actually
+ * written */
+ rs_offset = get_unaligned(&(ch->rs_offset));
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ ch->rs_handle,rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ int this_write;
+
+ this_write = min(write_len, max_write);
+ ret = send_write(xprt, rqstp,
+ ch->rs_handle,
+ rs_offset + chunk_off,
+ xdr_off,
+ this_write,
+ sge,
+ sge_count);
+ if (ret) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n", ret);
+ return -EIO;
+ }
+ chunk_off += this_write;
+ xdr_off += this_write;
+ xfer_len -= this_write;
+ write_len -= this_write;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+ return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ * single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ * are sent via this RDMA_SEND and another portion of the data is
+ * sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ * header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct page *page,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rdma_op_ctxt *ctxt,
+ int sge_count,
+ int byte_count)
+{
+ struct ib_send_wr send_wr;
+ int sge_no;
+ int sge_bytes;
+ int page_no;
+ int ret;
+
+ /* Prepare the context */
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+
+ /* Prepare the SGE for the RPCRDMA Header */
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device,
+ page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+ ctxt->direction = DMA_TO_DEVICE;
+ ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+ ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+
+ /* Determine how many of our SGE are to be transmitted */
+ for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
+ sge_bytes = min((size_t)ctxt->sge[sge_no].length, (size_t)byte_count);
+ byte_count -= sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+
+ /* Save all respages in the ctxt and remove them from the
+ * respages array. They are our pages until the I/O
+ * completes.
+ */
+ for (page_no=0; page_no < rqstp->rq_resused; page_no++) {
+ ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+ ctxt->count ++;
+ rqstp->rq_respages[page_no] = NULL;
+ }
+
+ BUG_ON(sge_no > rdma->sc_max_sge);
+ memset(&send_wr, 0, sizeof send_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = sge_no;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret)
+ svc_rdma_put_context(ctxt,1);
+
+ return ret;
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+ struct svc_sock *xprt = rqstp->rq_sock;
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+ struct rpcrdma_msg *rdma_argp;
+ struct rpcrdma_msg *rdma_resp;
+ struct rpcrdma_write_array *reply_ary;
+ enum rpcrdma_proc reply_type;
+ int ret;
+ int inline_bytes;
+ struct ib_sge *sge;
+ int sge_count = 0;
+ struct page *res_page;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+ /* Return the receive WR to the RQ. Any error posting to the
+ * RQ will surface on the SQ post below
+ */
+ (void)svc_rdma_post_recv(rdma);
+
+ /* Get the RDMA request header. */
+ rdma_argp = page_address(rqstp->rq_pages[0]);
+
+ /* Build an SGE for the XDR */
+ ctxt = svc_rdma_get_context(rdma);
+ ctxt->direction = DMA_TO_DEVICE;
+ sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+
+ inline_bytes = rqstp->rq_res.len;
+
+ /* Create the RDMA response header */
+ res_page = svc_rdma_get_page();
+ rdma_resp = page_address(res_page);
+ reply_ary=svc_rdma_get_reply_array(rdma_argp);
+ if (reply_ary)
+ reply_type = RDMA_NOMSG;
+ else
+ reply_type = RDMA_MSG;
+ svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+ rdma_resp, reply_type);
+
+ /* Send any write-chunk data and build resp write-list */
+ ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n", ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ /* Send any reply-list data and update resp reply-list */
+ ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, sge, sge_count);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n", ret);
+ goto error;
+ }
+ inline_bytes -= ret;
+
+ ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+ inline_bytes);
+ dprintk("svcrdma: send_reply returns %d\n", ret);
+ return ret;
+ error:
+ svc_rdma_put_context(ctxt, 0);
+ put_page(res_page);
+ return ret;
+}
+


2007-07-01 23:12:42

by Tom Tucker

Subject: [RFC,PATCH 09/10] rdma: makefile


Add the NFSD_RDMA module to the sunrpc makefile.

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/Makefile | 4 ++++
1 files changed, 4 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 8ebfc4d..a683d6a 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -13,3 +13,7 @@ sunrpc-y := clnt.o xprt.o socklib.o xprt
sunrpc_syms.o cache.o rpc_pipe.o
sunrpc-$(CONFIG_PROC_FS) += stats.o
sunrpc-$(CONFIG_SYSCTL) += sysctl.o
+
+obj-$(CONFIG_NFSD_RDMA) += svcrdma.o
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o


2007-07-01 23:12:42

by Tom Tucker

Subject: [RFC,PATCH 10/10] rdma: Kconfig


Add NFSD_RDMA as an option to the Kconfig file.

Signed-off-by: Tom Tucker <[email protected]>
---

fs/Kconfig | 7 +++++++
1 files changed, 7 insertions(+), 0 deletions(-)

diff --git a/fs/Kconfig b/fs/Kconfig
index 0fa0c11..718006c 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -1652,6 +1652,13 @@ config NFSD
To compile the NFS server support as a module, choose M here: the
module will be called nfsd. If unsure, say N.

+config NFSD_RDMA
+ tristate "Provide NFS server over RDMA support (EXPERIMENTAL)"
+ depends on SUNRPC && INFINIBAND && EXPERIMENTAL
+ help
+ If you want your NFS server to support RDMA connections, say Y here.
+ If unsure, say N.
+
config NFSD_V2_ACL
bool
depends on NFSD


2007-07-01 23:12:42

by Tom Tucker

Subject: [RFC,PATCH 06/10] rdma: SVCRDMA recvfrom


This file implements the RDMA transport recvfrom function. The function
dequeues work request completion contexts from an I/O list that it shares
with the I/O tasklet in svc_rdma_transport.c. For ONCRPC RDMA, an RPC may
not be complete when it is received. Instead, the RDMA header that precedes
the RPC message informs the transport where to get the RPC data from on
the client and where to place it in the RPC message before it is delivered
to the server. The svc_rdma_recvfrom function therefore parses this RDMA
header and issues any necessary RDMA operations to fetch the remainder of
the RPC from the client.

Special handling is required when the request involves an RDMA_READ.
In this case, recvfrom submits all of the RDMA_READ requests to the underlying
transport driver and then returns 0 (EAGAIN). When the transport
completes the last RDMA_READ for the request, it enqueues it on a
read completion queue and enqueues the transport. The recvfrom code
favors this queue over the regular DTO queue when satisfying reads.
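
The queue preference described above can be sketched as follows. This
is a simplified illustration under stated assumptions, not the patch's
implementation: struct ctxt, struct ctxt_queue, and next_ctxt are
hypothetical names, the queues are plain singly linked FIFOs, and the
locking that the real transport needs against the tasklet is omitted.

```c
#include <stddef.h>

/* Hypothetical stand-in for a work request completion context */
struct ctxt {
    struct ctxt *next;
    int id;
};

/* Minimal FIFO; an empty queue has head == tail == NULL */
struct ctxt_queue {
    struct ctxt *head;
    struct ctxt *tail;
};

static void queue_push(struct ctxt_queue *q, struct ctxt *c)
{
    c->next = NULL;
    if (q->tail)
        q->tail->next = c;
    else
        q->head = c;
    q->tail = c;
}

static struct ctxt *queue_pop(struct ctxt_queue *q)
{
    struct ctxt *c = q->head;

    if (!c)
        return NULL;
    q->head = c->next;
    if (!q->head)
        q->tail = NULL;
    return c;
}

/*
 * Pick the next context to process: requests whose RDMA_READs have
 * completed are served before newly received requests on the DTO queue.
 */
static struct ctxt *next_ctxt(struct ctxt_queue *read_complete_q,
                              struct ctxt_queue *dto_q)
{
    struct ctxt *c = queue_pop(read_complete_q);

    return c ? c : queue_pop(dto_q);
}
```

Draining the read completion queue first means a request that already
holds pages and has finished its RDMA_READs is not left waiting behind
fresh receives.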

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma_recvfrom.c | 664 ++++++++++++++++++++++++++++++++++++++++
1 files changed, 664 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_recvfrom.c b/net/sunrpc/svc_rdma_recvfrom.c
new file mode 100644
index 0000000..278f8a7
--- /dev/null
+++ b/net/sunrpc/svc_rdma_recvfrom.c
@@ -0,0 +1,664 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/time.h>
+
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/mm.h> /* num_physpages */
+#include <linux/spinlock.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <asm/io.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCTRANS
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *ctxt,
+ u32 byte_count)
+{
+ struct page *page;
+ u32 bc;
+ int sge_no;
+
+ /* Swap the page in the SGE with the page in argpages */
+ page = ctxt->pages[0];
+ put_page(rqstp->rq_pages[0]);
+ rqstp->rq_pages[0] = page;
+
+ /* Set up the XDR head */
+ rqstp->rq_arg.head[0].iov_base = page_address(page);
+ rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+ rqstp->rq_arg.len = byte_count;
+ rqstp->rq_arg.buflen = byte_count;
+
+ /* Compute bytes past head in the SGL */
+ bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+ /* If data remains, store it in the pagelist */
+ rqstp->rq_arg.page_len = bc;
+ rqstp->rq_arg.page_base = 0;
+ rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+ sge_no = 1;
+ while (bc && sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no];
+ put_page(rqstp->rq_pages[sge_no]);
+ rqstp->rq_pages[sge_no] = page;
+ bc -= min(bc, ctxt->sge[sge_no].length);
+ rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+ sge_no ++;
+ }
+ rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+
+ /* We should never run out of SGE because the limit is defined to
+ * support the max allowed RPC data length
+ */
+ BUG_ON(bc && (sge_no == ctxt->count));
+ BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
+ != byte_count);
+ BUG_ON(rqstp->rq_arg.len != byte_count);
+
+ /* If not all pages were used from the SGL, free the remaining ones */
+ bc = sge_no;
+ while (sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no++];
+ put_page(page);
+ }
+ ctxt->count = bc;
+
+ /* Set up tail */
+ rqstp->rq_arg.tail[0].iov_base = NULL;
+ rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+struct chunk_sge {
+ int start; /* sge no for this chunk */
+ int count; /* sge count for this chunk */
+};
+
+/* Encode a read-chunk-list as an array of IB SGE
+ *
+ * Assumptions:
+ * - chunk[0]->position points to pages[0] at an offset of 0
+ * - pages[] is not physically or virtually contiguous and consists of
+ * PAGE_SIZE elements.
+ * - The context at 'head' will be used to create a copy of
+ * rq_arg.pages for deferred processing in recvfrom
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ * chunk in the read list
+ *
+ */
+static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ struct rpcrdma_msg *rmsgp,
+ struct ib_sge *sge,
+ struct chunk_sge *ch_sge_ary,
+ int ch_count,
+ int byte_count)
+{
+ int sge_no;
+ int sge_bytes;
+ int page_off;
+ int page_no;
+ int ch_bytes;
+ int ch_no;
+ struct rpcrdma_read_chunk *ch;
+
+ sge_no = 0;
+ page_no = 0;
+ page_off = 0;
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch_no = 0;
+ ch_bytes = ch->rc_target.rs_length;
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = &head->pages[head->count];
+ head->sge[0].length = head->count; /* save count of hdr pages */
+ head->arg.page_base = 0;
+ head->arg.page_len = ch_bytes;
+ head->arg.len = rqstp->rq_arg.len + ch_bytes;
+ head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
+ head->count ++;
+ ch_sge_ary[0].start = 0;
+ while (byte_count) {
+ sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
+ sge[sge_no].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ rqstp->rq_arg.pages[page_no],
+ page_off, sge_bytes,
+ DMA_FROM_DEVICE);
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ /*
+ * Don't bump head->count here because the same page
+ * may be used by multiple SGE.
+ */
+ head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+
+ byte_count -= sge_bytes;
+ ch_bytes -= sge_bytes;
+ sge_no ++;
+ /*
+ * If all bytes for this chunk have been mapped to an
+ * SGE, move to the next chunk
+ */
+ if (ch_bytes == 0) {
+ ch_sge_ary[ch_no].count =
+ sge_no - ch_sge_ary[ch_no].start;
+ ch_no ++;
+ ch++;
+ ch_sge_ary[ch_no].start = sge_no;
+ ch_bytes = ch->rc_target.rs_length;
+ /* If bytes remaining account for next chunk */
+ if (byte_count) {
+ head->arg.page_len += ch_bytes;
+ head->arg.len += ch_bytes;
+ head->arg.buflen += ch_bytes;
+ }
+ }
+ /*
+ * If this SGE consumed all of the page, move to the
+ * next page
+ */
+ if ((sge_bytes + page_off) == PAGE_SIZE) {
+ page_no ++;
+ page_off = 0;
+ /*
+ * If there are still bytes left to map, bump
+ * the page count
+ */
+ if (byte_count)
+ head->count ++;
+ } else
+ page_off += sge_bytes;
+ }
+ BUG_ON(byte_count != 0);
+ return sge_no;
+}
+
+static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
+ struct ib_sge *sge,
+ u64 *sgl_offset,
+ int count)
+{
+ int i;
+
+ ctxt->count = count;
+ for (i = 0; i < count; i++) {
+ ctxt->sge[i].addr = sge[i].addr;
+ ctxt->sge[i].length = sge[i].length;
+ *sgl_offset = *sgl_offset + sge[i].length;
+ }
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
+#ifdef RDMA_TRANSPORT_IWARP
+ if ((RDMA_TRANSPORT_IWARP ==
+ rdma_node_get_transport(xprt->sc_cm_id->
+ device->node_type))
+ && sge_count > 1)
+ return 1;
+ else
+#endif
+ return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/*
+ * Use RDMA_READ to read data from the advertised client buffer into the
+ * XDR stream starting at rq_arg.head[0].iov_base.
+ * Each chunk in the array
+ * contains the following fields:
+ * discrim - '1', This isn't used for data placement
+ * position - The xdr stream offset (the same for every chunk)
+ * handle - RMR for client memory region
+ * length - data transfer length
+ * offset - 64 bit tagged offset in remote memory region
+ *
+ * On our side, we need to read into a pagelist. The first page immediately
+ * follows the RPC header.
+ *
+ * This function returns 1 to indicate success. The data is not yet in
+ * the pagelist and therefore the RPC request must be deferred. The
+ * I/O completion will enqueue the transport again and
+ * svc_rdma_recvfrom will complete the request.
+ *
+ * NOTE: The ctxt must not be touched after the last WR has been posted
+ * because the I/O completion processing may occur on another
+ * processor and free / modify the context. Ne touche pas!
+ */
+static int
+rdma_read_xdr(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *hdr_ctxt)
+{
+ struct ib_send_wr read_wr;
+ int err = 0;
+ int ch_no;
+ struct ib_sge *sge;
+ int ch_count;
+ int byte_count;
+ int sge_count;
+ u64 sgl_offset;
+ struct rpcrdma_read_chunk *ch;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct svc_rdma_op_ctxt *head;
+ struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+ struct svc_rdma_op_ctxt *tmp_ch_ctxt;
+ struct chunk_sge *ch_sge_ary;
+
+ /* If no read list is present, return 0 */
+ ch = svc_rdma_get_read_chunk(rmsgp);
+ if (!ch)
+ return 0;
+
+ /* Allocate temporary contexts to keep SGE */
+ BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
+ tmp_sge_ctxt = svc_rdma_get_context(xprt);
+ sge = tmp_sge_ctxt->sge;
+ tmp_ch_ctxt = svc_rdma_get_context(xprt);
+ ch_sge_ary = (struct chunk_sge*)tmp_ch_ctxt->sge;
+
+ svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+ sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
+ sge, ch_sge_ary,
+ ch_count, byte_count);
+ head = svc_rdma_get_context(xprt);
+ sgl_offset = 0;
+ ch_no = 0;
+
+ for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ ch->rc_discrim != 0; ch++, ch_no++) {
+ next_sge:
+ if (!ctxt)
+ ctxt = head;
+ else {
+ ctxt->next = svc_rdma_get_context(xprt);
+ ctxt = ctxt->next;
+ }
+ ctxt->next = NULL;
+ ctxt->direction = DMA_FROM_DEVICE;
+ clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ if ((ch+1)->rc_discrim == 0) {
+ /*
+ * Checked in sq_cq_reap to see if we need to
+ * be enqueued
+ */
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ ctxt->next = hdr_ctxt;
+ hdr_ctxt->next = head;
+ }
+
+ /* Prepare READ WR */
+ memset(&read_wr, 0, sizeof read_wr);
+ ctxt->wr_op = IB_WR_RDMA_READ;
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.opcode = IB_WR_RDMA_READ;
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
+ read_wr.wr.rdma.remote_addr =
+ get_unaligned(&(ch->rc_target.rs_offset)) +
+ sgl_offset;
+ read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+ read_wr.num_sge =
+ rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
+ rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+ &sgl_offset,
+ read_wr.num_sge);
+
+ /* Post the read */
+ err = svc_rdma_send(xprt, &read_wr);
+ if (err) {
+ printk(KERN_ERR "svcrdma: Error posting send =%d\n", err);
+ /*
+ * Break the circular list so free knows when
+ * to stop if the error happened to occur on
+ * the last read
+ */
+ ctxt->next = NULL;
+ goto out;
+ }
+ rdma_stat_read ++;
+
+ if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
+ ch_sge_ary[ch_no].count -= read_wr.num_sge;
+ ch_sge_ary[ch_no].start += read_wr.num_sge;
+ goto next_sge;
+ }
+ sgl_offset = 0;
+ err = 0;
+ }
+
+ out:
+ svc_rdma_put_context(tmp_sge_ctxt, 0);
+ svc_rdma_put_context(tmp_ch_ctxt, 0);
+
+ /* Detach arg pages. svc_recv will replenish them */
+ for (ch_no=0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
+ rqstp->rq_pages[ch_no] = NULL;
+
+ /*
+ * Detach res pages. svc_release must see a resused count of
+ * zero or it will attempt to put them.
+ */
+ while (rqstp->rq_resused)
+ rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+
+ if (err) {
+ printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ /* Free the linked list of read contexts */
+ while (head != NULL) {
+ ctxt = head->next;
+ svc_rdma_put_context(head, 1);
+ head = ctxt;
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
+static struct svc_deferred_req *
+svc_rdma_deferred_dequeue(struct svc_sock *xprt)
+{
+ struct svc_deferred_req *dr = NULL;
+ spin_lock_bh(&xprt->sk_server->sv_lock);
+ clear_bit(SK_DEFERRED, &xprt->sk_flags);
+ if (!list_empty(&xprt->sk_deferred)) {
+ dr = list_entry(xprt->sk_deferred.next,
+ struct svc_deferred_req,
+ handle.recent);
+ list_del_init(&dr->handle.recent);
+ set_bit(SK_DEFERRED, &xprt->sk_flags);
+ }
+ spin_unlock_bh(&xprt->sk_server->sv_lock);
+ return dr;
+}
+
+static int svc_rdma_deferred_recv(struct svc_rqst *rqstp)
+{
+ struct svc_sock *xprt = rqstp->rq_sock;
+ struct svcxprt_rdma *rdma_xprt = (struct svcxprt_rdma*)xprt;
+ struct svc_rdma_deferred_req *dr =
+ (struct svc_rdma_deferred_req *)rqstp->rq_deferred;
+ void *page_va;
+ int ret;
+ int len;
+
+ /* Rebuild the thread context */
+ rqstp->rq_prot = IPPROTO_MAX;
+ memcpy(&rqstp->rq_addr,
+ &rdma_xprt->sc_cm_id->route.addr.dst_addr,
+ sizeof(rqstp->rq_addr));
+ rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+
+ page_va = page_address(dr->arg_page);
+ dprintk("svcrdma: satisfying receive from deferred req=%p\n"
+ "\tpage=%p, page va=%p\n\targ_len=%d\n",
+ dr, dr->arg_page, page_va,
+ dr->arg_len);
+
+ /* Replace page in rq_arg.head */
+ put_page(rqstp->rq_pages[0]);
+ rqstp->rq_pages[0] = dr->arg_page;
+ rqstp->rq_arg.head[0].iov_base = page_address(rqstp->rq_pages[0]);
+ rqstp->rq_arg.head[0].iov_len = dr->arg_len;
+ rqstp->rq_arg.tail[0].iov_base = NULL;
+ rqstp->rq_arg.tail[0].iov_len = 0;
+
+ /* There are no additional pages */
+ rqstp->rq_arg.pages = NULL;
+ rqstp->rq_arg.page_base = 0;
+ rqstp->rq_arg.page_len = 0;
+ rqstp->rq_arg.len = dr->arg_len;
+
+ /* Prep the response pages */
+ rqstp->rq_respages = &rqstp->rq_pages[1];
+
+ len = svc_rdma_xdr_decode_deferred_req(rqstp);
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+
+ rqstp->rq_deferred = NULL;
+ kfree(dr);
+ svc_sock_received(xprt);
+ return ret;
+}
+
+static int
+rdma_read_complete(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *data)
+{
+ struct svc_rdma_op_ctxt *head = data->next;
+ struct svcxprt_rdma *rdma_xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
+ int page_no;
+ int ret;
+
+ BUG_ON(!head);
+
+ /* Copy RPC pages */
+ for (page_no=0; page_no < head->count; page_no++) {
+ put_page(rqstp->rq_pages[page_no]);
+ rqstp->rq_pages[page_no] = head->pages[page_no];
+ }
+ /* Point rq_arg.pages past header */
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+ rqstp->rq_arg.page_len = head->arg.page_len;
+ rqstp->rq_arg.page_base = head->arg.page_base;
+
+ /* rq_respages starts after the last arg page */
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+ rqstp->rq_resused = 0;
+
+ /* Rebuild rq_arg head and tail. */
+ rqstp->rq_arg.head[0] = head->arg.head[0];
+ rqstp->rq_arg.tail[0] = head->arg.tail[0];
+ rqstp->rq_arg.len = head->arg.len;
+ rqstp->rq_arg.buflen = head->arg.buflen;
+
+ rqstp->rq_prot = IPPROTO_MAX;
+ memcpy(&rqstp->rq_addr,
+ &rdma_xprt->sc_cm_id->route.addr.dst_addr,
+ sizeof(rqstp->rq_addr));
+ rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+
+ /*
+ * Free the contexts we used to build the RDMA_READ. We have
+ * to be careful here because the context list uses the same
+ * next pointer used to chain the contexts associated with the
+ * RDMA_READ
+ */
+ data->next = NULL; /* terminate circular list */
+ do {
+ data = head->next;
+ svc_rdma_put_context(head, 0);
+ head = data;
+ } while (head != NULL);
+
+ svc_sock_received(rqstp->rq_sock);
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret,
+ rqstp->rq_arg.len,
+ rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+ return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+ struct svc_sock *xprt = rqstp->rq_sock;
+ struct svcxprt_rdma *rdma_xprt = (struct svcxprt_rdma*)xprt;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct rpcrdma_msg *rmsgp;
+ int ret = 0;
+ int len;
+
+ dprintk("svcrdma: rqstp=%p\n", rqstp);
+ if (unlikely(test_bit(SK_DEFERRED, &xprt->sk_flags)))
+ if ((rqstp->rq_deferred = svc_rdma_deferred_dequeue(xprt)))
+ return svc_rdma_deferred_recv(rqstp);
+
+ spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+ ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ }
+ spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
+ if (ctxt)
+ return rdma_read_complete(rqstp, ctxt);
+
+ spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+ ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ set_bit(SK_DATA, &xprt->sk_flags);
+ } else {
+ rdma_stat_rq_starve ++;
+ clear_bit(SK_DATA, &xprt->sk_flags);
+ ctxt = NULL;
+ }
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!ctxt) {
+ /* This is the EAGAIN path. The svc_recv routine will
+ * return -EAGAIN, the nfsd thread will call into
+ * svc_recv again, and we shouldn't be on the active
+ * transport list.
+ */
+ if (test_bit(SK_CLOSE, &xprt->sk_flags))
+ goto close_out;
+
+ BUG_ON(ret);
+ svc_sock_received(xprt);
+ goto out;
+ }
+ dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+ ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+ BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
+ rdma_stat_recv ++;
+
+ /* Kick another thread for the next RPC */
+ svc_sock_received(xprt);
+
+ /* rqstp struct expects transport to fill in peer address */
+ rqstp->rq_prot = IPPROTO_MAX;
+ memcpy(&rqstp->rq_addr,
+ &rdma_xprt->sc_cm_id->route.addr.dst_addr,
+ sizeof(rqstp->rq_addr));
+ rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+
+ /* Build up the XDR from the receive buffers. */
+ rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+ /* Decode the RDMA header. */
+ len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+
+ /* If the request is invalid, reply with an error */
+ if (len < 0) {
+ if (len == -ENOSYS)
+ (void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+ goto close_out;
+ }
+
+ /* Read read-list data. If we would need to wait, defer it */
+ if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt))
+ return 0;
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ svc_rdma_put_context(ctxt, 0);
+ out:
+ dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+ ret, rqstp->rq_arg.len,
+ rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+ return ret;
+
+ close_out:
+ if (ctxt)
+ svc_rdma_put_context(ctxt, 1);
+ dprintk("svcrdma: transport %p is closing\n", xprt);
+ /*
+ * Set the close bit and enqueue it. svc_recv will see the
+ * close bit and call svc_sock_delete
+ */
+ set_bit(SK_CLOSE, &xprt->sk_flags);
+ svc_sock_received(xprt);
+
+ return 0;
+}
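
For readers following the next_sge loop in rdma_read_xdr, here is a small
userspace sketch (not part of the patch) of how one chunk's SGE list is
split across several RDMA_READ work requests. It mirrors the logic of
rdma_read_max_sge under the assumption, suggested by the #ifdef branch, that
an iWARP device takes one SGE per read; the function names read_max_sge and
reads_for_chunk are invented for the example.

```c
#include <assert.h>

static int min_int(int a, int b)
{
	return a < b ? a : b;
}

/* Mirrors rdma_read_max_sge(): on iWARP, use one SGE per read when
 * more than one remains; otherwise cap at the transport's max_sge. */
static int read_max_sge(int is_iwarp, int sge_count, int max_sge)
{
	if (is_iwarp && sge_count > 1)
		return 1;
	return min_int(sge_count, max_sge);
}

/* Number of RDMA_READ requests needed to cover one chunk's SGEs. */
static int reads_for_chunk(int is_iwarp, int sge_count, int max_sge)
{
	int reads = 0;

	assert(sge_count > 0 && max_sge > 0);
	while (sge_count > 0) {
		sge_count -= read_max_sge(is_iwarp, sge_count, max_sge);
		reads++;
	}
	return reads;
}
```

With five SGEs and a limit of four, the non-iWARP case needs two reads
(four SGEs, then one), while the iWARP case needs five.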


2007-07-01 23:12:41

by Tom Tucker

Subject: [RFC,PATCH 05/10] rdma: SVCRDMA Core Transport Services


This file implements the core transport data management and I/O
path. The I/O path for RDMA involves receiving callbacks in interrupt
context. Since all the svc transport locks are _bh locks, the callbacks
only enqueue the transport on a list and schedule a tasklet to dequeue
data indications from the RDMA completion queue. The tasklet in turn
takes _bh locks to enqueue receive data indications on a list for the
transport. The svc_rdma_recvfrom transport function dequeues data from
this list in an NFSD thread context.
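
The hand-off from completion handler to tasklet can be modeled in userspace
as follows (a sketch, not part of the patch). The real code protects the
transport list with an irqsave spinlock and uses atomic bit operations; this
single-threaded model omits both. The names struct xprt, struct dto_list,
comp_handler and dto_tasklet are stand-ins chosen for the example.

```c
#include <assert.h>
#include <stddef.h>

#define RQ_PENDING 0x1u
#define SQ_PENDING 0x2u

struct xprt {
	unsigned int flags; /* RQ_PENDING and/or SQ_PENDING */
	int on_list;        /* nonzero while linked on the pending list */
	struct xprt *next;
	int rq_reaped;      /* times the receive queue was processed */
	int sq_reaped;      /* times the send queue was processed */
};

struct dto_list {
	struct xprt *head;
	struct xprt *tail;
};

/* Models rq_comp_handler()/sq_comp_handler(): mark the transport and
 * queue it once; all real work is left to the tasklet. */
static void comp_handler(struct dto_list *l, struct xprt *x, unsigned int bit)
{
	assert(l && x);
	x->flags |= bit;
	if (!x->on_list) {
		x->next = NULL;
		if (l->tail)
			l->tail->next = x;
		else
			l->head = x;
		l->tail = x;
		x->on_list = 1;
	}
}

/* Models dto_tasklet_func(): drain the list and process whichever
 * queues are flagged. Returns the number of transports handled. */
static int dto_tasklet(struct dto_list *l)
{
	int handled = 0;
	struct xprt *x;

	while ((x = l->head) != NULL) {
		l->head = x->next;
		if (!l->head)
			l->tail = NULL;
		x->next = NULL;
		x->on_list = 0;

		if (x->flags & RQ_PENDING) {
			x->flags &= ~RQ_PENDING;
			x->rq_reaped++;
		}
		if (x->flags & SQ_PENDING) {
			x->flags &= ~SQ_PENDING;
			x->sq_reaped++;
		}
		handled++;
	}
	return handled;
}
```

Because the pending bit is set even when the transport is already queued, an
RQ and an SQ completion that arrive back to back are both handled in a single
pass over that transport.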

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma_transport.c | 1206 +++++++++++++++++++++++++++++++++++++++
net/sunrpc/svcauth_unix.c | 3
2 files changed, 1207 insertions(+), 2 deletions(-)

diff --git a/net/sunrpc/svc_rdma_transport.c b/net/sunrpc/svc_rdma_transport.c
new file mode 100644
index 0000000..f0792ed
--- /dev/null
+++ b/net/sunrpc/svc_rdma_transport.c
@@ -0,0 +1,1206 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/time.h>
+#include <linux/delay.h>
+
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/mm.h> /* num_physpages */
+#include <linux/spinlock.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <asm/io.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <net/ipv6.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCTRANS
+
+int svc_rdma_create_svc(struct svc_serv *serv, struct sockaddr *sa, int flags);
+static int svc_rdma_accept(struct svc_sock *xprt);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void dto_tasklet_func(unsigned long data);
+static struct cache_deferred_req *svc_rdma_defer(struct cache_req *req);
+static void svc_rdma_detach(struct svc_sock *svsk);
+static void svc_rdma_free(struct svc_sock *svsk);
+static int svc_rdma_has_wspace(struct svc_sock *svsk);
+static int svc_rdma_get_name(char *buf, struct svc_sock *svsk);
+
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static spinlock_t dto_lock = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(dto_xprt_q);
+
+struct svc_sock_ops svc_rdma_ops = {
+ .sko_name = "rdma",
+ .sko_create_svc = svc_rdma_create_svc,
+ .sko_get_name = svc_rdma_get_name,
+ .sko_recvfrom = svc_rdma_recvfrom,
+ .sko_sendto = svc_rdma_sendto,
+ .sko_detach = svc_rdma_detach,
+ .sko_free = svc_rdma_free,
+ .sko_has_wspace = svc_rdma_has_wspace,
+ .sko_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+ .sko_accept = svc_rdma_accept,
+ .sko_defer = svc_rdma_defer
+};
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+ int target;
+ int at_least_one = 0;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+ xprt->sc_ctxt_max);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ while (xprt->sc_ctxt_cnt < target) {
+ xprt->sc_ctxt_cnt ++;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (ctxt) {
+ at_least_one = 1;
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ } else {
+ /* kmalloc failed...give up for now */
+ xprt->sc_ctxt_cnt --;
+ break;
+ }
+ }
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
+ xprt->sc_ctxt_max,xprt->sc_ctxt_cnt);
+ return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ while (1) {
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ /* Try to bump my cache. */
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+ if (rdma_bump_context_cache(xprt))
+ continue;
+
+ printk(KERN_INFO "svcrdma: sleeping waiting for context "
+ "memory on xprt=%p\n",
+ xprt);
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ continue;
+ }
+ ctxt = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt->next;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+ ctxt->xprt = xprt;
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ ctxt->count = 0;
+ break;
+ }
+ return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ BUG_ON(!ctxt);
+ xprt = ctxt->xprt;
+ if (free_pages) {
+ for (i=0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+ }
+
+ for (i=0; i < ctxt->count; i++) {
+ dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+ }
+ spin_lock_bh(&xprt->sc_ctxt_lock);
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ spin_unlock_bh(&xprt->sc_ctxt_lock);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+ struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)context;
+ printk(KERN_INFO "svcrdma: received CQ event id=%d, context=%p\n",
+ event->event, context);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+ struct svcxprt_rdma *xprt = context;
+
+ switch (event->event) {
+ /* These are considered benign events */
+ case IB_EVENT_PATH_MIG:
+ case IB_EVENT_COMM_EST:
+ case IB_EVENT_SQ_DRAINED:
+ case IB_EVENT_QP_LAST_WQE_REACHED:
+ printk(KERN_INFO "svcrdma: QP event %d received for QP=%p\n",
+ event->event, event->element.qp);
+ break;
+ /* These are considered fatal events */
+ case IB_EVENT_PATH_MIG_ERR:
+ case IB_EVENT_QP_FATAL:
+ case IB_EVENT_QP_REQ_ERR:
+ case IB_EVENT_QP_ACCESS_ERR:
+ case IB_EVENT_DEVICE_FATAL:
+ default:
+ printk(KERN_ERR "svcrdma: QP ERROR event %d received for QP=%p, "
+ "closing transport\n",
+ event->event, event->element.qp);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ break;
+ }
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list. Two bits indicate
+ * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ
+ * and SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+
+ spin_lock_irqsave(&dto_lock, flags);
+ while (!list_empty(&dto_xprt_q)) {
+ xprt = list_entry(dto_xprt_q.next, struct svcxprt_rdma, sc_dto_q);
+ list_del_init(&xprt->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ rq_cq_reap(xprt);
+ set_bit(SK_DATA, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+
+ if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
+ ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ sq_cq_reap(xprt);
+ }
+
+ spin_lock_irqsave(&dto_lock, flags);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler - potentially called on interrupt context.
+ *
+ * svc_sock_enqueue and the remainder of the svc core assume the
+ * use of _bh locks. Since the rq_comp_handler is called on interrupt
+ * context, we need to defer the handling of the I/O to a tasklet.
+ */
+static void
+rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an SQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO context
+ * on the dto_q for the transport.
+ */
+static void
+rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ int ret;
+ struct ib_wc wc;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+
+ rdma_stat_rq_poll ++;
+
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
+ ctxt->wc_status = wc.status;
+ ctxt->byte_len = wc.byte_len;
+ if (wc.status != IB_WC_SUCCESS) {
+ /* Close the transport */
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_rdma_put_context(ctxt, 1);
+ continue;
+ }
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ }
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+
+ if (ctxt)
+ rdma_stat_rq_prod ++;
+}
+
+/*
+ * sq_cq_reap - Process the SQ CQ; called from the DTO tasklet.
+ */
+static void
+sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_wc wc;
+ struct ib_cq *cq = xprt->sc_sq_cq;
+ int ret;
+
+ rdma_stat_sq_poll ++;
+ while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
+ xprt = ctxt->xprt;
+
+ if (wc.status != IB_WC_SUCCESS)
+ /* Close the transport */
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ switch (ctxt->wr_op) {
+ case IB_WR_SEND:
+ case IB_WR_RDMA_WRITE:
+ svc_rdma_put_context(ctxt,1);
+ break;
+
+ case IB_WR_RDMA_READ:
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ set_bit(SK_DATA, &xprt->sc_xprt.sk_flags);
+ set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ spin_lock_bh(&xprt->sc_read_complete_lock);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_read_complete_q);
+ spin_unlock_bh(&xprt->sc_read_complete_lock);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ default:
+ printk(KERN_ERR "svcrdma: unexpected completion type, "
+ "opcode=%d, status=%d\n",
+ wc.opcode, wc.status);
+ break;
+ }
+ }
+
+ if (ctxt)
+ rdma_stat_sq_prod ++;
+}
+
+static void
+sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an RQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_SQ_PENDING,&xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+static void
+create_context_cache(struct svcxprt_rdma *xprt,
+ int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+ int i;
+
+ xprt->sc_ctxt_max = ctxt_max;
+ xprt->sc_ctxt_bump = ctxt_bump;
+ xprt->sc_ctxt_cnt = 0;
+ xprt->sc_ctxt_head = NULL;
+ for (i=0; i < ctxt_count; i++) {
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (ctxt) {
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ xprt->sc_ctxt_cnt ++;
+ }
+ }
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+ struct svc_rdma_op_ctxt *next;
+ if (!ctxt)
+ return;
+
+ do {
+ next = ctxt->next;
+ kfree(ctxt);
+ ctxt = next;
+ } while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(int listener)
+{
+ struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+ if (!cma_xprt)
+ return NULL;
+
+ INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+ init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+ spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_read_complete_lock);
+ spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+ cma_xprt->sc_ord = svcrdma_ord;
+
+ cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+ cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+ atomic_set(&cma_xprt->sc_sq_count,0);
+
+ if (!listener) {
+ int reqs = cma_xprt->sc_max_requests;
+ create_context_cache(cma_xprt,
+ reqs << 1, /* starting size */
+ reqs, /* bump amount */
+ reqs +
+ cma_xprt->sc_sq_depth +
+ RPCRDMA_MAX_THREADS + 1); /* max */
+ if (!cma_xprt->sc_ctxt_head) {
+ kfree(cma_xprt);
+ return NULL;
+ }
+ }
+
+ return cma_xprt;
+}
+
+/*
+ * Create a string for presentation in the portlist file. The RDMA
+ * listener presents an issue because it is a) transport independent,
+ * and b) not bound to a local interface yet. That is, it's listening
+ * on INADDR_ANY. This means that the endpoint 'cm_id' is not yet
+ * bound to a device and therefore, we don't know if it's IP or
+ * IB. The string created here, therefore, is a fabrication unless the
+ * service has bound to a specific local endpoint.
+ */
+static int svc_rdma_get_name(char *buf, struct svc_sock *svsk)
+{
+ int len;
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)svsk;
+
+ BUG_ON(rdma->sc_cm_id==NULL);
+
+ /* Check if we're bound to a device. If not, */
+ if (!rdma->sc_cm_id->device) {
+ /* fabricate a string */
+ len = sprintf(buf, "ofa rdma 0.0.0.0 %d\n",
+ ntohs(((struct sockaddr_in*)&rdma->sc_cm_id->
+ route.addr.src_addr)->sin_port));
+ return len;
+ }
+
+ switch (rdma_node_get_transport(rdma->sc_cm_id->device->node_type)) {
+ case RDMA_TRANSPORT_IB:
+ len = sprintf(buf, "ib rdma %u.%u.%u.%u %d\n",
+ NIPQUAD(((struct sockaddr_in*)&rdma->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&rdma->sc_cm_id->
+ route.addr.src_addr)->sin_port));
+ break;
+ case RDMA_TRANSPORT_IWARP:
+ len = sprintf(buf, "ipv4 rdma %u.%u.%u.%u %d\n",
+ NIPQUAD(((struct sockaddr_in*)&rdma->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&rdma->sc_cm_id->
+ route.addr.src_addr)->sin_port));
+ break;
+ default:
+ len = sprintf(buf, "*unknown-%d*\n",
+ rdma->sc_cm_id->device->node_type);
+ }
+
+ return len;
+}
+
+struct page *svc_rdma_get_page(void)
+{
+ struct page *page;
+
+ while ((page = alloc_page(GFP_KERNEL))==NULL) {
+ /* If we can't get memory, wait a bit and try again */
+ printk(KERN_INFO "svcrdma: out of memory...retrying in 1 second.\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ }
+ return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+ struct ib_recv_wr recv_wr, *bad_recv_wr;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct page *page;
+ unsigned long pa;
+ int sge_no;
+ int buflen;
+ int ret;
+
+ ctxt = svc_rdma_get_context(xprt);
+ buflen = 0;
+ ctxt->direction = DMA_FROM_DEVICE;
+ for (sge_no=0; buflen < xprt->sc_max_req_size; sge_no++) {
+ BUG_ON(sge_no >= xprt->sc_max_sge);
+ page = svc_rdma_get_page();
+ ctxt->pages[sge_no] = page;
+ pa = ib_dma_map_page(xprt->sc_cm_id->device,
+ page, 0, PAGE_SIZE,
+ DMA_FROM_DEVICE);
+ ctxt->sge[sge_no].addr = pa;
+ ctxt->sge[sge_no].length = PAGE_SIZE;
+ ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ buflen += PAGE_SIZE;
+ }
+ ctxt->count = sge_no;
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &ctxt->sge[0];
+ recv_wr.num_sge = ctxt->count;
+ recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+ ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ return ret;
+}
+
+static void init_sock(struct svc_sock *svsk, struct svc_serv* serv)
+{
+ svsk->sk_ops = &svc_rdma_ops;
+ INIT_LIST_HEAD(&svsk->sk_list);
+ svc_sock_init(svsk, serv);
+}
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_sock
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listening xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+ struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+ struct svcxprt_rdma *newxprt;
+
+ /* Create a new transport */
+ newxprt = rdma_create_xprt(0);
+ if (!newxprt) {
+ dprintk("svcrdma: failed to create new transport\n");
+ return;
+ }
+ newxprt->sc_cm_id = new_cma_id;
+ new_cma_id->context = newxprt;
+ dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+ newxprt, newxprt->sc_cm_id, listen_xprt);
+
+ /* Initialize the new transport */
+ init_sock(&newxprt->sc_xprt, listen_xprt->sc_xprt.sk_server);
+
+ /* Enqueue the new transport on the accept queue of the listening
+ * transport */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ listen_xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will
+ * either be incoming connect requests or adapter removal events.
+ * @param cma_id The CMA ID for the listening endpoint
+ * @param event the event being delivered.
+ */
+static int
+rdma_listen_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, event=%d\n",
+ cma_id, cma_id->context, event->event);
+ handle_connect_req(cma_id);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on LISTEN xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt)
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on listening endpoint %p, event=%d\n",
+ cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+static int
+rdma_cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt) {
+ xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, event=%d\n",
+ cma_id, cma_id->context, event->event);
+ if (xprt) {
+ xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on DTO endpoint %p, event=%d\n",
+ cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+/*
+ * Create a listening RDMA service endpoint
+ * @param serv the RPC service this instance will belong to
+ * @param sa the address to bind the local interface to
+ * @param flags creation flags (not used by this function)
+ * @return 0 on success, negative value for errors
+ */
+int svc_rdma_create_svc(struct svc_serv *serv, struct sockaddr *sa,
+ int flags)
+{
+ struct rdma_cm_id *listen_id;
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_sock *xprt;
+ int ret;
+
+ dprintk("svcrdma: Creating RDMA socket\n");
+
+ cma_xprt = rdma_create_xprt(1);
+ if (!cma_xprt)
+ return -ENOMEM;
+
+ xprt = &cma_xprt->sc_xprt;
+ init_sock(xprt, serv);
+ set_bit(SK_LISTENER, &xprt->sk_flags);
+
+ /*
+ * We shouldn't receive any events (except device removal) on
+ * the id until we submit the listen request. Any events that
+ * we do receive will get logged as errors and ignored
+ */
+ listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+ if (IS_ERR(listen_id)) {
+ ret = PTR_ERR(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+ return ret;
+ }
+ ret = rdma_bind_addr(listen_id, sa);
+ if (ret) {
+ /* ret already holds the rdma_bind_addr() error code */
+ rdma_destroy_xprt(cma_xprt);
+ rdma_destroy_id(listen_id);
+ dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+ return ret;
+ }
+ cma_xprt->sc_cm_id = listen_id;
+
+ /* The xprt is ready to process events at this point */
+ ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+ if (ret) {
+ /* ret already holds the rdma_listen() error code */
+ rdma_destroy_id(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+ return ret;
+ }
+
+ /* Add to list of permanent (listening/unconnected) sockets */
+ svc_sock_add_listener(xprt);
+ clear_bit(SK_BUSY, &xprt->sk_flags);
+
+ return 0;
+}
+
+/*
+ * This is the sk_recvfrom function for listening endpoints. Its purpose is
+ * to accept incoming connections. The CMA callback handler has already
+ * created a new transport and attached the new CMA ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_sock structure. This function
+ * takes svc_sock structures off the accept_q and completes the
+ * connection.
+ */
+static int
+svc_rdma_accept(struct svc_sock *xprt)
+{
+ struct svcxprt_rdma *listen_xprt;
+ struct svcxprt_rdma *newxprt;
+ struct rdma_conn_param conn_param;
+ struct ib_qp_init_attr qp_attr;
+ struct ib_device_attr devattr;
+ int ret;
+ int i;
+
+ listen_xprt = (struct svcxprt_rdma*)xprt;
+ if (list_empty(&listen_xprt->sc_accept_q)) {
+ printk(KERN_INFO
+ "svcrdma: woke-up with no pending connection!\n");
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ return 0;
+ }
+
+ /* Get the next entry off the accept list */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ newxprt = list_entry(listen_xprt->sc_accept_q.next,
+ struct svcxprt_rdma, sc_accept_q);
+ list_del_init(&newxprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+ newxprt, newxprt->sc_cm_id);
+
+ ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+ if (ret) {
+ printk(KERN_ERR
+ "svcrdma: could not query device attributes on "
+ "device %p, rc=%d\n",
+ newxprt->sc_cm_id->device, ret);
+ goto errout;
+ }
+
+ /* Qualify the transport resource defaults with the
+ * capabilities of this particular device */
+ newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+ (size_t)RPCSVC_MAXPAGES);
+ newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+ newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
+ (size_t)svcrdma_ord);
+
+ newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+ if (IS_ERR(newxprt->sc_pd)) {
+ printk(KERN_ERR
+ "svcrdma: error creating PD for connect request\n");
+ ret = PTR_ERR(newxprt->sc_pd);
+ goto errout;
+ }
+ newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ sq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_sq_depth,
+ 0);
+ if (IS_ERR(newxprt->sc_sq_cq)) {
+ printk(KERN_ERR
+ "svcrdma: error creating SQ CQ for connect request\n");
+ ret = PTR_ERR(newxprt->sc_sq_cq);
+ goto errout;
+ }
+ newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ rq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_max_requests,
+ 0);
+ if (IS_ERR(newxprt->sc_rq_cq)) {
+ printk(KERN_ERR
+ "svcrdma: error creating RQ CQ for connect request\n");
+ ret = PTR_ERR(newxprt->sc_rq_cq);
+ goto errout;
+ }
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.event_handler = qp_event_handler;
+ qp_attr.qp_context = newxprt;
+ qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+ qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+ qp_attr.send_cq = newxprt->sc_sq_cq;
+ qp_attr.recv_cq = newxprt->sc_rq_cq;
+ dprintk("newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+ "cm_id->device=%p, sc_pd->device=%p\n"
+ "qp_attr.cap.max_send_wr = %d\n"
+ "qp_attr.cap.max_recv_wr = %d\n"
+ "qp_attr.cap.max_send_sge = %d\n"
+ "qp_attr.cap.max_recv_sge = %d\n",
+ newxprt->sc_cm_id, newxprt->sc_pd,
+ newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+ qp_attr.cap.max_send_wr,
+ qp_attr.cap.max_recv_wr,
+ qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_sge);
+
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ /*
+ * XXX: This is a hack. We need a xx_request_qp interface
+ * that will adjust the qp_attr's with a best-effort
+ * number
+ */
+ qp_attr.cap.max_send_sge -= 2;
+ qp_attr.cap.max_recv_sge -= 2;
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ printk(KERN_ERR "svcrdma: failed to create QP, ret=%d\n", ret);
+ goto errout;
+ }
+ newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
+ newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
+ newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+ newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+ }
+ newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+ /* Register all of physical memory */
+ newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+ IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE);
+ if (IS_ERR(newxprt->sc_phys_mr)) {
+ ret = PTR_ERR(newxprt->sc_phys_mr);
+ printk(KERN_ERR
+ "svcrdma: Failed to create DMA MR ret=%d\n", ret);
+ goto errout;
+ }
+
+ /* Post receive buffers */
+ for (i=0; i < newxprt->sc_max_requests; i++)
+ if ((ret = svc_rdma_post_recv(newxprt))) {
+ printk(KERN_ERR
+ "svcrdma: failure posting receive buffers\n");
+ goto errout;
+ }
+
+ /* Swap out the handler */
+ newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+ /* We will get a getattr request from the client before we see
+ * the connect complete event because DTO's run on tasklets,
+ * and connection events run on threads
+ */
+ clear_bit(SK_BUSY, &newxprt->sc_xprt.sk_flags);
+
+ /* Accept Connection */
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.responder_resources = 0;
+ conn_param.initiator_depth = newxprt->sc_ord;
+ ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+ if (ret) {
+ printk(KERN_ERR
+ "svcrdma: failed to accept new connection, ret=%d\n",
+ ret);
+ goto errout;
+ }
+
+ dprintk("svcrdma: new connection %p accepted with the following "
+ "attributes:\n"
+ "\tlocal_ip : %d.%d.%d.%d\n"
+ "\tlocal_port : %d\n"
+ "\tremote_ip : %d.%d.%d.%d\n"
+ "\tremote_port : %d\n"
+ "\tmax_sge : %d\n"
+ "\tsq_depth : %d\n"
+ "\tmax_requests : %d\n"
+ "\tord : %d\n",
+ newxprt,
+ NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_port),
+ NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_port),
+ newxprt->sc_max_sge,
+ newxprt->sc_sq_depth,
+ newxprt->sc_max_requests,
+ newxprt->sc_ord);
+
+ spin_lock_bh(&listen_xprt->sc_lock);
+ if (list_empty(&listen_xprt->sc_accept_q))
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+ listen_xprt->sc_xprt.sk_pool = NULL;
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&listen_xprt->sc_xprt);
+
+ /* Add to the server's temporary (connected) socket list */
+ svc_sock_add_connection(&newxprt->sc_xprt);
+
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ return ret;
+
+ errout:
+ printk(KERN_ERR "svcrdma: failure accepting new connection rc=%d.\n",
+ ret);
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ rdma_destroy_id(newxprt->sc_cm_id);
+ rdma_destroy_xprt(newxprt);
+ return 0; /* ret; */
+}
+
+/* Disable data ready events for this connection */
+static void svc_rdma_detach(struct svc_sock *svsk)
+{
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma*)svsk;
+ unsigned long flags;
+
+ dprintk("svc: svc_rdma_detach(%p)\n", svsk);
+
+ /*
+ * Shut down the connection. This will ensure we don't get any
+ * more events from the provider.
+ */
+ rdma_disconnect(rdma->sc_cm_id);
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ /* We may already be on the DTO list, however */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (!list_empty(&rdma->sc_dto_q))
+ list_del_init(&rdma->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+static void svc_rdma_free(struct svc_sock *svsk)
+{
+ struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)svsk;
+ dprintk("svcrdma: svc_rdma_free(%p)\n", svsk);
+
+ rdma_destroy_xprt(xprt);
+ kfree(svsk);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+ if (xprt->sc_qp)
+ ib_destroy_qp(xprt->sc_qp);
+
+ if (xprt->sc_sq_cq)
+ ib_destroy_cq(xprt->sc_sq_cq);
+
+ if (xprt->sc_rq_cq)
+ ib_destroy_cq(xprt->sc_rq_cq);
+
+ if (xprt->sc_pd)
+ ib_dealloc_pd(xprt->sc_pd);
+
+ destroy_context_cache(xprt->sc_ctxt_head);
+}
+
+static int svc_rdma_has_wspace(struct svc_sock *svsk)
+{
+ struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)svsk;
+ /*
+ * If there are fewer SQ WR available than required to send a
+ * simple response, return false.
+ */
+ if ((xprt->sc_sq_depth - atomic_read(&xprt->sc_sq_count) < 3))
+ return 0;
+
+ /*
+ * ...or there are already waiters on the SQ,
+ * return false.
+ */
+ if (waitqueue_active(&xprt->sc_send_wait))
+ return 0;
+
+ /* Otherwise return true. */
+ return 1;
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr;
+ int ret;
+
+ if (test_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags))
+ return 0;
+
+ BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+ BUG_ON(((struct svc_rdma_op_ctxt*)(unsigned long)wr->wr_id)->wr_op !=
+ wr->opcode);
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ spin_lock_bh(&xprt->sc_lock);
+ if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+ spin_unlock_bh(&xprt->sc_lock);
+ rdma_stat_sq_starve ++;
+ /* First see if we can opportunistically reap some SQ WR */
+ sq_cq_reap(xprt);
+
+ /* Wait until SQ WR available if SQ still full */
+ wait_event(xprt->sc_send_wait,
+ atomic_read(&xprt->sc_sq_count) < xprt->sc_sq_depth);
+ continue;
+ }
+ /* Post the WR and, on success, bump the used SQ WR count */
+ ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+ if (!ret)
+ atomic_inc(&xprt->sc_sq_count);
+ else {
+ printk(KERN_ERR "svcrdma: failed to post SQ WR rc=%d, "
+ "sc_sq_count=%d, sc_sq_depth=%d\n",
+ ret, atomic_read(&xprt->sc_sq_count),
+ xprt->sc_sq_depth);
+ }
+ spin_unlock_bh(&xprt->sc_lock);
+ break;
+ }
+
+ return ret;
+}
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
+{
+ struct ib_send_wr err_wr;
+ struct ib_sge sge;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ u32 *va;
+ int length;
+ int ret;
+
+ p = svc_rdma_get_page();
+ va = page_address(p);
+
+ /* XDR encode error */
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ /* Prepare SGE for local address */
+ sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, PAGE_SIZE, DMA_TO_DEVICE);
+ sge.lkey = xprt->sc_phys_mr->lkey;
+ sge.length = length;
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = &sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error posting send = %d\n", ret);
+ svc_rdma_put_context(ctxt,1);
+ }
+
+ return ret;
+}
+
+/*
+ * This request cannot be handled right now. Allocate a structure to
+ * keep its state pending completion processing. To accomplish this, the
+ * function creates an svc_rdma_op_ctxt that looks like a receive completion and
+ * enqueues it on the svc_sock's deferred request list. When
+ * svc_rdma_recvfrom is subsequently called, it first checks if there is a
+ * deferred RPC and if there is:
+ * - Takes the deferred request off the deferred request queue
+ * - Extracts the svc_rdma_op_ctxt from the deferred request structure
+ * - Frees the deferred request structure
+ * - Skips the ib_poll_cq call and processes the svc_rdma_op_ctxt as if it had
+ * just come out of a WR pulled from the CQ.
+ */
+static struct cache_deferred_req *
+svc_rdma_defer(struct cache_req *req)
+{
+ struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
+ struct svcxprt_rdma *xprt;
+ struct svc_rdma_deferred_req *dr;
+
+ dprintk("svcrdma: deferring request on \n"
+ " rqstp=%p\n"
+ " rqstp->rq_arg.len=%d\n",
+ rqstp,
+ rqstp->rq_arg.len);
+
+ /* if more than a page, give up FIXME */
+ if (rqstp->rq_arg.page_len)
+ return NULL;
+
+ BUG_ON(rqstp->rq_deferred);
+ xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
+ dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
+ if (!dr)
+ return NULL;
+
+ dr->req.handle.owner = rqstp->rq_server;
+ dr->req.prot = rqstp->rq_prot;
+ dr->req.addr = rqstp->rq_addr;
+ dr->req.daddr = rqstp->rq_daddr;
+ dr->req.argslen = rqstp->rq_arg.len >> 2;
+ dr->arg_page = rqstp->rq_pages[0];
+ dr->arg_len = rqstp->rq_arg.len;
+ rqstp->rq_pages[0] = svc_rdma_get_page();
+
+ svc_sock_get(rqstp->rq_sock);
+ dr->req.svsk = rqstp->rq_sock;
+ dr->req.handle.revisit = svc_revisit;
+
+ return &dr->req.handle;
+}
+
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 07dcd20..4029924 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -409,9 +409,8 @@ static inline void
ip_map_cached_put(struct svc_rqst *rqstp, struct ip_map *ipm)
{
struct svc_sock *svsk = rqstp->rq_sock;
-
spin_lock(&svsk->sk_lock);
- if (svsk->sk_sock->type == SOCK_STREAM &&
+ if (test_bit(SK_TEMP, &svsk->sk_flags) &&
svsk->sk_info_authunix == NULL) {
/* newly cached, keep the reference */
svsk->sk_info_authunix = ipm;

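
For readers following the context cache in the patch above, here is a
minimal user-space sketch of the same idea: create_context_cache() pushes
each newly allocated context onto the head of a singly linked list and
skips failed allocations, and destroy_context_cache() walks the list and
frees each node. The struct and function names below (ctxt, ctxt_cache,
cache_create, cache_destroy) are hypothetical stand-ins, malloc/free
replace kmalloc/kfree, and the sc_ctxt_lock locking and the bump/max
growth policy are left out. It is an illustration, not the driver code.

```c
#include <stdlib.h>

/* Hypothetical stand-in for struct svc_rdma_op_ctxt: only the link field. */
struct ctxt {
	struct ctxt *next;
};

/* Hypothetical stand-in for the sc_ctxt_head/sc_ctxt_cnt pair. */
struct ctxt_cache {
	struct ctxt *head;
	int cnt;
};

/*
 * Push up to 'count' freshly allocated contexts onto the list head.
 * A failed allocation is skipped, so cnt may end up below 'count'.
 */
static void cache_create(struct ctxt_cache *c, int count)
{
	int i;

	c->head = NULL;
	c->cnt = 0;
	for (i = 0; i < count; i++) {
		struct ctxt *ctxt = malloc(sizeof(*ctxt));

		if (!ctxt)
			continue;
		ctxt->next = c->head;
		c->head = ctxt;
		c->cnt++;
	}
}

/* Walk the list, free every node, and return how many were freed. */
static int cache_destroy(struct ctxt_cache *c)
{
	struct ctxt *ctxt = c->head;
	int freed = 0;

	while (ctxt) {
		struct ctxt *next = ctxt->next;

		free(ctxt);
		ctxt = next;
		freed++;
	}
	c->head = NULL;
	c->cnt = 0;
	return freed;
}
```

Because allocation failures are tolerated during creation, a caller has to
check for an empty list afterwards, which is what rdma_create_xprt() does
when it tests sc_ctxt_head.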

2007-07-04 14:42:54

by Tom Tucker

Subject: Re: [RFC,PATCH 08/10] rdma: ONCRPC RDMA protocol marshalling


These marshal/unmarshal routines are for the RDMA header, not the NFS RPC
header. They are broken out here to try and keep all of the RPC RDMA wire
protocol knowledge in one place.
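
To make the distinction concrete, here is a rough user-space sketch of what
decoding the fixed part of the RDMA header involves: four 32-bit
big-endian words (xid, version, credit, message type) converted to host
order, followed by a version check. The struct rdma_hdr layout, the
decode_rdma_hdr() name and the SKETCH_RPCRDMA_VERSION value of 1 are
assumptions made for illustration only; the real routine in the patch
works in place on struct rpcrdma_msg and goes on to decode the chunk lists.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

/* Assumed protocol version for this sketch. */
#define SKETCH_RPCRDMA_VERSION 1

/* Hypothetical host-order view of the four fixed header words. */
struct rdma_hdr {
	uint32_t xid;
	uint32_t vers;
	uint32_t credit;
	uint32_t type;
};

/*
 * Decode the fixed header words from a network-order buffer.
 * Returns 0 on success, -1 if the buffer is too short or the
 * version does not match.
 */
static int decode_rdma_hdr(const unsigned char *buf, size_t len,
			   struct rdma_hdr *out)
{
	uint32_t words[4];

	if (len < sizeof(words))
		return -1;

	/* memcpy avoids unaligned 32-bit loads from the byte buffer */
	memcpy(words, buf, sizeof(words));
	out->xid = ntohl(words[0]);
	out->vers = ntohl(words[1]);
	out->credit = ntohl(words[2]);
	out->type = ntohl(words[3]);

	if (out->vers != SKETCH_RPCRDMA_VERSION)
		return -1;
	return 0;
}
```

None of this touches the NFS RPC header that follows; that is still decoded
by the usual XDR routines.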


On 7/2/07 11:34 AM, "Chuck Lever" <[email protected]> wrote:

> I need to study this more, but can you explain why transport-specific
> marshal/unmarshal routines are required?
>
> Does this replace the page-handling XDR routines that are the defaults
> for socket-based transports? If these are really needed, should they
> part of the server side transport switch?
>
> Tom Tucker wrote:
>> This file implements the ONCRPC RDMA protocol marshalling and
>> unmarshalling logic.
>>
>> Signed-off-by: Tom Tucker <[email protected]>
>> ---
>>
>> net/sunrpc/svc_rdma_marshal.c | 424 +++++++++++++++++++++++++++++++++++++++++
>> 1 files changed, 424 insertions(+), 0 deletions(-)
>>
>> diff --git a/net/sunrpc/svc_rdma_marshal.c b/net/sunrpc/svc_rdma_marshal.c
>> new file mode 100644
>> index 0000000..0a34efb
>> --- /dev/null
>> +++ b/net/sunrpc/svc_rdma_marshal.c
>> @@ -0,0 +1,424 @@
>> +/*
>> + * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
>> + *
>> + * This software is available to you under a choice of one of two
>> + * licenses. You may choose to be licensed under the terms of the GNU
>> + * General Public License (GPL) Version 2, available from the file
>> + * COPYING in the main directory of this source tree, or the BSD-type
>> + * license below:
>> + *
>> + * Redistribution and use in source and binary forms, with or without
>> + * modification, are permitted provided that the following conditions
>> + * are met:
>> + *
>> + * Redistributions of source code must retain the above copyright
>> + * notice, this list of conditions and the following disclaimer.
>> + *
>> + * Redistributions in binary form must reproduce the above
>> + * copyright notice, this list of conditions and the following
>> + * disclaimer in the documentation and/or other materials provided
>> + * with the distribution.
>> + *
>> + * Neither the name of the Network Appliance, Inc. nor the names of
>> + * its contributors may be used to endorse or promote products
>> + * derived from this software without specific prior written
>> + * permission.
>> + *
>> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
>> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
>> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
>> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
>> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
>> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
>> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
>> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>> + *
>> + * Author: Tom Tucker <[email protected]>
>> + */
>> +
>> +#include <asm/semaphore.h>
>> +#include <linux/device.h>
>> +#include <linux/in.h>
>> +#include <linux/err.h>
>> +#include <linux/time.h>
>> +
>> +#include <rdma/rdma_cm.h>
>> +
>> +#include <linux/sunrpc/svcsock.h>
>> +#include <linux/sunrpc/debug.h>
>> +#include <linux/sunrpc/rpc_rdma.h>
>> +#include <linux/spinlock.h>
>> +#include <linux/net.h>
>> +#include <net/sock.h>
>> +#include <asm/io.h>
>> +#include <asm/unaligned.h>
>> +#include <rdma/rdma_cm.h>
>> +#include <rdma/ib_verbs.h>
>> +#include <linux/sunrpc/rpc_rdma.h>
>> +#include <linux/sunrpc/svc_rdma.h>
>> +
>> +#define RPCDBG_FACILITY RPCDBG_SVCTRANS
>> +
>> +/*
>> + * Decodes a read chunk list. The expected format is as follows:
>> + * discrim : xdr_one
>> + * position : u32 offset into XDR stream
>> + * handle : u32 RKEY
>> + * . . .
>> + * end-of-list: xdr_zero
>> + */
>> +static u32 *decode_read_list(u32 *va, u32 *vaend)
>> +{
>> + struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk*)va;
>> +
>> + while (ch->rc_discrim != xdr_zero) {
>> + u64 ch_offset;
>> +
>> + if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
>> + (unsigned long)vaend) {
>> + dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
>> + return NULL;
>> + }
>> +
>> + ch->rc_discrim = ntohl(ch->rc_discrim);
>> + ch->rc_position = ntohl(ch->rc_position);
>> + ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
>> + ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
>> + va = (u32*)&ch->rc_target.rs_offset;
>> + xdr_decode_hyper(va, &ch_offset);
>> + put_unaligned(ch_offset, (u64*)va);
>> + ch++;
>> + }
>> + return (u32*)&ch->rc_position;
>> +}
>> +
>> +/*
>> + * Determine number of chunks and total bytes in chunk list. The chunk
>> + * list has already been verified to fit within the RPCRDMA header.
>> + */
>> +void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
>> + int *ch_count, int *byte_count)
>> +{
>> + /* compute the number of bytes represented by read chunks */
>> + *byte_count = 0;
>> + *ch_count = 0;
>> + for (; ch->rc_discrim != 0; ch++) {
>> + *byte_count = *byte_count + ch->rc_target.rs_length;
>> + *ch_count = *ch_count + 1;
>> + }
>> +}
>> +
>> +/*
>> + * Decodes a write chunk list. The expected format is as follows:
>> + * discrim : xdr_one
>> + * nchunks : <count>
>> + * handle : u32 RKEY ---+
>> + * length : u32 <len of segment> |
>> + * offset : remote va + <count>
>> + * . . . |
>> + * ---+
>> + */
>> +static u32 *decode_write_list(u32 *va, u32 *vaend)
>> +{
>> + int ch_no;
>> + struct rpcrdma_write_array *ary =
>> + (struct rpcrdma_write_array*)va;
>> +
>> + /* Check for no write-array */
>> + if (ary->wc_discrim == xdr_zero)
>> + return (u32*)&ary->wc_nchunks;
>> +
>> + if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
>> + (unsigned long)vaend) {
>> + dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
>> + return NULL;
>> + }
>> + ary->wc_discrim = ntohl(ary->wc_discrim);
>> + ary->wc_nchunks = ntohl(ary->wc_nchunks);
>> + if (((unsigned long)&ary->wc_array[0] +
>> + (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
>> + (unsigned long)vaend) {
>> + dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
>> + ary, ary->wc_nchunks, vaend);
>> + return NULL;
>> + }
>> + for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
>> + u64 ch_offset;
>> +
>> + ary->wc_array[ch_no].wc_target.rs_handle =
>> + ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
>> + ary->wc_array[ch_no].wc_target.rs_length =
>> + ntohl(ary->wc_array[ch_no].wc_target.rs_length);
>> + va = (u32*)&ary->wc_array[ch_no].wc_target.rs_offset;
>> + xdr_decode_hyper(va, &ch_offset);
>> + put_unaligned(ch_offset, (u64*)va);
>> + }
>> +
>> + /*
>> + * rs_length is the 2nd 4B field in wc_target and taking its
>> + * address skips the list terminator
>> + */
>> + return (u32*)&ary->wc_array[ch_no].wc_target.rs_length;
>> +}
>> +
>> +static u32 *decode_reply_array(u32 *va, u32 *vaend)
>> +{
>> + int ch_no;
>> + struct rpcrdma_write_array *ary =
>> + (struct rpcrdma_write_array*)va;
>> +
>> + /* Check for no reply-array */
>> + if (ary->wc_discrim == xdr_zero)
>> + return (u32*)&ary->wc_nchunks;
>> +
>> + if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
>> + (unsigned long)vaend) {
>> + dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
>> + return NULL;
>> + }
>> + ary->wc_discrim = ntohl(ary->wc_discrim);
>> + ary->wc_nchunks = ntohl(ary->wc_nchunks);
>> + if (((unsigned long)&ary->wc_array[0] +
>> + (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
>> + (unsigned long)vaend) {
>> + dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
>> + ary, ary->wc_nchunks, vaend);
>> + return NULL;
>> + }
>> + for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
>> + u64 ch_offset;
>> +
>> + ary->wc_array[ch_no].wc_target.rs_handle =
>> + ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
>> + ary->wc_array[ch_no].wc_target.rs_length =
>> + ntohl(ary->wc_array[ch_no].wc_target.rs_length);
>> + va = (u32*)&ary->wc_array[ch_no].wc_target.rs_offset;
>> + xdr_decode_hyper(va, &ch_offset);
>> + put_unaligned(ch_offset, (u64*)va);
>> + }
>> +
>> + return (u32*)&ary->wc_array[ch_no];
>> +}
>> +
>> +int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req, struct svc_rqst *rqstp)
>> +{
>> + struct rpcrdma_msg *rmsgp = NULL;
>> + u32 *va;
>> + u32 *vaend;
>> + u32 hdr_len;
>> +
>> + rmsgp = (struct rpcrdma_msg*)rqstp->rq_arg.head[0].iov_base;
>> +
>> + /* Verify that there are enough bytes for header + something */
>> + if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
>> + dprintk("svcrdma: header too short = %d\n",
>> + rqstp->rq_arg.len);
>> + return -EINVAL;
>> + }
>> +
>> + /* Decode the header */
>> + rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
>> + rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
>> + rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
>> + rmsgp->rm_type = ntohl(rmsgp->rm_type);
>> +
>> + if (rmsgp->rm_vers != RPCRDMA_VERSION)
>> + return -ENOSYS;
>> +
>> + /* Pull in the extra for the padded case and bump our pointer */
>> + if (rmsgp->rm_type == RDMA_MSGP) {
>> + int hdrlen;
>> + rmsgp->rm_body.rm_padded.rm_align =
>> + ntohl(rmsgp->rm_body.rm_padded.rm_align);
>> + rmsgp->rm_body.rm_padded.rm_thresh =
>> + ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
>> +
>> + va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
>> + rqstp->rq_arg.head[0].iov_base = va;
>> + hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
>> + rqstp->rq_arg.head[0].iov_len -= hdrlen;
>> + if (hdrlen > rqstp->rq_arg.len)
>> + return -EINVAL;
>> + return hdrlen;
>> + }
>> +
>> + /* The chunk list may contain either a read chunk list or a write
>> + * chunk list and a reply chunk list.
>> + */
>> + va = &rmsgp->rm_body.rm_chunks[0];
>> + vaend = (u32*)((unsigned long)rmsgp + rqstp->rq_arg.len);
>> + va = decode_read_list(va,vaend);
>> + if (!va)
>> + return -EINVAL;
>> + va = decode_write_list(va, vaend);
>> + if (!va)
>> + return -EINVAL;
>> + va = decode_reply_array(va,vaend);
>> + if (!va)
>> + return -EINVAL;
>> +
>> + rqstp->rq_arg.head[0].iov_base = va;
>> + hdr_len = (unsigned long)va - (unsigned long)rmsgp;
>> + rqstp->rq_arg.head[0].iov_len -= hdr_len;
>> +
>> + *rdma_req = rmsgp;
>> + return hdr_len;
>> +}
>> +
>> +int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
>> +{
>> + struct rpcrdma_msg *rmsgp = NULL;
>> + struct rpcrdma_read_chunk *ch;
>> + struct rpcrdma_write_array *ary;
>> + u32 *va;
>> + u32 hdrlen;
>> +
>> + dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
>> + rqstp);
>> + rmsgp = (struct rpcrdma_msg*)rqstp->rq_arg.head[0].iov_base;
>> +
>> + /* Pull in the extra for the padded case and bump our pointer */
>> + if (rmsgp->rm_type == RDMA_MSGP) {
>> + va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
>> + rqstp->rq_arg.head[0].iov_base = va;
>> + hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
>> + rqstp->rq_arg.head[0].iov_len -= hdrlen;
>> + return hdrlen;
>> + }
>> +
>> + /*
>> + * Skip all chunks to find RPC msg. These were previously processed
>> + */
>> + va = &rmsgp->rm_body.rm_chunks[0];
>> +
>> + /* Skip read-list */
>> + for (ch = (struct rpcrdma_read_chunk*)va;
>> + ch->rc_discrim != xdr_zero; ch++);
>> + va = (u32*)&ch->rc_position;
>> +
>> + /* Skip write-list */
>> + ary = (struct rpcrdma_write_array*)va;
>> + if (ary->wc_discrim == xdr_zero)
>> + va = (u32*)&ary->wc_nchunks;
>> + else
>> + /*
>> + * rs_length is the 2nd 4B field in wc_target and taking its
>> + * address skips the list terminator
>> + */
>> + va = (u32*)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
>> +
>> + /* Skip reply-array */
>> + ary = (struct rpcrdma_write_array*)va;
>> + if (ary->wc_discrim == xdr_zero)
>> + va = (u32*)&ary->wc_nchunks;
>> + else
>> + va = (u32*)&ary->wc_array[ary->wc_nchunks];
>> +
>> + rqstp->rq_arg.head[0].iov_base = va;
>> + hdrlen = (unsigned long)va - (unsigned long)rmsgp;
>> + rqstp->rq_arg.head[0].iov_len -= hdrlen;
>> +
>> + return hdrlen;
>> +}
>> +
>> +int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
>> + enum rpcrdma_errcode err, u32 *va)
>> +{
>> + u32* startp = va;
>> +
>> + *va++ = htonl(rmsgp->rm_xid);
>> + *va++ = htonl(rmsgp->rm_vers);
>> + *va++ = htonl(xprt->sc_max_requests);
>> + *va++ = htonl(RDMA_ERROR);
>> + *va++ = htonl(err);
>> + if (err == ERR_VERS) {
>> + *va++ = htonl(RPCRDMA_VERSION);
>> + *va++ = htonl(RPCRDMA_VERSION);
>> + }
>> +
>> + return (int)((unsigned long)va - (unsigned long)startp);
>> +}
>> +
>> +int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
>> +{
>> + struct rpcrdma_write_array *wr_ary;
>> +
>> + /* There is no read-list in a reply */
>> +
>> + /* skip write list */
>> + wr_ary = (struct rpcrdma_write_array *)
>> + &rmsgp->rm_body.rm_chunks[1];
>> + if (wr_ary->wc_discrim)
>> + wr_ary = (struct rpcrdma_write_array *)
>> + &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
>> + wc_target.rs_length;
>> + else
>> + wr_ary = (struct rpcrdma_write_array *)
>> + &wr_ary->wc_nchunks;
>> +
>> + /* skip reply array */
>> + if (wr_ary->wc_discrim)
>> + wr_ary = (struct rpcrdma_write_array *)
>> + &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
>> + else
>> + wr_ary = (struct rpcrdma_write_array *)
>> + &wr_ary->wc_nchunks;
>> +
>> + return (unsigned long) wr_ary - (unsigned long) rmsgp;
>> +}
>> +
>> +void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
>> +{
>> + struct rpcrdma_write_array *ary;
>> +
>> + /* no read-list */
>> + rmsgp->rm_body.rm_chunks[0] = xdr_zero;
>> +
>> + /* write-array discrim */
>> + ary = (struct rpcrdma_write_array *)
>> + &rmsgp->rm_body.rm_chunks[1];
>> + ary->wc_discrim = xdr_one;
>> + ary->wc_nchunks = htonl(chunks);
>> +
>> + /* write-list terminator */
>> + ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
>> +
>> + /* reply-array discriminator */
>> + ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
>> +}
>> +
>> +void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
>> + int chunks)
>> +{
>> + ary->wc_discrim = xdr_one;
>> + ary->wc_nchunks = htonl(chunks);
>> +}
>> +
>> +void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary, int chunk_no,
>> + u32 rs_handle, u64 rs_offset, u32 write_len)
>> +{
>> + struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
>> + seg->rs_handle = htonl(rs_handle);
>> + seg->rs_length = htonl(write_len);
>> + xdr_encode_hyper((u32*) &seg->rs_offset, rs_offset);
>> +}
>> +
>> +void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
>> + struct rpcrdma_msg *rdma_argp,
>> + struct rpcrdma_msg *rdma_resp,
>> + enum rpcrdma_proc rdma_type)
>> +{
>> + rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
>> + rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
>> + rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
>> + rdma_resp->rm_type = htonl(rdma_type);
>> +
>> + /* Encode <nul> chunk lists */
>> + rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
>> + rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
>> + rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
>> +}
>> +
>>
>
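
For anyone following the write chunk list layout described in the comment above decode_write_list(), here is a minimal userspace sketch of the same wire format: a discriminator word, a count, that many (handle, length, 64-bit offset) segments, and a terminator word. It is not the code from the patch. The names struct seg, get_be32() and decode_write_list_sketch() are made up for illustration, and the bounds check divides rather than multiplies on the assumption that nchunks comes straight off the wire and cannot be trusted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

/* Hypothetical host-order view of one segment; not the kernel's struct. */
struct seg {
	uint32_t handle;	/* RKEY */
	uint32_t length;	/* length of the segment in bytes */
	uint64_t offset;	/* remote virtual address */
};

#define SEG_WORDS 4	/* handle + length + two words of offset */

/* Read one big-endian 32-bit word without assuming alignment. */
static uint32_t get_be32(const unsigned char *p)
{
	uint32_t w;

	memcpy(&w, p, sizeof(w));
	return ntohl(w);
}

/*
 * Decode a write list from [p, end) into out[0..max-1].
 * Returns a pointer just past the list terminator and stores the
 * segment count in *nsegs, or returns NULL if the buffer is too
 * short or the list holds more than max segments.
 */
static const unsigned char *
decode_write_list_sketch(const unsigned char *p, const unsigned char *end,
			 struct seg *out, uint32_t max, uint32_t *nsegs)
{
	size_t avail;
	uint32_t n, i;

	assert(p <= end);
	avail = (size_t)(end - p) / 4;	/* whole 32-bit words remaining */
	*nsegs = 0;

	if (avail < 1)
		return NULL;
	if (get_be32(p) == 0)		/* discriminator zero: no write list */
		return p + 4;
	if (avail < 3)			/* discrim + nchunks + terminator */
		return NULL;

	n = get_be32(p + 4);
	/* Divide instead of multiplying so a huge n cannot overflow. */
	if (n > max || n > (avail - 3) / SEG_WORDS)
		return NULL;

	p += 8;
	for (i = 0; i < n; i++, p += 4 * SEG_WORDS) {
		out[i].handle = get_be32(p);
		out[i].length = get_be32(p + 4);
		out[i].offset = ((uint64_t)get_be32(p + 8) << 32) |
				get_be32(p + 12);
	}
	*nsegs = n;
	return p + 4;			/* step over the list terminator */
}
```

Unlike the in-place byte swapping in the patch, this copies the decoded values into a separate array, so the receive buffer stays in network order and the function can be called twice on the same data.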


