2007-05-18 17:46:00

by Tom Tucker

Subject: [RFC,PATCH 11/15] knfsd: RDMA transport core


This file implements the core transport data management and I/O
path. The I/O path for RDMA involves receiving callbacks in interrupt
context. Since all the svc transport locks are _bh locks, we enqueue the
transport on a list and schedule a tasklet to dequeue data indications
from the RDMA completion queue. The tasklet in turn takes the _bh locks
to enqueue receive data indications on a list for the transport. The
svc_rdma_recvfrom transport function dequeues data from this list in
NFSD thread context.
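
For reference while reading the large diff below, here is a minimal,
generic sketch of that handoff (hard-irq completion handler -> tasklet ->
_bh locks). The names in the sketch are illustrative only and are not part
of the patch; the corresponding real code is rq_comp_handler(),
dto_tasklet_func() and svc_sock_enqueue() in the diff.

#include <linux/interrupt.h>
#include <linux/list.h>
#include <linux/spinlock.h>

static LIST_HEAD(pending_q);            /* transports with I/O pending */
static DEFINE_SPINLOCK(pending_lock);   /* shared with hard-irq context */

/* Runs in softirq context, so the svc _bh locks are safe to take here. */
static void deferral_tasklet_func(unsigned long data)
{
        unsigned long flags;

        spin_lock_irqsave(&pending_lock, flags);
        while (!list_empty(&pending_q)) {
                struct list_head *entry = pending_q.next;

                list_del_init(entry);
                spin_unlock_irqrestore(&pending_lock, flags);

                /* ...take the _bh locks and hand the transport to the
                 * server threads, e.g. via svc_sock_enqueue()... */

                spin_lock_irqsave(&pending_lock, flags);
        }
        spin_unlock_irqrestore(&pending_lock, flags);
}

DECLARE_TASKLET(deferral_tasklet, deferral_tasklet_func, 0UL);

/* Runs in hard-irq context: just park the transport and kick the tasklet. */
static void completion_handler(struct list_head *xprt_entry)
{
        unsigned long flags;

        spin_lock_irqsave(&pending_lock, flags);
        if (list_empty(xprt_entry))
                list_add_tail(xprt_entry, &pending_q);
        spin_unlock_irqrestore(&pending_lock, flags);
        tasklet_schedule(&deferral_tasklet);
}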

Signed-off-by: Tom Tucker <[email protected]>
---

net/sunrpc/svc_rdma_transport.c | 1199 +++++++++++++++++++++++++++++++++++++++
1 files changed, 1199 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/svc_rdma_transport.c b/net/sunrpc/svc_rdma_transport.c
new file mode 100644
index 0000000..8b5ddda
--- /dev/null
+++ b/net/sunrpc/svc_rdma_transport.c
@@ -0,0 +1,1199 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <[email protected]>
+ */
+
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/time.h>
+#include <linux/delay.h>
+
+#include <linux/sunrpc/svcsock.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/mm.h> /* num_physpages */
+#include <linux/spinlock.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <asm/io.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <net/ipv6.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include "svc_rdma_debug.h"
+
+static int svc_rdma_accept(struct svc_rqst *rqstp);
+static void svc_rdma_delete(struct svc_sock *xprt);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void svc_rdma_put(struct svc_sock *xprt);
+static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp);
+static void dto_tasklet_func(unsigned long data);
+static struct cache_deferred_req *svc_rdma_defer(struct cache_req *req);
+static void svc_rdma_revisit(struct cache_deferred_req *dreq, int too_many);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static spinlock_t dto_lock = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(dto_xprt_q);
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+ int target;
+ int at_least_one = 0;
+ struct svc_rdma_op_ctxt *ctxt;
+ unsigned long flags;
+
+ target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+ xprt->sc_ctxt_max);
+
+ spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
+ while (xprt->sc_ctxt_cnt < target) {
+ xprt->sc_ctxt_cnt ++;
+ spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+ spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
+ if (ctxt) {
+ at_least_one = 1;
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ } else {
+ /* kmalloc failed...give up for now */
+ xprt->sc_ctxt_cnt --;
+ break;
+ }
+ }
+ spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
+
+ return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+ unsigned long flags;
+
+ while (1) {
+ spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
+ if (unlikely(xprt->sc_ctxt_head == NULL)) {
+ /* Try to bump my cache. */
+ spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
+
+ if (rdma_bump_context_cache(xprt))
+ continue;
+
+ printk(KERN_INFO "svcrdma: sleeping waiting for context "
+ "memory on xprt=%p\n",
+ xprt);
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ continue;
+ }
+ ctxt = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt->next;
+ spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
+ ctxt->xprt = xprt;
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ break;
+ }
+ ctxt->count = 0;
+ return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ unsigned long flags;
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ BUG_ON(!ctxt);
+ xprt = ctxt->xprt;
+ if (free_pages) {
+ for (i=0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+ }
+
+ for (i=0; i < ctxt->count; i++) {
+ dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+ }
+ spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+ struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)context;
+ printk(KERN_INFO "svcrdma: received CQ event id=%d, context=%p\n",
+ event->event, context);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+ struct svcxprt_rdma *xprt = context;
+
+ switch (event->event) {
+ /* These are considered benign events */
+ case IB_EVENT_PATH_MIG:
+ case IB_EVENT_COMM_EST:
+ case IB_EVENT_SQ_DRAINED:
+ case IB_EVENT_QP_LAST_WQE_REACHED:
+ printk(KERN_INFO "svcrdma: QP event %d received for QP=%p\n",
+ event->event, event->element.qp);
+ break;
+ /* These are considered fatal events */
+ case IB_EVENT_PATH_MIG_ERR:
+ case IB_EVENT_QP_FATAL:
+ case IB_EVENT_QP_REQ_ERR:
+ case IB_EVENT_QP_ACCESS_ERR:
+ case IB_EVENT_DEVICE_FATAL:
+ default:
+ printk(KERN_ERR "svcrdma: QP ERROR event %d received for QP=%p, "
+ "closing transport\n",
+ event->event, event->element.qp);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ break;
+ }
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+
+ spin_lock_irqsave(&dto_lock, flags);
+ while (!list_empty(&dto_xprt_q)) {
+ xprt = list_entry(dto_xprt_q.next, struct svcxprt_rdma, sc_dto_q);
+ list_del_init(&xprt->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+ if (0==test_bit(SK_DEAD, &xprt->sc_xprt.sk_flags)) {
+ /* Serialize with svc_rdma_recvfrom which will also
+ * enqueue the transport
+ */
+ set_bit(SK_DATA, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ spin_lock_irqsave(&dto_lock, flags);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO context
+ * on the dto_q for the transport.
+ */
+static void
+rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ int ret;
+ struct ib_wc wc;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ unsigned long flags;
+
+ rdma_stat_rq_poll ++;
+
+ while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
+ ctxt->wc_status = wc.status;
+ ctxt->byte_len = wc.byte_len;
+ if (wc.status != IB_WC_SUCCESS) {
+ DBG_DUMP_WC(__FUNCTION__, &wc);
+ /* Close the transport */
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_rdma_put_context(ctxt, 1);
+ continue;
+ }
+ spin_lock_irqsave(&xprt->sc_rq_dto_lock, flags);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock_irqrestore(&xprt->sc_rq_dto_lock, flags);
+ }
+
+ if (ctxt)
+ rdma_stat_rq_prod ++;
+}
+
+/*
+ * Receive Queue Completion Handler - potentially called in interrupt context.
+ *
+ * svc_sock_enqueue and the remainder of the svc core use _bh locks. Since
+ * rq_comp_handler is called in interrupt context, we need to defer the
+ * handling of the I/O to a tasklet.
+ */
+static void
+rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ rq_cq_reap(xprt);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q))
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+ tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * Send Queue Completion Handler - potentially called in interrupt context.
+ *
+ * - Purges the CQ
+ * - Wakes up threads waiting on SQ WR space
+ * - Wakes up threads waiting on the ORD throttle
+ * - Wakes up threads waiting for an RDMA_READ to complete.
+ */
+static void
+sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_wc wc;
+ struct ib_cq *cq = xprt->sc_sq_cq;
+ int ret;
+
+ rdma_stat_sq_poll ++;
+
+ while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
+ xprt = ctxt->xprt;
+
+ if (wc.status != IB_WC_SUCCESS) {
+ /* Close the transport */
+ DBG_DUMP_WC(__FUNCTION__, &wc);
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ }
+
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ switch (ctxt->wr_op) {
+ case IB_WR_SEND:
+ case IB_WR_RDMA_WRITE:
+ svc_rdma_put_context(ctxt,1);
+ break;
+
+ case IB_WR_RDMA_READ:
+ if (svcrdma_read_throttle) {
+ atomic_dec(&xprt->sc_read_count);
+ wake_up(&xprt->sc_read_wait);
+ }
+ /*
+ * Set the RDMA_DONE flag in the context and
+ * wake up any waiters.
+ */
+ set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+ wake_up(&ctxt->read_wait);
+ break;
+
+ default:
+ printk(KERN_ERR "svcrdma: unexpected completion type, "
+ "opcode=%d, status=%d\n",
+ wc.opcode, wc.status);
+ break;
+ }
+ }
+
+ if (ctxt)
+ rdma_stat_sq_prod ++;
+}
+
+void svc_sq_reap(struct svcxprt_rdma *xprt)
+{
+ sq_cq_reap(xprt);
+}
+
+void svc_rq_reap(struct svcxprt_rdma *xprt)
+{
+ rq_cq_reap(xprt);
+}
+
+static void
+sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+ sq_cq_reap(cq_context);
+}
+
+static void
+create_context_cache(struct svcxprt_rdma *xprt,
+ int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+ int i;
+
+ xprt->sc_ctxt_max = ctxt_max;
+ xprt->sc_ctxt_bump = ctxt_bump;
+ xprt->sc_ctxt_cnt = 0;
+ xprt->sc_ctxt_head = NULL;
+ for (i=0; i < ctxt_count; i++) {
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (ctxt) {
+ ctxt->next = xprt->sc_ctxt_head;
+ xprt->sc_ctxt_head = ctxt;
+ xprt->sc_ctxt_cnt ++;
+ }
+ }
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+ struct svc_rdma_op_ctxt *next;
+ if (!ctxt)
+ return;
+
+ do {
+ next = ctxt->next;
+ kfree(ctxt);
+ ctxt = next;
+ } while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(int listener)
+{
+ struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+ if (!cma_xprt)
+ return NULL;
+
+ INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+ init_waitqueue_head(&cma_xprt->sc_send_wait);
+ init_waitqueue_head(&cma_xprt->sc_read_wait);
+
+ spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_read_lock);
+ spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+ cma_xprt->sc_ord = svcrdma_ord;
+
+ cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+ cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+ atomic_set(&cma_xprt->sc_sq_count,0);
+ atomic_set(&cma_xprt->sc_read_count,0);
+
+ if (!listener) {
+ int reqs = cma_xprt->sc_max_requests;
+ create_context_cache(cma_xprt,
+ reqs << 1, /* starting size */
+ reqs, /* bump amount */
+ reqs +
+ cma_xprt->sc_sq_depth +
+ RPCRDMA_MAX_THREADS); /* max */
+
+ if (!cma_xprt->sc_ctxt_head) {
+ kfree(cma_xprt);
+ return NULL;
+ }
+ }
+
+ return cma_xprt;
+}
+
+static void svc_rdma_put(struct svc_sock *xprt)
+{
+ struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+
+ if (atomic_dec_and_test(&xprt->sk_inuse)) {
+ BUG_ON(! test_bit(SK_DEAD, &xprt->sk_flags));
+
+ printk("svcrdma: Destroying transport %p, cm_id=%p, "
+ "sk_flags=%lx\n",
+ xprt, rdma->sc_cm_id, xprt->sk_flags);
+
+ rdma_disconnect(rdma->sc_cm_id);
+ rdma_destroy_id(rdma->sc_cm_id);
+ rdma_destroy_xprt(rdma);
+ }
+}
+
+struct page *svc_rdma_get_page(void)
+{
+ struct page *page;
+
+ while ((page = alloc_page(GFP_KERNEL))==NULL) {
+ /* If we can't get memory, wait a bit and try again */
+ printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 jiffies.\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ }
+ return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+ struct ib_recv_wr recv_wr, *bad_recv_wr;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct page *page;
+ unsigned long pa;
+ int sge_no;
+ int buflen;
+ int ret;
+
+ ctxt = svc_rdma_get_context(xprt);
+ buflen = 0;
+ ctxt->direction = DMA_FROM_DEVICE;
+ for (sge_no=0; buflen < xprt->sc_max_req_size; sge_no++) {
+ BUG_ON(sge_no >= xprt->sc_max_sge);
+ page = svc_rdma_get_page();
+ ctxt->pages[sge_no] = page;
+ pa = ib_dma_map_page(xprt->sc_cm_id->device,
+ page, 0, PAGE_SIZE,
+ DMA_FROM_DEVICE);
+ ctxt->sge[sge_no].addr = pa;
+ ctxt->sge[sge_no].length = PAGE_SIZE;
+ ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+ buflen += PAGE_SIZE;
+ }
+ ctxt->count = sge_no;
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &ctxt->sge[0];
+ recv_wr.num_sge = ctxt->count;
+ recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+ ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ return ret;
+}
+
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_sock
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listen xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+ struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+ struct svcxprt_rdma *newxprt;
+
+ /* Create a new transport */
+ newxprt = rdma_create_xprt(0);
+ if (!newxprt) {
+ dprintk("svcrdma: failed to create new transport\n");
+ return;
+ }
+ newxprt->sc_cm_id = new_cma_id;
+ new_cma_id->context = newxprt;
+ dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+ newxprt, newxprt->sc_cm_id, listen_xprt);
+
+ /* Initialize the new transport */
+ newxprt->sc_xprt.sk_server = listen_xprt->sc_xprt.sk_server;
+ newxprt->sc_xprt.sk_lastrecv = get_seconds();
+ newxprt->sc_xprt.sk_delete = svc_rdma_delete;
+ newxprt->sc_xprt.sk_recvfrom = svc_rdma_recvfrom;
+ newxprt->sc_xprt.sk_sendto = svc_rdma_sendto;
+ newxprt->sc_xprt.sk_put = svc_rdma_put;
+ newxprt->sc_xprt.sk_prep_reply_buf = svc_rdma_prep_reply_buf;
+ newxprt->sc_xprt.sk_defer = svc_rdma_defer;
+ newxprt->sc_xprt.sk_revisit = svc_rdma_revisit;
+ newxprt->sc_xprt.sk_pool = NULL;
+
+ atomic_set(&newxprt->sc_xprt.sk_inuse, 1);
+ set_bit(SK_TEMP, &newxprt->sc_xprt.sk_flags);
+ INIT_LIST_HEAD(&newxprt->sc_xprt.sk_ready);
+ INIT_LIST_HEAD(&newxprt->sc_xprt.sk_list);
+ INIT_LIST_HEAD(&newxprt->sc_xprt.sk_deferred);
+ spin_lock_init(&newxprt->sc_xprt.sk_defer_lock);
+ mutex_init(&newxprt->sc_xprt.sk_mutex);
+
+ /* Enqueue the new transport on the accept queue of the listening
+ * transport */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ listen_xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will
+ * either be incoming connect requests or adapter removal events.
+ * @param cma_id The CMA ID for the listening endpoint
+ * @param event The event being delivered.
+ */
+static int
+rdma_listen_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, event=%d\n",
+ cma_id, cma_id->context, event->event);
+ handle_connect_req(cma_id);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on LISTEN xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt)
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on listening endpoint %p, event=%d\n",
+ cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+static int
+rdma_cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt) {
+ xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, event=%d\n",
+ cma_id, cma_id->context, event->event);
+ if (xprt) {
+ xprt->sc_xprt.sk_pool = NULL;
+ set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&xprt->sc_xprt);
+ }
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on DTO endpoint %p, event=%d\n",
+ cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+/*
+ * Create a listening RDMA service endpoint
+ * @param serv the RPC service this instance will belong to
+ * @param protocol the protocol for the instance
+ * @param sa the address to bind the local interface to
+ * @return 0 on success, negative value for errors
+ */
+int svc_rdma_create_listen(struct svc_serv *serv, int protocol,
+ struct sockaddr *sa)
+{
+ struct rdma_cm_id *listen_id;
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_sock *xprt;
+ int ret;
+
+ dprintk("svcrdma: Creating RDMA socket\n");
+
+ cma_xprt = rdma_create_xprt(1);
+ if (!cma_xprt)
+ return -ENOMEM;
+
+ xprt = &cma_xprt->sc_xprt;
+ xprt->sk_delete = svc_rdma_delete;
+ xprt->sk_recvfrom = svc_rdma_accept;
+ xprt->sk_put = svc_rdma_put;
+ xprt->sk_prep_reply_buf = svc_rdma_prep_reply_buf;
+ xprt->sk_server = serv;
+ xprt->sk_lastrecv = get_seconds();
+ INIT_LIST_HEAD(&xprt->sk_ready);
+ INIT_LIST_HEAD(&xprt->sk_list);
+ INIT_LIST_HEAD(&xprt->sk_deferred);
+ spin_lock_init(&xprt->sk_defer_lock);
+ mutex_init(&xprt->sk_mutex);
+ xprt->sk_pool = NULL;
+ atomic_set(&xprt->sk_inuse, 1);
+ spin_lock_bh(&serv->sv_lock);
+ list_add(&xprt->sk_list, &serv->sv_permsocks);
+ spin_unlock_bh(&serv->sv_lock);
+ clear_bit(SK_BUSY, &xprt->sk_flags);
+
+ /*
+ * We shouldn't receive any events (except device removal) on
+ * the id until we submit the listen request. Any events that
+ * we do receive will get logged as errors and ignored
+ */
+ listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+ if (IS_ERR(listen_id)) {
+ ret = PTR_ERR(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+ return ret;
+ }
+ ret = rdma_bind_addr(listen_id, sa);
+ if (ret) {
+ ret = PTR_ERR(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ rdma_destroy_id(listen_id);
+ dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+ return ret;
+ }
+ cma_xprt->sc_cm_id = listen_id;
+
+ /* The xprt is ready to process events at this point */
+ ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+ if (ret) {
+ ret = PTR_ERR(listen_id);
+ rdma_destroy_id(listen_id);
+ rdma_destroy_xprt(cma_xprt);
+ dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+ return ret;
+ }
+
+ return 0;
+}
+
+/*
+ * This is the sk_recvfrom function for listening endpoints. Its purpose is
+ * to accept incoming connections. The CMA callback handler has already
+ * created a new transport and attached the new CMA ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_sock structure. This function
+ * takes svc_sock structures off the accept_q and completes the
+ * connection.
+ */
+static int
+svc_rdma_accept(struct svc_rqst *rqstp)
+{
+ struct svc_sock *xprt = rqstp->rq_sock;
+ struct svcxprt_rdma *listen_xprt;
+ struct svcxprt_rdma *newxprt;
+ struct rdma_conn_param conn_param;
+ struct ib_qp_init_attr qp_attr;
+ struct ib_device_attr devattr;
+ int ret;
+ int i;
+
+ listen_xprt = (struct svcxprt_rdma*)xprt;
+ if (list_empty(&listen_xprt->sc_accept_q)) {
+ printk(KERN_INFO
+ "svcrdma: woke-up with no pending connection!\n");
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ return 0;
+ }
+
+ /* Get the next entry off the accept list */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ newxprt = list_entry(listen_xprt->sc_accept_q.next,
+ struct svcxprt_rdma, sc_accept_q);
+ list_del_init(&newxprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+ newxprt, newxprt->sc_cm_id);
+
+ ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+ if (ret) {
+ printk(KERN_ERR
+ "svcrdma: could not query device attributes on "
+ "device %p, rc=%d\n",
+ newxprt->sc_cm_id->device, ret);
+ goto errout;
+ }
+
+ /* Qualify the transport resource defaults with the
+ * capabilities of this particular device */
+ newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+ (size_t)RPCSVC_MAXPAGES);
+ newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+ newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
+ (size_t)svcrdma_ord);
+ spin_lock_bh(&rqstp->rq_server->sv_lock);
+ list_add(&newxprt->sc_xprt.sk_list, &rqstp->rq_server->sv_tempsocks);
+ rqstp->rq_server->sv_tmpcnt ++;
+ spin_unlock_bh(&rqstp->rq_server->sv_lock);
+
+ newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+ if (IS_ERR(newxprt->sc_pd)) {
+ printk(KERN_ERR
+ "svcrdma: error creating PD for connect request\n");
+ ret = PTR_ERR(newxprt->sc_pd);
+ goto errout;
+ }
+ newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ sq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_sq_depth);
+ if (IS_ERR(newxprt->sc_sq_cq)) {
+ printk(KERN_ERR
+ "svcrdma: error creating SQ CQ for connect request\n");
+ ret = PTR_ERR(newxprt->sc_sq_cq);
+ goto errout;
+ }
+ newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ rq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_max_requests);
+ if (IS_ERR(newxprt->sc_rq_cq)) {
+ printk(KERN_ERR
+ "svcrdma: error creating RQ CQ for connect request\n");
+ ret = PTR_ERR(newxprt->sc_rq_cq);
+ goto errout;
+ }
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.event_handler = qp_event_handler;
+ qp_attr.qp_context = newxprt;
+ qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+ qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+ qp_attr.send_cq = newxprt->sc_sq_cq;
+ qp_attr.recv_cq = newxprt->sc_rq_cq;
+ printk("newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+ "cm_id->device=%p, sc_pd->device=%p\n"
+ "qp_attr.cap.max_send_wr = %d\n"
+ "qp_attr.cap.max_recv_wr = %d\n"
+ "qp_attr.cap.max_send_sge = %d\n"
+ "qp_attr.cap.max_recv_sge = %d\n",
+ newxprt->sc_cm_id, newxprt->sc_pd,
+ newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+ qp_attr.cap.max_send_wr,
+ qp_attr.cap.max_recv_wr,
+ qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_sge);
+
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ /*
+ * XXX: This is a hack. We need a xx_request_qp interface
+ * that will adjust the qp_attr's with a best-effort
+ * number
+ */
+ qp_attr.cap.max_send_sge -= 2;
+ qp_attr.cap.max_recv_sge -= 2;
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ printk(KERN_ERR "svcrdma: failed to create QP, ret=%d\n", ret);
+ goto errout;
+ }
+ newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
+ newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
+ newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+ newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+ }
+ newxprt->sc_qp = newxprt->sc_cm_id->qp;
+ DBG_DUMP_QP(__FUNCTION__, newxprt->sc_qp, &qp_attr);
+
+ /* Register all of physical memory */
+ newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+ IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE);
+ if (IS_ERR(newxprt->sc_phys_mr)) {
+ ret = PTR_ERR(newxprt->sc_phys_mr);
+ printk(KERN_ERR
+ "svcrdma: Failed to create DMA MR ret=%d\n", ret);
+ goto errout;
+ }
+
+ /* Post receive buffers */
+ for (i=0; i < newxprt->sc_max_requests; i++)
+ if ((ret = svc_rdma_post_recv(newxprt))) {
+ printk(KERN_ERR
+ "svcrdma: failure posting receive buffers\n");
+ goto errout;
+ }
+
+ /* Swap out the handler */
+ newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+ /* We will get a getattr request from the client before we see
+ * the connect complete event because DTOs run on tasklets,
+ * and connection events run on threads
+ */
+ clear_bit(SK_BUSY, &newxprt->sc_xprt.sk_flags);
+
+ /* Accept Connection */
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.responder_resources = 0;
+ conn_param.initiator_depth = newxprt->sc_ord;
+ ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+ if (ret) {
+ printk(KERN_ERR
+ "svcrdma: failed to accept new connection, ret=%d\n",
+ ret);
+ goto errout;
+ }
+
+ printk("svcrdma: new connection %p accepted with the following "
+ "attributes:\n"
+ "\tlocal_ip : %d.%d.%d.%d\n"
+ "\tlocal_port : %d\n"
+ "\tremote_ip : %d.%d.%d.%d\n"
+ "\tremote_port : %d\n"
+ "\tmax_sge : %d\n"
+ "\tsq_depth : %d\n"
+ "\tmax_requests : %d\n"
+ "\tread throttle : %s\n"
+ "\tord : %d\n",
+ newxprt,
+ NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_port),
+ NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_port),
+ newxprt->sc_max_sge,
+ newxprt->sc_sq_depth,
+ newxprt->sc_max_requests,
+ (svcrdma_read_throttle?"TRUE":"FALSE"),
+ newxprt->sc_ord);
+
+ spin_lock_bh(&listen_xprt->sc_lock);
+ if (list_empty(&listen_xprt->sc_accept_q))
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+ listen_xprt->sc_xprt.sk_pool = NULL;
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ svc_sock_enqueue(&listen_xprt->sc_xprt);
+
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ return ret;
+
+ errout:
+ printk(KERN_ERR "svcrdma: failure accepting new connection rc=%d.\n",
+ ret);
+ BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
+ clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
+ clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
+ rdma_destroy_id(newxprt->sc_cm_id);
+ rdma_destroy_xprt(newxprt);
+ return 0; /* ret; */
+}
+
+static void svc_rdma_delete(struct svc_sock *xprt)
+{
+ struct svc_serv *serv = xprt->sk_server;
+
+ spin_lock_bh(&serv->sv_lock);
+ if (!test_and_set_bit(SK_DETACHED, &xprt->sk_flags))
+ list_del_init(&xprt->sk_list);
+
+ if (!test_and_set_bit(SK_DEAD, &xprt->sk_flags)) {
+ BUG_ON(atomic_read(&xprt->sk_inuse)<2);
+ atomic_dec(&xprt->sk_inuse);
+ if (test_bit(SK_TEMP, &xprt->sk_flags))
+ serv->sv_tmpcnt--;
+ }
+ spin_unlock_bh(&serv->sv_lock);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+ if (xprt->sc_qp)
+ ib_destroy_qp(xprt->sc_qp);
+
+ if (xprt->sc_sq_cq)
+ ib_destroy_cq(xprt->sc_sq_cq);
+
+ if (xprt->sc_rq_cq)
+ ib_destroy_cq(xprt->sc_rq_cq);
+
+ if (xprt->sc_pd)
+ ib_dealloc_pd(xprt->sc_pd);
+
+ destroy_context_cache(xprt->sc_ctxt_head);
+
+ if (xprt->sc_xprt.sk_info_authunix != NULL)
+ svcauth_unix_info_release(xprt->sc_xprt.sk_info_authunix);
+
+ kfree(xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr;
+ int ret;
+
+ if (test_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags))
+ return 0;
+
+ BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+ BUG_ON(((struct svc_rdma_op_ctxt*)(unsigned long)wr->wr_id)->wr_op !=
+ wr->opcode);
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ spin_lock_bh(&xprt->sc_lock);
+ if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+ spin_unlock_bh(&xprt->sc_lock);
+ rdma_stat_sq_starve ++;
+ /* First see if we can opportunistically reap some SQ WR */
+ sq_cq_reap(xprt);
+
+ /* Wait until SQ WR available if SQ still full */
+ wait_event(xprt->sc_send_wait,
+ atomic_read(&xprt->sc_sq_count) < xprt->sc_sq_depth);
+ continue;
+ }
+ /* Bumped used SQ WR count and post */
+ ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+ if (!ret)
+ atomic_inc(&xprt->sc_sq_count);
+ else {
+ printk(KERN_ERR "svcrdma: failed to post SQ WR rc=%d, "
+ "sc_sq_count=%d, sc_sq_depth=%d\n",
+ ret, atomic_read(&xprt->sc_sq_count),
+ xprt->sc_sq_depth);
+ }
+ spin_unlock_bh(&xprt->sc_lock);
+ break;
+ }
+
+ return ret;
+}
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
+{
+ struct ib_send_wr err_wr;
+ struct ib_sge sge;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ u32 *va;
+ int length;
+ int ret;
+
+ p = svc_rdma_get_page();
+ va = page_address(p);
+
+ /* XDR encode error */
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ /* Prepare SGE for local address */
+ sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
+ sge.lkey = xprt->sc_phys_mr->lkey;
+ sge.length = length;
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = &sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error posting send = %d\n", ret);
+ svc_rdma_put_context(ctxt,1);
+ }
+
+ return ret;
+}
+
+/*
+ * Setup the reply buffer for the svc_process function to write the
+ * RPC into.
+ */
+static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
+{
+ struct kvec *resv = &rqstp->rq_res.head[0];
+
+ /* setup response xdr_buf.
+ * Initially it has just one page
+ */
+ rqstp->rq_resused = 1;
+ resv->iov_base = page_address(rqstp->rq_respages[0]);
+ resv->iov_len = 0;
+ rqstp->rq_res.pages = rqstp->rq_respages+1;
+ rqstp->rq_res.len = 0;
+ rqstp->rq_res.page_base = 0;
+ rqstp->rq_res.page_len = 0;
+ rqstp->rq_res.buflen = PAGE_SIZE;
+ rqstp->rq_res.tail[0].iov_base = NULL;
+ rqstp->rq_res.tail[0].iov_len = 0;
+
+ return 0;
+}
+
+/*
+ * This request cannot be handled right now. Allocate a structure to
+ * keep its state pending completion processing. To accomplish this, the
+ * function creates an svc_rdma_op_ctxt that looks like a receive completion and
+ * enqueues it on the svc_sock's deferred request list. When
+ * svc_rdma_recvfrom is subsequently called, it first checks if there is a
+ * deferred RPC and if there is:
+ * - Takes the deferred request off the deferred request queue
+ * - Extracts the svc_rdma_op_ctxt from the deferred request structure
+ * - Frees the deferred request structure
+ * - Skips the ib_poll_cq call and processes the svc_rdma_op_ctxt as if it had
+ * just come out of a WR pulled from the CQ.
+ */
+static struct cache_deferred_req *
+svc_rdma_defer(struct cache_req *req)
+{
+ struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
+ struct svcxprt_rdma *xprt;
+ struct svc_rdma_deferred_req *dr;
+
+ dprintk("svcrdma: deferring request on \n"
+ " rqstp=%p\n"
+ " rqstp->rq_arg.len=%d\n",
+ rqstp,
+ rqstp->rq_arg.len);
+
+ /* if more than a page, give up FIXME */
+ if (rqstp->rq_arg.page_len)
+ return NULL;
+ BUG_ON(rqstp->rq_deferred);
+ xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
+ retry:
+ dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
+ if (!dr) {
+ printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ goto retry;
+ }
+ dr->req.handle.owner = rqstp->rq_server;
+ dr->req.prot = rqstp->rq_prot;
+ dr->req.addr = rqstp->rq_addr;
+ dr->req.daddr = rqstp->rq_daddr;
+ dr->req.argslen = rqstp->rq_arg.len >> 2;
+ dr->arg_page = rqstp->rq_pages[0];
+ dr->arg_len = rqstp->rq_arg.len;
+ rqstp->rq_pages[0] = svc_rdma_get_page();
+
+ atomic_inc(&rqstp->rq_sock->sk_inuse);
+ dr->req.svsk = rqstp->rq_sock;
+ dr->req.handle.revisit = rqstp->rq_sock->sk_revisit;
+
+ return &dr->req.handle;
+}
+
+/*
+ * This is called by the cache code when it either gets an answer from
+ * a user-mode daemon or gives up...as indicated by 'too_many'
+ */
+static void svc_rdma_revisit(struct cache_deferred_req *dreq, int too_many)
+{
+ struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
+ struct svc_serv *serv = dreq->owner;
+ struct svc_sock *svsk;
+
+ if (unlikely(too_many)) {
+ printk(KERN_INFO "svcrdma: giving up on deferred request "
+ "on svc_sock=%p, too many outstanding\n", dr->svsk);
+ dr->svsk->sk_put(dr->svsk);
+ kfree(dr);
+ return;
+ }
+ svsk = dr->svsk;
+ dprintk("svcrdma: revisit deferred RPC on xprt=%p\n", svsk);
+ dr->svsk = NULL;
+ spin_lock_bh(&serv->sv_lock);
+ list_add(&dr->handle.recent, &svsk->sk_deferred);
+ spin_unlock_bh(&serv->sv_lock);
+ svsk->sk_pool = NULL;
+ set_bit(SK_DEFERRED, &svsk->sk_flags);
+ svc_sock_enqueue(svsk);
+ svsk->sk_put(svsk);
+}
+




2007-05-23 18:59:40

by Talpey, Thomas

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

At 02:37 PM 5/23/2007, Trond Myklebust wrote:
>On Wed, 2007-05-23 at 14:19 -0400, Talpey, Thomas wrote:
>> I feel strongly that we need a good, workable defer mechanism
>> that actually defers. Yes, it's maybe hard. But it's important!
>
>Unless you have a way of capping the number of requests that are
>deferred, you can quickly end up turning this into a resource issue.

Agreed, and I will add that it's because of the thread-based nfsd
architecture. The nfsd threads can't afford to wait (though frankly
they do today, in the filesystem vfs calls and also in write gathering).

>On TCP sockets you can probably set a per-socket limit on the number of
>deferrals. As soon as you hit that number, then just stop handling any
>further requests on that particular socket (i.e. leave any further data
>queued in the socket buffer and let the server thread go to work on
>another socket) until a sufficient number of deferrals have been cleared
>out.
>I assume that you could devise a similar scheme with RDMA pretty much by
>substituting the word 'slot' for 'socket' in the previous paragraph,
>right?

Yes, absolutely the simplest approach is to stop processing new requests
when the nfsds back up. The apparent latency will rise, but the clients will
become flow controlled due to the RDMA credits, and this in turn will push
back to their RPC stream.
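
For concreteness, a per-transport cap like the one suggested above might
look something like the sketch below. The sc_n_deferred counter and the
SVCRDMA_MAX_DEFERRED limit are made-up names, not anything in the posted
patch, and svc_rdma_revisit would have to decrement the counter when a
deferred request is replayed or dropped.

#define SVCRDMA_MAX_DEFERRED    32      /* made-up limit for illustration */

static struct cache_deferred_req *
svc_rdma_defer_capped(struct cache_req *req)
{
        struct svc_rqst *rqstp = container_of(req, struct svc_rqst,
                                              rq_chandle);
        struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)rqstp->rq_sock;

        /* Hypothetical per-transport counter: once too many requests are
         * parked, refuse to defer any more.  Returning NULL makes the
         * caller drop the request, so the client retransmits later rather
         * than the server queueing deferred state without bound. */
        if (atomic_read(&xprt->sc_n_deferred) >= SVCRDMA_MAX_DEFERRED)
                return NULL;

        atomic_inc(&xprt->sc_n_deferred);
        return svc_rdma_defer(req);     /* the defer path from the patch */
}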

Personally, I'm not completely sure I see the problem here. If an RDMA
adapter is going out to lunch and hanging what should be a very fast
operation (the RDMA Read data pull), then that's an adapter problem
which we should address in the adapter layer, or via some sort of interface
hardening between it and RPC. Trying to push the issue back down the RPC
pipe to the sending peer seems to me a very unworkable solution.

Tom.



2007-05-23 20:01:37

by Trond Myklebust

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, 2007-05-23 at 14:59 -0400, Talpey, Thomas wrote:
> Personally, I'm not completely sure I see the problem here. If an RDMA
> adapter is going out to lunch and hanging what should be a very fast
> operation (the RDMA Read data pull), then that's an adapter problem
> which we should address in the adapter layer, or via some sort of interface
> hardening between it and RPC. Trying to push the issue back down the RPC
> pipe to the sending peer seems to me a very unworkable solution.

AFAIK, the most common reason for wanting to defer a request is if the
server needs to make an upcall in order to talk to mountd, or to resolve
an NFSv4 name using idmapd. I don't think you really want to treat
hardware failures by deferring requests...

Trond



2007-05-23 21:00:44

by Talpey, Thomas

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

At 04:01 PM 5/23/2007, Trond Myklebust wrote:
>On Wed, 2007-05-23 at 14:59 -0400, Talpey, Thomas wrote:
>> Personally, I'm not completely sure I see the problem here. If an RDMA
>> adapter is going out to lunch and hanging what should be a very fast
>> operation (the RDMA Read data pull), then that's an adapter problem
>> which we should address in the adapter layer, or via some sort of interface
>> hardening between it and RPC. Trying to push the issue back down the RPC
>> pipe to the sending peer seems to me a very unworkable solution.
>
>AFAIK, the most common reason for wanting to defer a request is if the
>server needs to make an upcall in order to talk to mountd, or to resolve
>an NFSv4 name using idmapd. I don't think you really want to treat
>hardware failures by deferring requests...

Well, the most common occurrence would be a lost connection, which
would prevent sending even nfserr_jukebox. I'm suggesting that if
we're concerned about using nfsd thread context to pull data, then
we should also be concerned about calling into filesystems, which might
hang on their storage adapters, or whatever just as easily.

Basically, I'm saying there shouldn't be any special handling for the
RDMA Reads used to pull write data. In the success case, they happen
quite rapidly (at wire speed), and in the failure case, there isn't any
peer to talk to anyway. So what are we protecting?

Tom.



2007-05-24 08:35:23

by Greg Banks

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, May 23, 2007 at 05:00:03PM -0400, Talpey, Thomas wrote:
> At 04:01 PM 5/23/2007, Trond Myklebust wrote:
> >On Wed, 2007-05-23 at 14:59 -0400, Talpey, Thomas wrote:
> >> Personally, I'm not completely sure I see the problem here. If an RDMA
> >> adapter is going out to lunch and hanging what should be a very fast
> >> operation (the RDMA Read data pull), then that's an adapter problem
> >> which we should address in the adapter layer, or via some sort of interface
> >> hardening between it and RPC. Trying to push the issue back down the RPC
> >> pipe to the sending peer seems to me a very unworkable solution.
> >
> >AFAIK, the most common reason for wanting to defer a request is if the
> >server needs to make an upcall in order to talk to mountd,

This is the original and AFAICT only reason svc_defer() is called.

> > or to resolve
> >an NFSv4 name using idmapd.

It seems the idmap code deliberately circumvents the asynchronous
defer/revisit behaviour, and has code which blocks the calling thread
for up to 1 second in the case of a cache miss and subsequent upcall
to userspace. After 1 second it gives up.

So with NFSv4, if the LDAP server goes AWOL, some portion of NFS
calls will experience multiple-second delays, 1 second for each user
and group name in the call. Wonderful.

> > I don't think you really want to treat
> >hardware failures by deferring requests...

Agreed, the right way to handle hardware issues is disconnect.

> Well, the most common occurrence would be a lost conenction, this
> would prevent sending even nfserr_jukebox. I'm suggesting that if
> we're concerned about using nfsd thread context to pull data, then
> we should also be concerned about calling into filesystems, which might
> hang on their storage adapters, or whatever just as easily.

Two comments.

Firstly, some of us *are* concerned about those issues

http://marc.info/?l=linux-nfs&m=114683005119982&w=2
http://oss.sgi.com/archives/xfs/2007-04/msg00114.html

Secondly, there's a fundamental difference between blocking
for storage-side reasons and blocking for network-side reasons.

The former is effectively internal(*) to the NAS server and reflects
its inherent capability to provide service. If the disks are broken,
then mechanisms internal to the server host (RAID, failover, whatever)
take care of this. So blocking (for short periods) in the filesystem
because the disks are fully loaded is fine, in fact this is the
fundamental purpose of the nfsd threads.

The latter is external to the server and is subject to the vagaries
of client machines, which can have hardware faults, software flaws,
or even be malicious and attempting to crash the server or lock it up.
Here we have a service boundary which the knfsd code needs to enforce.
We need firstly to protect the server from the effects of bad clients
and secondly to protect other clients from the effects of bad clients.

(*) Here I am ignoring the case of NFS exporting a clustered fs

> Basically, I'm saying there shouldn't be any special handling for the
> RDMA Reads used to pull write data. In the success case, they happen
> quite rapidly (at wire speed), and in the failure case, there isn't any
> peer to talk to anyway. So what are we protecting?

All the *other* clients who can't get any service, or get slower
service, because many nfsd threads are blocked. The problem here
is fairness between multiple clients in the face of a few greedy,
broken or malicious ones.

Greg.
--
Greg Banks, R&D Software Engineer, SGI Australian Software Group.
Apparently, I'm Bedevere. Which MPHG character are you?
I don't speak for SGI.


2007-05-24 13:46:32

by Talpey, Thomas

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

At 04:35 AM 5/24/2007, Greg Banks wrote:
>So with NFSv4, if the LDAP server goes AWOL, some portion of NFS
>calls will experience multiple-second delays, 1 second for each user
>and group name in the call. Wonderful.
...
>> we should also be concerned about calling into filesystems, which might
>> hang on their storage adapters, or whatever just as easily.
>
>Two comments.
>
>Firstly, some of us *are* concerned about those issues

Great! I like the idea of a nonblocking FS API that nfsd can use, though
a full asynchronous API (with a cancel capability) might be better. This
is just an aside to the current discussion, of course.


>Secondly, there's a fundamental difference between blocking
>for storage-side reasons and blocking for network-side reasons.
>...
>The latter is external to the server and is subject to the vagaries
>of client machines, which can have hardware faults, software flaws,
>or even be malicious and attempting to crash the server or lock it up.
>Here we have a service boundary which the knfsd code needs to enforce.
>We need firstly to protect the server from the effects of bad clients
>and secondly to protect other clients from the effects of bad clients.

Oh, absolutely the knfsd needs to have some sort of hardening from this
type of issue. I think a timer on any RDMA Read wire operation would be
very well advised. And if the timer fires, the entire WRITE operation would
obviously be aborted; this in turn would naturally indicate the knfsd should
disconnect (terminating all other client operations), because that is the only
way to abort an in-progress RDMA.
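
To make that concrete, the watchdog could be as simple as the following
sketch. The sc_read_timer field and the timeout value are made up for
illustration, and it is simplified to one outstanding RDMA Read per
connection (the posted patch tracks sc_read_count of them); nothing like
this is in the patch yet.

#include <linux/timer.h>

#define SVC_RDMA_READ_TIMEO     (5 * HZ)        /* made-up value */

static void svc_rdma_read_timeout(unsigned long data)
{
        struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)data;

        /* The only way to abort an in-progress RDMA is to tear the
         * connection down, so mark the transport closed and let the
         * normal close path run from an nfsd thread. */
        set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
        svc_sock_enqueue(&xprt->sc_xprt);
}

/* Arm just before posting the IB_WR_RDMA_READ work request. */
static void svc_rdma_arm_read_timer(struct svcxprt_rdma *xprt)
{
        init_timer(&xprt->sc_read_timer);
        xprt->sc_read_timer.function = svc_rdma_read_timeout;
        xprt->sc_read_timer.data = (unsigned long)xprt;
        mod_timer(&xprt->sc_read_timer, jiffies + SVC_RDMA_READ_TIMEO);
}

/* Disarm when sq_cq_reap() sees the IB_WR_RDMA_READ completion. */
static void svc_rdma_read_done(struct svcxprt_rdma *xprt)
{
        del_timer(&xprt->sc_read_timer);
}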

My concern is using nfserr_jukebox to somehow manage the queue of RDMA
Read operations the server is processing, as was originally suggested.
You can think of the adapter's RDMA Read engine as a very short work
queue - it will generally be quite busy with work queued. The good news is,
it is a very fast engine. So, I think it is reasonable to use a timeout of just
a few seconds once an operation begins. But I don't think it's reasonable to
somehow limit the amount of WRITEs the server should handle in order to
simplify this.

>All the *other* clients who can't get any service, or get slower
>service, because many nfsd threads are blocked. The problem here
>is fairness between multiple clients in the face of a few greedy,
>broken or malicious ones.

So, the attack you're suggesting is a client would issue a large number
of chunked WRITES, and then delay the resulting RDMA Reads that the
server issued to fetch the data, in an attempt to tie up all the server
threads? That would be a challenging attack to implement, but I guess
I would say there are several things that will protect us here.

- the client can only send as many WRITES as he has credits to send;
this is a server-managed limit
- the server is free to leave client operations in their receive buffers
unprocessed until it cares to execute them, as is done for tcp etc.
- the client can't block operations to other connections (clients), i.e.
the RDMA Read limits are per-connection only.
- any failure such as an RDMA Read timer expiration will cause the
connection to be lost, immediately freeing up all threads servicing
that client.

Bottom line, my feeling is that adding a timeout to the RDMA Read
requests we make of the local adapter is all we need to implement
the necessary protection. If we want to address the situation for
N nfsd threads to fairly service a much larger number of arriving client
requests, that's a much deeper issue (and a good one, but not an
RDMA one).

Tom.



2007-05-18 19:07:44

by Trond Myklebust

Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, 2007-05-18 at 12:45 -0500, Tom Tucker wrote:
> This file implements the core transport data management and I/O
> path. The I/O path for RDMA involves receiving callbacks in interrupt
> context. Since all the svc transport locks are _bh locks, we enqueue the
> transport on a list and schedule a tasklet to dequeue data indications
> from the RDMA completion queue. The tasklet in turn takes the _bh locks
> to enqueue receive data indications on a list for the transport. The
> svc_rdma_recvfrom transport function dequeues data from this list in
> NFSD thread context.
>
> Signed-off-by: Tom Tucker <[email protected]>
> ---
>
> net/sunrpc/svc_rdma_transport.c | 1199 +++++++++++++++++++++++++++++++++++++++
> 1 files changed, 1199 insertions(+), 0 deletions(-)
>
> diff --git a/net/sunrpc/svc_rdma_transport.c b/net/sunrpc/svc_rdma_transport.c
> new file mode 100644
> index 0000000..8b5ddda
> --- /dev/null
> +++ b/net/sunrpc/svc_rdma_transport.c
> @@ -0,0 +1,1199 @@
> +/*
> + * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> + *
> + * This software is available to you under a choice of one of two
> + * licenses. You may choose to be licensed under the terms of the GNU
> + * General Public License (GPL) Version 2, available from the file
> + * COPYING in the main directory of this source tree, or the BSD-type
> + * license below:
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions
> + * are met:
> + *
> + * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + *
> + * Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials provided
> + * with the distribution.
> + *
> + * Neither the name of the Network Appliance, Inc. nor the names of
> + * its contributors may be used to endorse or promote products
> + * derived from this software without specific prior written
> + * permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + *
> + * Author: Tom Tucker <[email protected]>
> + */
> +
> +#include <asm/semaphore.h>
> +#include <linux/device.h>
> +#include <linux/in.h>
> +#include <linux/err.h>
> +#include <linux/time.h>
> +#include <linux/delay.h>
> +
> +#include <linux/sunrpc/svcsock.h>
> +#include <linux/sunrpc/debug.h>
> +#include <linux/sunrpc/rpc_rdma.h>
> +#include <linux/mm.h> /* num_physpages */
> +#include <linux/spinlock.h>
> +#include <linux/net.h>
> +#include <net/sock.h>
> +#include <asm/io.h>
> +#include <rdma/ib_verbs.h>
> +#include <rdma/rdma_cm.h>
> +#include <net/ipv6.h>
> +#include <linux/sunrpc/svc_rdma.h>
> +#include "svc_rdma_debug.h"
> +
> +static int svc_rdma_accept(struct svc_rqst *rqstp);
> +static void svc_rdma_delete(struct svc_sock *xprt);
> +static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
> +static void svc_rdma_put(struct svc_sock *xprt);
> +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp);
> +static void dto_tasklet_func(unsigned long data);
> +static struct cache_deferred_req *svc_rdma_defer(struct cache_req *req);
> +static void svc_rdma_revisit(struct cache_deferred_req *dreq, int too_many);
> +
> +DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
> +static spinlock_t dto_lock = SPIN_LOCK_UNLOCKED;
> +static LIST_HEAD(dto_xprt_q);
> +
> +static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
> +{
> + int target;
> + int at_least_one = 0;
> + struct svc_rdma_op_ctxt *ctxt;
> + unsigned long flags;
> +
> + target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
> + xprt->sc_ctxt_max);
> +
> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);

Why do you need an irqsafe spinlock to protect this list?

> + while (xprt->sc_ctxt_cnt < target) {
> + xprt->sc_ctxt_cnt ++;
> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> +
> + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> +
> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);

You've now dropped the spinlock. How can you know that the condition
xprt->sc_ctxt_cnt <= target is still valid?

> + if (ctxt) {
> + at_least_one = 1;
> + ctxt->next = xprt->sc_ctxt_head;
> + xprt->sc_ctxt_head = ctxt;
> + } else {
> + /* kmalloc failed...give up for now */
> + xprt->sc_ctxt_cnt --;
> + break;
> + }
> + }
> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> +
> + return at_least_one;
> +}
> +
> +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
> +{
> + struct svc_rdma_op_ctxt *ctxt;
> + unsigned long flags;
> +
> + while (1) {
> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> + if (unlikely(xprt->sc_ctxt_head == NULL)) {
> + /* Try to bump my cache. */
> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> +
> + if (rdma_bump_context_cache(xprt))
> + continue;
> +
> + printk(KERN_INFO "svcrdma: sleeping waiting for context "
> + "memory on xprt=%p\n",
> + xprt);
> + schedule_timeout_uninterruptible(msecs_to_jiffies(500));

(HZ >> 1)
However this is rather naughty: you are tying up an nfsd thread that
could be put to doing useful work elsewhere.

> + continue;
> + }
> + ctxt = xprt->sc_ctxt_head;
> + xprt->sc_ctxt_head = ctxt->next;
> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> + ctxt->xprt = xprt;
> + INIT_LIST_HEAD(&ctxt->dto_q);
> + break;
> + }
> + ctxt->count = 0;
> + return ctxt;
> +}
> +
> +void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
> +{
> + unsigned long flags;
> + struct svcxprt_rdma *xprt;
> + int i;
> +
> + BUG_ON(!ctxt);
> + xprt = ctxt->xprt;
> + if (free_pages) {
> + for (i=0; i < ctxt->count; i++)
> + put_page(ctxt->pages[i]);
> + }
> +
> + for (i=0; i < ctxt->count; i++) {
> + dma_unmap_single(xprt->sc_cm_id->device->dma_device,
> + ctxt->sge[i].addr,
> + ctxt->sge[i].length,
> + ctxt->direction);
> + }
> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> + ctxt->next = xprt->sc_ctxt_head;
> + xprt->sc_ctxt_head = ctxt;
> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> +}
> +
> +/* ib_cq event handler */
> +static void cq_event_handler(struct ib_event *event, void *context)
> +{
> + struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)context;
> + printk(KERN_INFO "svcrdma: received CQ event id=%d, context=%p\n",
> + event->event, context);
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> +}
> +
> +/* QP event handler */
> +static void qp_event_handler(struct ib_event *event, void *context)
> +{
> + struct svcxprt_rdma *xprt = context;
> +
> + switch (event->event) {
> + /* These are considered benign events */
> + case IB_EVENT_PATH_MIG:
> + case IB_EVENT_COMM_EST:
> + case IB_EVENT_SQ_DRAINED:
> + case IB_EVENT_QP_LAST_WQE_REACHED:
> + printk(KERN_INFO "svcrdma: QP event %d received for QP=%p\n",
> + event->event, event->element.qp);
> + break;
> + /* These are considered fatal events */
> + case IB_EVENT_PATH_MIG_ERR:
> + case IB_EVENT_QP_FATAL:
> + case IB_EVENT_QP_REQ_ERR:
> + case IB_EVENT_QP_ACCESS_ERR:
> + case IB_EVENT_DEVICE_FATAL:
> + default:
> + printk(KERN_ERR "svcrdma: QP ERROR event %d received for QP=%p, "
> + "closing transport\n",
> + event->event, event->element.qp);
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + break;
> + }
> +}
> +
> +/*
> + * Data Transfer Operation Tasklet
> + *
> + * Walks a list of transports with I/O pending, removing entries as
> + * they are added to the server's I/O pending list.
> + */
> +static void dto_tasklet_func(unsigned long data)
> +{
> + struct svcxprt_rdma *xprt;
> + unsigned long flags;
> +
> + spin_lock_irqsave(&dto_lock, flags);
> + while (!list_empty(&dto_xprt_q)) {
> + xprt = list_entry(dto_xprt_q.next, struct svcxprt_rdma, sc_dto_q);
> + list_del_init(&xprt->sc_dto_q);
> + spin_unlock_irqrestore(&dto_lock, flags);
> + if (0==test_bit(SK_DEAD, &xprt->sc_xprt.sk_flags)) {
> + /* Serialize with svc_rdma_recvfrom which will also
> + * enqueue the transport
> + */
> + set_bit(SK_DATA, &xprt->sc_xprt.sk_flags);
> + svc_sock_enqueue(&xprt->sc_xprt);
> + }
> + spin_lock_irqsave(&dto_lock, flags);
> + }
> + spin_unlock_irqrestore(&dto_lock, flags);
> +}
> +
> +/*
> + * rq_cq_reap - Process the RQ CQ.
> + *
> + * Take all completing WC off the CQE and enqueue the associated DTO context
> + * on the dto_q for the transport.
> + */
> +static void
> +rq_cq_reap(struct svcxprt_rdma *xprt)
> +{
> + int ret;
> + struct ib_wc wc;
> + struct svc_rdma_op_ctxt *ctxt = NULL;
> + unsigned long flags;
> +
> + rdma_stat_rq_poll ++;

Is this a global variable? How are you ensuring atomicity above?

> +
> + while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
> + ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
> + ctxt->wc_status = wc.status;
> + ctxt->byte_len = wc.byte_len;
> + if (wc.status != IB_WC_SUCCESS) {
> + DBG_DUMP_WC(__FUNCTION__, &wc);
> + /* Close the transport */
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + svc_rdma_put_context(ctxt, 1);
> + continue;
> + }
> + spin_lock_irqsave(&xprt->sc_rq_dto_lock, flags);
> + list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
> + spin_unlock_irqrestore(&xprt->sc_rq_dto_lock, flags);
> + }
> +
> + if (ctxt)
> + rdma_stat_rq_prod ++;
> +}
> +
> +/*
> + * Receive Queue Completion Handler - potentially called on interrupt context.
> + *
> + * svc_sock_enqueue and the remainder of the svc core use _bh
> + * locks. Since rq_comp_handler is called in interrupt context,
> + * we need to defer the handling of the I/O to a tasklet.
> + */
> +static void
> +rq_comp_handler(struct ib_cq *cq, void *cq_context)
> +{
> + struct svcxprt_rdma *xprt = cq_context;
> + unsigned long flags;
> +
> + ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
> + rq_cq_reap(xprt);
> +
> + /*
> + * If this transport is not already on the DTO transport queue,
> + * add it
> + */
> + spin_lock_irqsave(&dto_lock, flags);
> + if (list_empty(&xprt->sc_dto_q))
> + list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
> + spin_unlock_irqrestore(&dto_lock, flags);
> + tasklet_schedule(&dto_tasklet);
> +}
> +
> +/*
> + * Send Queue Completion Handler - potentially called on interrupt context.
> + *
> + * - Purges the CQ
> + * - Wakes up threads waiting on SQ WR space
> + * - Wakes up threads waiting on the ORD throttle
> + * - Wakes up threads waiting for an RDMA_READ to complete.
> + */
> +static void
> +sq_cq_reap(struct svcxprt_rdma *xprt)
> +{
> + struct svc_rdma_op_ctxt *ctxt = NULL;
> + struct ib_wc wc;
> + struct ib_cq *cq = xprt->sc_sq_cq;
> + int ret;
> +
> + rdma_stat_sq_poll ++;
> +
> + while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> + ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
> + xprt = ctxt->xprt;
> +
> + if (wc.status != IB_WC_SUCCESS) {
> + /* Close the transport */
> + DBG_DUMP_WC(__FUNCTION__, &wc);
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + }
> +
> + /* Decrement used SQ WR count */
> + atomic_dec(&xprt->sc_sq_count);
> + wake_up(&xprt->sc_send_wait);
> +
> + switch (ctxt->wr_op) {
> + case IB_WR_SEND:
> + case IB_WR_RDMA_WRITE:
> + svc_rdma_put_context(ctxt,1);
> + break;
> +
> + case IB_WR_RDMA_READ:
> + if (svcrdma_read_throttle) {
> + atomic_dec(&xprt->sc_read_count);
> + wake_up(&xprt->sc_read_wait);
> + }
> + /*
> + * Set the RDMA_DONE flag in the context and
> + * wake up any waiters.
> + */
> + set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
> + wake_up(&ctxt->read_wait);
> + break;
> +
> + default:
> + printk(KERN_ERR "svcrdma: unexpected completion type, "
> + "opcode=%d, status=%d\n",
> + wc.opcode, wc.status);
> + break;
> + }
> + }
> +
> + if (ctxt)
> + rdma_stat_sq_prod ++;
> +}
> +
> +void svc_sq_reap(struct svcxprt_rdma *xprt)
> +{
> + sq_cq_reap(xprt);
> +}
> +
> +void svc_rq_reap(struct svcxprt_rdma *xprt)
> +{
> + rq_cq_reap(xprt);
> +}
> +
> +static void
> +sq_comp_handler(struct ib_cq *cq, void *cq_context)
> +{
> + ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
> + sq_cq_reap(cq_context);
> +}
> +
> +static void
> +create_context_cache(struct svcxprt_rdma *xprt,
> + int ctxt_count, int ctxt_bump, int ctxt_max)
> +{
> + struct svc_rdma_op_ctxt *ctxt;
> + int i;
> +
> + xprt->sc_ctxt_max = ctxt_max;
> + xprt->sc_ctxt_bump = ctxt_bump;
> + xprt->sc_ctxt_cnt = 0;
> + xprt->sc_ctxt_head = NULL;
> + for (i=0; i < ctxt_count; i++) {
> + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> + if (ctxt) {
> + ctxt->next = xprt->sc_ctxt_head;
> + xprt->sc_ctxt_head = ctxt;
> + xprt->sc_ctxt_cnt ++;
> + }
> + }
> +}
> +
> +static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
> +{
> + struct svc_rdma_op_ctxt *next;
> + if (!ctxt)
> + return;
> +
> + do {
> + next = ctxt->next;
> + kfree(ctxt);
> + ctxt = next;
> + } while (next);
> +}
> +
> +static struct svcxprt_rdma *rdma_create_xprt(int listener)
> +{
> + struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
> +
> + if (!cma_xprt)
> + return NULL;
> +
> + INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
> + INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
> + INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
> + init_waitqueue_head(&cma_xprt->sc_send_wait);
> + init_waitqueue_head(&cma_xprt->sc_read_wait);
> +
> + spin_lock_init(&cma_xprt->sc_lock);
> + spin_lock_init(&cma_xprt->sc_read_lock);
> + spin_lock_init(&cma_xprt->sc_ctxt_lock);
> + spin_lock_init(&cma_xprt->sc_rq_dto_lock);
> +
> + cma_xprt->sc_ord = svcrdma_ord;
> +
> + cma_xprt->sc_max_req_size = svcrdma_max_req_size;
> + cma_xprt->sc_max_requests = svcrdma_max_requests;
> + cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
> + atomic_set(&cma_xprt->sc_sq_count,0);
> + atomic_set(&cma_xprt->sc_read_count,0);
> +
> + if (!listener) {
> + int reqs = cma_xprt->sc_max_requests;
> + create_context_cache(cma_xprt,
> + reqs << 1, /* starting size */
> + reqs, /* bump amount */
> + reqs +
> + cma_xprt->sc_sq_depth +
> + RPCRDMA_MAX_THREADS); /* max */
> +
> + if (!cma_xprt->sc_ctxt_head) {
> + kfree(cma_xprt);
> + return NULL;
> + }
> + }
> +
> + return cma_xprt;
> +}
> +
> +static void svc_rdma_put(struct svc_sock *xprt)
> +{
> + struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
> +
> + if (atomic_dec_and_test(&xprt->sk_inuse)) {
> + BUG_ON(! test_bit(SK_DEAD, &xprt->sk_flags));
> +
> + printk("svcrdma: Destroying transport %p, cm_id=%p, "
> + "sk_flags=%lx\n",
> + xprt, rdma->sc_cm_id, xprt->sk_flags);
> +
> + rdma_disconnect(rdma->sc_cm_id);
> + rdma_destroy_id(rdma->sc_cm_id);
> + rdma_destroy_xprt(rdma);
> + }
> +}
> +
> +struct page *svc_rdma_get_page(void)
> +{
> + struct page *page;
> +
> + while ((page = alloc_page(GFP_KERNEL))==NULL) {
> + /* If we can't get memory, wait a bit and try again */
> + printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 jiffies.\n");
> + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
HZ
See comment above about tying up threads. Also note that you are
probably better off using __GFP_NOFAIL instead of the loop.
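
For reference, that would collapse svc_rdma_get_page() to something
like this (untested sketch):

struct page *svc_rdma_get_page(void)
{
	/* __GFP_NOFAIL makes alloc_page() retry internally rather
	 * than return NULL, so the caller-side sleep/retry loop
	 * goes away. */
	return alloc_page(GFP_KERNEL | __GFP_NOFAIL);
}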

> + }
> + return page;
> +}
> +
> +int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
> +{
> + struct ib_recv_wr recv_wr, *bad_recv_wr;
> + struct svc_rdma_op_ctxt *ctxt;
> + struct page *page;
> + unsigned long pa;
> + int sge_no;
> + int buflen;
> + int ret;
> +
> + ctxt = svc_rdma_get_context(xprt);
> + buflen = 0;
> + ctxt->direction = DMA_FROM_DEVICE;
> + for (sge_no=0; buflen < xprt->sc_max_req_size; sge_no++) {
> + BUG_ON(sge_no >= xprt->sc_max_sge);
> + page = svc_rdma_get_page();
> + ctxt->pages[sge_no] = page;
> + pa = ib_dma_map_page(xprt->sc_cm_id->device,
> + page, 0, PAGE_SIZE,
> + DMA_FROM_DEVICE);
> + ctxt->sge[sge_no].addr = pa;
> + ctxt->sge[sge_no].length = PAGE_SIZE;
> + ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
> + buflen += PAGE_SIZE;
> + }
> + ctxt->count = sge_no;
> + recv_wr.next = NULL;
> + recv_wr.sg_list = &ctxt->sge[0];
> + recv_wr.num_sge = ctxt->count;
> + recv_wr.wr_id = (u64)(unsigned long)ctxt;
> +
> + ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
> + return ret;
> +}
> +
> +
> +/*
> + * This function handles the CONNECT_REQUEST event on a listening
> + * endpoint. It is passed the cma_id for the _new_ connection. The context in
> + * this cma_id is inherited from the listening cma_id and is the svc_sock
> + * structure for the listening endpoint.
> + *
> + * This function creates a new xprt for the new connection and enqueues it on
> + * the accept queue for the listen xprt. When the listen thread is kicked, it
> + * will call the recvfrom method on the listen xprt which will accept the new
> + * connection.
> + */
> +static void handle_connect_req(struct rdma_cm_id *new_cma_id)
> +{
> + struct svcxprt_rdma *listen_xprt = new_cma_id->context;
> + struct svcxprt_rdma *newxprt;
> +
> + /* Create a new transport */
> + newxprt = rdma_create_xprt(0);
> + if (!newxprt) {
> + dprintk("svcrdma: failed to create new transport\n");
> + return;
> + }
> + newxprt->sc_cm_id = new_cma_id;
> + new_cma_id->context = newxprt;
> + dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
> + newxprt, newxprt->sc_cm_id, listen_xprt);
> +
> + /* Initialize the new transport */
> + newxprt->sc_xprt.sk_server = listen_xprt->sc_xprt.sk_server;
> + newxprt->sc_xprt.sk_lastrecv = get_seconds();
> + newxprt->sc_xprt.sk_delete = svc_rdma_delete;
> + newxprt->sc_xprt.sk_recvfrom = svc_rdma_recvfrom;
> + newxprt->sc_xprt.sk_sendto = svc_rdma_sendto;
> + newxprt->sc_xprt.sk_put = svc_rdma_put;
> + newxprt->sc_xprt.sk_prep_reply_buf = svc_rdma_prep_reply_buf;
> + newxprt->sc_xprt.sk_defer = svc_rdma_defer;
> + newxprt->sc_xprt.sk_revisit = svc_rdma_revisit;
> + newxprt->sc_xprt.sk_pool = NULL;
> +
> + atomic_set(&newxprt->sc_xprt.sk_inuse, 1);
> + set_bit(SK_TEMP, &newxprt->sc_xprt.sk_flags);
> + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_ready);
> + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_list);
> + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_deferred);
> + spin_lock_init(&newxprt->sc_xprt.sk_defer_lock);
> + mutex_init(&newxprt->sc_xprt.sk_mutex);
> +
> + /* Enqueue the new transport on the accept queue of the listening
> + * transport */
> + spin_lock_bh(&listen_xprt->sc_lock);
> + list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
> + spin_unlock_bh(&listen_xprt->sc_lock);

Why do you need bh-safe spinlocks here?

> +
> + listen_xprt->sc_xprt.sk_pool = NULL;
> + set_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> + svc_sock_enqueue(&listen_xprt->sc_xprt);
> +}
> +
> +/*
> + * Handles events generated on the listening endpoint. These events will be
> + * either incoming connect requests or adapter removal events.
> + * @param cma_id The CMA ID for the listening endpoint
> + * @event the event being delivered.
> + */
> +static int
> +rdma_listen_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
> +{
> + struct svcxprt_rdma *xprt = cma_id->context;
> + int ret = 0;
> +
> + switch (event->event) {
> + case RDMA_CM_EVENT_CONNECT_REQUEST:
> + dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, event=%d\n",
> + cma_id, cma_id->context, event->event);
> + handle_connect_req(cma_id);
> + break;
> +
> + case RDMA_CM_EVENT_ESTABLISHED:
> + /* Accept complete */
> + dprintk("svcrdma: Connection completed on LISTEN xprt=%p, cm_id=%p\n",
> + xprt, cma_id);
> + break;
> +
> + case RDMA_CM_EVENT_DEVICE_REMOVAL:
> + dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
> + xprt, cma_id);
> + if (xprt)
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + break;
> +
> + default:
> + dprintk("svcrdma: Unexpected event on listening endpoint %p, event=%d\n",
> + cma_id, event->event);
> + break;
> + }
> +
> + return ret;
> +}
> +
> +static int
> +rdma_cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
> +{
> + struct svcxprt_rdma *xprt = cma_id->context;
> + int ret = 0;
> +
> + switch (event->event) {
> + case RDMA_CM_EVENT_ESTABLISHED:
> + /* Accept complete */
> + dprintk("svcrdma: Connection completed on DTO xprt=%p, cm_id=%p\n",
> + xprt, cma_id);
> + break;
> +
> + case RDMA_CM_EVENT_DISCONNECTED:
> + dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
> + xprt, cma_id);
> + if (xprt) {
> + xprt->sc_xprt.sk_pool = NULL;
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + svc_sock_enqueue(&xprt->sc_xprt);
> + }
> + break;
> +
> + case RDMA_CM_EVENT_DEVICE_REMOVAL:
> + dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, event=%d\n",
> + cma_id, cma_id->context, event->event);
> + if (xprt) {
> + xprt->sc_xprt.sk_pool = NULL;
> + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> + svc_sock_enqueue(&xprt->sc_xprt);
> + }
> + break;
> +
> + default:
> + dprintk("svcrdma: Unexpected event on DTO endpoint %p, event=%d\n",
> + cma_id, event->event);
> + break;
> + }
> +
> + return ret;
> +}
> +
> +/*
> + * Create a listening RDMA service endpoint
> + * @param serv the RPC service this instance will belong to
> + * @param protocol the protocol for the instance
> + * @param sa the address to bind the local interface to
> + * @return 0 on success, negative value for errors
> + */
> +int svc_rdma_create_listen(struct svc_serv *serv, int protocol,
> + struct sockaddr *sa)
> +{
> + struct rdma_cm_id *listen_id;
> + struct svcxprt_rdma *cma_xprt;
> + struct svc_sock *xprt;
> + int ret;
> +
> + dprintk("svcrdma: Creating RDMA socket\n");
> +
> + cma_xprt = rdma_create_xprt(1);
> + if (!cma_xprt)
> + return -ENOMEM;
> +
> + xprt = &cma_xprt->sc_xprt;
> + xprt->sk_delete = svc_rdma_delete;
> + xprt->sk_recvfrom = svc_rdma_accept;
> + xprt->sk_put = svc_rdma_put;
> + xprt->sk_prep_reply_buf = svc_rdma_prep_reply_buf;
> + xprt->sk_server = serv;
> + xprt->sk_lastrecv = get_seconds();
> + INIT_LIST_HEAD(&xprt->sk_ready);
> + INIT_LIST_HEAD(&xprt->sk_list);
> + INIT_LIST_HEAD(&xprt->sk_deferred);
> + spin_lock_init(&xprt->sk_defer_lock);
> + mutex_init(&xprt->sk_mutex);
> + xprt->sk_pool = NULL;
> + atomic_set(&xprt->sk_inuse, 1);
> + spin_lock_bh(&serv->sv_lock);
> + list_add(&xprt->sk_list, &serv->sv_permsocks);
> + spin_unlock_bh(&serv->sv_lock);
> + clear_bit(SK_BUSY, &xprt->sk_flags);
> +
> + /*
> + * We shouldn't receive any events (except device removal) on
> + * the id until we submit the listen request. Any events that
> + * we do receive will get logged as errors and ignored
> + */
> + listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
> + if (IS_ERR(listen_id)) {
> + ret = PTR_ERR(listen_id);
> + rdma_destroy_xprt(cma_xprt);
> + dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
> + return ret;
> + }
> + ret = rdma_bind_addr(listen_id, sa);
> + if (ret) {
> + ret = PTR_ERR(listen_id);
> + rdma_destroy_xprt(cma_xprt);
> + rdma_destroy_id(listen_id);
> + dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
> + return ret;
> + }
> + cma_xprt->sc_cm_id = listen_id;
> +
> + /* The xprt is ready to process events at this point */
> + ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
> + if (ret) {
> + ret = PTR_ERR(listen_id);
> + rdma_destroy_id(listen_id);
> + rdma_destroy_xprt(cma_xprt);
> + dprintk("svcrdma: rdma_listen failed = %d\n", ret);
> + return ret;
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * This is the sk_recvfrom function for listening endpoints. Its purpose is
> + * to accept incoming connections. The CMA callback handler has already
> + * created a new transport and attached the new CMA ID.
> + *
> + * There is a queue of pending connections hung on the listening
> + * transport. This queue contains the new svc_sock structure. This function
> + * takes svc_sock structures off the accept_q and completes the
> + * connection.
> + */
> +static int
> +svc_rdma_accept(struct svc_rqst *rqstp)
> +{
> + struct svc_sock *xprt = rqstp->rq_sock;
> + struct svcxprt_rdma *listen_xprt;
> + struct svcxprt_rdma *newxprt;
> + struct rdma_conn_param conn_param;
> + struct ib_qp_init_attr qp_attr;
> + struct ib_device_attr devattr;
> + int ret;
> + int i;
> +
> + listen_xprt = (struct svcxprt_rdma*)xprt;
> + if (list_empty(&listen_xprt->sc_accept_q)) {
> + printk(KERN_INFO
> + "svcrdma: woke-up with no pending connection!\n");
> + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> + return 0;
> + }
> +
> + /* Get the next entry off the accept list */
> + spin_lock_bh(&listen_xprt->sc_lock);
> + newxprt = list_entry(listen_xprt->sc_accept_q.next,
> + struct svcxprt_rdma, sc_accept_q);
> + list_del_init(&newxprt->sc_accept_q);
> + spin_unlock_bh(&listen_xprt->sc_lock);
> +
> + dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
> + newxprt, newxprt->sc_cm_id);
> +
> + ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
> + if (ret) {
> + printk(KERN_ERR
> + "svcrdma: could not query device attributes on "
> + "device %p, rc=%d\n",
> + newxprt->sc_cm_id->device, ret);
> + goto errout;
> + }
> +
> + /* Qualify the transport resource defaults with the
> + * capabilities of this particular device */
> + newxprt->sc_max_sge = min((size_t)devattr.max_sge,
> + (size_t)RPCSVC_MAXPAGES);
> + newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
> + (size_t)svcrdma_max_requests);
> + newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
> +
> + newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
> + (size_t)svcrdma_ord);
> + spin_lock_bh(&rqstp->rq_server->sv_lock);
> + list_add(&newxprt->sc_xprt.sk_list, &rqstp->rq_server->sv_tempsocks);
> + rqstp->rq_server->sv_tmpcnt ++;
> + spin_unlock_bh(&rqstp->rq_server->sv_lock);
> +
> + newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
> + if (IS_ERR(newxprt->sc_pd)) {
> + printk(KERN_ERR
> + "svcrdma: error creating PD for connect request\n");
> + ret = PTR_ERR(newxprt->sc_pd);
> + goto errout;
> + }
> + newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
> + sq_comp_handler,
> + cq_event_handler,
> + newxprt,
> + newxprt->sc_sq_depth);
> + if (IS_ERR(newxprt->sc_sq_cq)) {
> + printk(KERN_ERR
> + "svcrdma: error creating SQ CQ for connect request\n");
> + ret = PTR_ERR(newxprt->sc_sq_cq);
> + goto errout;
> + }
> + newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
> + rq_comp_handler,
> + cq_event_handler,
> + newxprt,
> + newxprt->sc_max_requests);
> + if (IS_ERR(newxprt->sc_rq_cq)) {
> + printk(KERN_ERR
> + "svcrdma: error creating RQ CQ for connect request\n");
> + ret = PTR_ERR(newxprt->sc_rq_cq);
> + goto errout;
> + }
> +
> + memset(&qp_attr, 0, sizeof qp_attr);
> + qp_attr.event_handler = qp_event_handler;
> + qp_attr.qp_context = newxprt;
> + qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
> + qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
> + qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
> + qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
> + qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
> + qp_attr.qp_type = IB_QPT_RC;
> + qp_attr.send_cq = newxprt->sc_sq_cq;
> + qp_attr.recv_cq = newxprt->sc_rq_cq;
> + printk("newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
> + "cm_id->device=%p, sc_pd->device=%p\n"
> + "qp_attr.cap.max_send_wr = %d\n"
> + "qp_attr.cap.max_recv_wr = %d\n"
> + "qp_attr.cap.max_send_sge = %d\n"
> + "qp_attr.cap.max_recv_sge = %d\n",
> + newxprt->sc_cm_id, newxprt->sc_pd,
> + newxprt->sc_cm_id->device, newxprt->sc_pd->device,
> + qp_attr.cap.max_send_wr,
> + qp_attr.cap.max_recv_wr,
> + qp_attr.cap.max_send_sge,
> + qp_attr.cap.max_recv_sge);
> +
> + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
> + if (ret) {
> + /*
> + * XXX: This is a hack. We need a xx_request_qp interface
> + * that will adjust the qp_attr's with a best-effort
> + * number
> + */
> + qp_attr.cap.max_send_sge -= 2;
> + qp_attr.cap.max_recv_sge -= 2;
> + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
> + if (ret) {
> + printk(KERN_ERR "svcrdma: failed to create QP, ret=%d\n", ret);
> + goto errout;
> + }
> + newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
> + newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
> + newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
> + newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
> + }
> + newxprt->sc_qp = newxprt->sc_cm_id->qp;
> + DBG_DUMP_QP(__FUNCTION__, newxprt->sc_qp, &qp_attr);
> +
> + /* Register all of physical memory */
> + newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
> + IB_ACCESS_LOCAL_WRITE |
> + IB_ACCESS_REMOTE_WRITE);
> + if (IS_ERR(newxprt->sc_phys_mr)) {
> + ret = PTR_ERR(newxprt->sc_phys_mr);
> + printk(KERN_ERR
> + "svcrdma: Failed to create DMA MR ret=%d\n", ret);
> + goto errout;
> + }
> +
> + /* Post receive buffers */
> + for (i=0; i < newxprt->sc_max_requests; i++)
> + if ((ret = svc_rdma_post_recv(newxprt))) {
> + printk(KERN_ERR
> + "svcrdma: failure posting receive buffers\n");
> + goto errout;
> + }
> +
> + /* Swap out the handler */
> + newxprt->sc_cm_id->event_handler = rdma_cma_handler;
> +
> + /* We will get a getattr request from the client before we see
> + * the connect complete event because DTO's run on tasklets,
> + * and connection events run on threads
> + */
> + clear_bit(SK_BUSY, &newxprt->sc_xprt.sk_flags);
> +
> + /* Accept Connection */
> + memset(&conn_param, 0, sizeof conn_param);
> + conn_param.responder_resources = 0;
> + conn_param.initiator_depth = newxprt->sc_ord;
> + ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
> + if (ret) {
> + printk(KERN_ERR
> + "svcrdma: failed to accept new connection, ret=%d\n",
> + ret);
> + goto errout;
> + }
> +
> + printk("svcrdma: new connection %p accepted with the following "
> + "attributes:\n"
> + "\tlocal_ip : %d.%d.%d.%d\n"
> + "\tlocal_port : %d\n"
> + "\tremote_ip : %d.%d.%d.%d\n"
> + "\tremote_port : %d\n"
> + "\tmax_sge : %d\n"
> + "\tsq_depth : %d\n"
> + "\tmax_requests : %d\n"
> + "\tread throttle : %s\n"
> + "\tord : %d\n",
> + newxprt,
> + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.src_addr)->sin_addr.s_addr),
> + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.src_addr)->sin_port),
> + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.dst_addr)->sin_addr.s_addr),
> + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.dst_addr)->sin_port),
> + newxprt->sc_max_sge,
> + newxprt->sc_sq_depth,
> + newxprt->sc_max_requests,
> + (svcrdma_read_throttle?"TRUE":"FALSE"),
> + newxprt->sc_ord);
> +
> + spin_lock_bh(&listen_xprt->sc_lock);
> + if (list_empty(&listen_xprt->sc_accept_q))
> + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> + spin_unlock_bh(&listen_xprt->sc_lock);
> + listen_xprt->sc_xprt.sk_pool = NULL;
> + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> + svc_sock_enqueue(&listen_xprt->sc_xprt);
> +
> + ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> + ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
> + return ret;
> +
> + errout:
> + printk(KERN_ERR "svcrdma: failure accepting new connection rc=%d.\n",
> + ret);
> + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> + rdma_destroy_id(newxprt->sc_cm_id);
> + rdma_destroy_xprt(newxprt);
> + return 0; /* ret; */
> +}
> +
> +static void svc_rdma_delete(struct svc_sock *xprt)
> +{
> + struct svc_serv *serv = xprt->sk_server;
> +
> + spin_lock_bh(&serv->sv_lock);
> + if (!test_and_set_bit(SK_DETACHED, &xprt->sk_flags))
> + list_del_init(&xprt->sk_list);
> +
> + if (!test_and_set_bit(SK_DEAD, &xprt->sk_flags)) {
> + BUG_ON(atomic_read(&xprt->sk_inuse)<2);
> + atomic_dec(&xprt->sk_inuse);
> + if (test_bit(SK_TEMP, &xprt->sk_flags))
> + serv->sv_tmpcnt--;
> + }
> + spin_unlock_bh(&serv->sv_lock);
> +}
> +
> +static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
> +{
> + if (xprt->sc_qp)
> + ib_destroy_qp(xprt->sc_qp);
> +
> + if (xprt->sc_sq_cq)
> + ib_destroy_cq(xprt->sc_sq_cq);
> +
> + if (xprt->sc_rq_cq)
> + ib_destroy_cq(xprt->sc_rq_cq);
> +
> + if (xprt->sc_pd)
> + ib_dealloc_pd(xprt->sc_pd);
> +
> + destroy_context_cache(xprt->sc_ctxt_head);
> +
> + if (xprt->sc_xprt.sk_info_authunix != NULL)
> + svcauth_unix_info_release(xprt->sc_xprt.sk_info_authunix);
> +
> + kfree(xprt);
> +}
> +
> +int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
> +{
> + struct ib_send_wr *bad_wr;
> + int ret;
> +
> + if (test_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags))
> + return 0;
> +
> + BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
> + BUG_ON(((struct svc_rdma_op_ctxt*)(unsigned long)wr->wr_id)->wr_op !=
> + wr->opcode);
> + /* If the SQ is full, wait until an SQ entry is available */
> + while (1) {
> + spin_lock_bh(&xprt->sc_lock);
> + if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
> + spin_unlock_bh(&xprt->sc_lock);
> + rdma_stat_sq_starve ++;
> + /* First see if we can opportunistically reap some SQ WR */
> + sq_cq_reap(xprt);
> +
> + /* Wait until SQ WR available if SQ still full*/
> + wait_event(xprt->sc_send_wait,
> + atomic_read(&xprt->sc_sq_count) < xprt->sc_sq_depth);
> + continue;
> + }
> + /* Bump the used SQ WR count and post */
> + ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
> + if (!ret)
> + atomic_inc(&xprt->sc_sq_count);
> + else {
> + printk(KERN_ERR "svcrdma: failed to post SQ WR rc=%d, "
> + "sc_sq_count=%d, sc_sq_depth=%d\n",
> + ret, atomic_read(&xprt->sc_sq_count),
> + xprt->sc_sq_depth);
> + }
> + spin_unlock_bh(&xprt->sc_lock);
> + break;
> + }
> +
> + return ret;
> +}
> +
> +int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
> + enum rpcrdma_errcode err)
> +{
> + struct ib_send_wr err_wr;
> + struct ib_sge sge;
> + struct page *p;
> + struct svc_rdma_op_ctxt *ctxt;
> + u32 *va;
> + int length;
> + int ret;
> +
> + p = svc_rdma_get_page();
> + va = page_address(p);
> +
> + /* XDR encode error */
> + length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
> +
> + /* Prepare SGE for local address */
> + sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
> + p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
> + sge.lkey = xprt->sc_phys_mr->lkey;
> + sge.length = length;
> +
> + ctxt = svc_rdma_get_context(xprt);
> + ctxt->count = 1;
> + ctxt->pages[0] = p;
> +
> + /* Prepare SEND WR */
> + memset(&err_wr, 0, sizeof err_wr);
> + ctxt->wr_op = IB_WR_SEND;
> + err_wr.wr_id = (unsigned long)ctxt;
> + err_wr.sg_list = &sge;
> + err_wr.num_sge = 1;
> + err_wr.opcode = IB_WR_SEND;
> + err_wr.send_flags = IB_SEND_SIGNALED;
> +
> + /* Post It */
> + ret = svc_rdma_send(xprt, &err_wr);
> + if (ret) {
> + dprintk("svcrdma: Error posting send = %d\n", ret);
> + svc_rdma_put_context(ctxt,1);
> + }
> +
> + return ret;
> +}
> +
> +/*
> + * Setup the reply buffer for the svc_process function to write the
> + * RPC into.
> + */
> +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
> +{
> + struct kvec *resv = &rqstp->rq_res.head[0];
> +
> + /* setup response xdr_buf.
> + * Initially it has just one page
> + */
> + rqstp->rq_resused = 1;
> + resv->iov_base = page_address(rqstp->rq_respages[0]);
> + resv->iov_len = 0;
> + rqstp->rq_res.pages = rqstp->rq_respages+1;
> + rqstp->rq_res.len = 0;
> + rqstp->rq_res.page_base = 0;
> + rqstp->rq_res.page_len = 0;
> + rqstp->rq_res.buflen = PAGE_SIZE;
> + rqstp->rq_res.tail[0].iov_base = NULL;
> + rqstp->rq_res.tail[0].iov_len = 0;
> +
> + return 0;
> +}
> +
> +/*
> + * This request cannot be handled right now. Allocate a structure to
> + * keep its state pending completion processing. To accomplish this, the
> + * function creates an svc_rdma_op_ctxt that looks like a receive completion and
> + * enqueues it on the svc_sock's deferred request list. When
> + * svc_rdma_recvfrom is subsequently called, it first checks if there is a
> + * deferred RPC and if there is:
> + * - Takes the deferred request off the deferred request queue
> + * - Extracts the svc_rdma_op_ctxt from the deferred request structure
> + * - Frees the deferred request structure
> + * - Skips the ib_cq_poll call and processes the svc_rdma_op_ctxt as if it had
> + * just come out of an WR pulled from the CQ.
> + */
> +static struct cache_deferred_req *
> +svc_rdma_defer(struct cache_req *req)
> +{
> + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> + struct svcxprt_rdma *xprt;
> + struct svc_rdma_deferred_req *dr;
> +
> + dprintk("svcrdma: deferring request on \n"
> + " rqstp=%p\n"
> + " rqstp->rq_arg.len=%d\n",
> + rqstp,
> + rqstp->rq_arg.len);
> +
> + /* if more than a page, give up FIXME */
> + if (rqstp->rq_arg.page_len)
> + return NULL;
> + BUG_ON(rqstp->rq_deferred);
> + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> + retry:
> + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> + if (!dr) {
> + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> + goto retry;
> + }
> + dr->req.handle.owner = rqstp->rq_server;
> + dr->req.prot = rqstp->rq_prot;
> + dr->req.addr = rqstp->rq_addr;
> + dr->req.daddr = rqstp->rq_daddr;
> + dr->req.argslen = rqstp->rq_arg.len >> 2;
> + dr->arg_page = rqstp->rq_pages[0];
> + dr->arg_len = rqstp->rq_arg.len;
> + rqstp->rq_pages[0] = svc_rdma_get_page();
> +
> + atomic_inc(&rqstp->rq_sock->sk_inuse);
> + dr->req.svsk = rqstp->rq_sock;
> + dr->req.handle.revisit = rqstp->rq_sock->sk_revisit;
> +
> + return &dr->req.handle;
> +}
> +
> +/*
> + * This is called by the cache code when it either gets an answer from
> + * a user-mode daemon or gives up...as indicated by 'too_many'
> + */
> +static void svc_rdma_revisit(struct cache_deferred_req *dreq, int too_many)
> +{
> + struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
> + struct svc_serv *serv = dreq->owner;
> + struct svc_sock *svsk;
> +
> + if (unlikely(too_many)) {
> + printk(KERN_INFO "svcrdma: giving up on deferred request "
> + "on svc_sock=%p, too many outstanding\n", dr->svsk);
> + dr->svsk->sk_put(dr->svsk);
> + kfree(dr);
> + return;
> + }
> + svsk = dr->svsk;
> + dprintk("svcrdma: revisit deferred RPC on xprt=%p\n", svsk);
> + dr->svsk = NULL;
> + spin_lock_bh(&serv->sv_lock);
> + list_add(&dr->handle.recent, &svsk->sk_deferred);
> + spin_unlock_bh(&serv->sv_lock);
> + svsk->sk_pool = NULL;
> + set_bit(SK_DEFERRED, &svsk->sk_flags);
> + svc_sock_enqueue(svsk);
> + svsk->sk_put(svsk);
> +}
> +



2007-05-18 19:24:44

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, May 18, 2007 at 12:45:52PM -0500, Tom Tucker wrote:
> + printk("svcrdma: new connection %p accepted with the following "
> + "attributes:\n"
> + "\tlocal_ip : %d.%d.%d.%d\n"
> + "\tlocal_port : %d\n"
> + "\tremote_ip : %d.%d.%d.%d\n"
> + "\tremote_port : %d\n"
> + "\tmax_sge : %d\n"
> + "\tsq_depth : %d\n"
> + "\tmax_requests : %d\n"
> + "\tread throttle : %s\n"
> + "\tord : %d\n",
> + newxprt,
> + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.src_addr)->sin_addr.s_addr),
> + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.src_addr)->sin_port),
> + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.dst_addr)->sin_addr.s_addr),
> + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> + route.addr.dst_addr)->sin_port),
> + newxprt->sc_max_sge,
> + newxprt->sc_sq_depth,
> + newxprt->sc_max_requests,
> + (svcrdma_read_throttle?"TRUE":"FALSE"),
> + newxprt->sc_ord);

I assume these massive printk's are meant to be dprintk's (if they're
meant to be left in at all).

> +/*
> + * Setup the reply buffer for the svc_process function to write the
> + * RPC into.
> + */
> +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
> +{
> + struct kvec *resv = &rqstp->rq_res.head[0];
> +
> + /* setup response xdr_buf.
> + * Initially it has just one page
> + */
> + rqstp->rq_resused = 1;
> + resv->iov_base = page_address(rqstp->rq_respages[0]);
> + resv->iov_len = 0;
> + rqstp->rq_res.pages = rqstp->rq_respages+1;
> + rqstp->rq_res.len = 0;
> + rqstp->rq_res.page_base = 0;
> + rqstp->rq_res.page_len = 0;
> + rqstp->rq_res.buflen = PAGE_SIZE;
> + rqstp->rq_res.tail[0].iov_base = NULL;
> + rqstp->rq_res.tail[0].iov_len = 0;
> +
> + return 0;
> +}

I think this is a repeat of a question from yesterday--I'm not sure why
we bothered to pull this out into separate function when the
implementations all end up being identical, except for the one line for
the record length in the tcp case?

> +
> +/*
> + * This request cannot be handled right now. Allocate a structure to
> + * keep it's state pending completion processing. To accomplish this, the
> + * function creates an svc_rdma_op_ctxt that looks like a receive completion and
> + * enqueues it on the svc_sock's deferred request list. When*
> + * svc_rdma_recvfrom is subsequently called, it first checks if there is a
> + * deferred RPC and if there is:
> + * - Takes the deferred request off the deferred request queue
> + * - Extracts the svc_rdma_op_ctxt from the deferred request structure
> + * - Frees the deferred request structure
> + * - Skips the ib_cq_poll call and processes the svc_rdma_op_ctxt as if it had
> + * just come out of an WR pulled from the CQ.
> + */
> +static struct cache_deferred_req *
> +svc_rdma_defer(struct cache_req *req)
> +{
> + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> + struct svcxprt_rdma *xprt;
> + struct svc_rdma_deferred_req *dr;
> +
> + dprintk("svcrdma: deferring request on \n"
> + " rqstp=%p\n"
> + " rqstp->rq_arg.len=%d\n",
> + rqstp,
> + rqstp->rq_arg.len);
> +
> + /* if more than a page, give up FIXME */
> + if (rqstp->rq_arg.page_len)
> + return NULL;
> + BUG_ON(rqstp->rq_deferred);
> + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> + retry:
> + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> + if (!dr) {
> + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> + goto retry;
> + }

Why not return NULL, as svc_defer() does?

--b.


2007-05-18 19:36:32

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, 2007-05-18 at 15:24 -0400, J. Bruce Fields wrote:
> On Fri, May 18, 2007 at 12:45:52PM -0500, Tom Tucker wrote:
> > + printk("svcrdma: new connection %p accepted with the following "
> > + "attributes:\n"
> > + "\tlocal_ip : %d.%d.%d.%d\n"
> > + "\tlocal_port : %d\n"
> > + "\tremote_ip : %d.%d.%d.%d\n"
> > + "\tremote_port : %d\n"
> > + "\tmax_sge : %d\n"
> > + "\tsq_depth : %d\n"
> > + "\tmax_requests : %d\n"
> > + "\tread throttle : %s\n"
> > + "\tord : %d\n",
> > + newxprt,
> > + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.src_addr)->sin_addr.s_addr),
> > + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.src_addr)->sin_port),
> > + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.dst_addr)->sin_addr.s_addr),
> > + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.dst_addr)->sin_port),
> > + newxprt->sc_max_sge,
> > + newxprt->sc_sq_depth,
> > + newxprt->sc_max_requests,
> > + (svcrdma_read_throttle?"TRUE":"FALSE"),
> > + newxprt->sc_ord);
>
> I assume these massive printk's are meant to be dprintk's (if they're
> meant to be left in at all).

It shows up once per mount. I thought it was useful, but perhaps not...

>
> > +/*
> > + * Setup the reply buffer for the svc_process function to write the
> > + * RPC into.
> > + */
> > +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
> > +{
> > + struct kvec *resv = &rqstp->rq_res.head[0];
> > +
> > + /* setup response xdr_buf.
> > + * Initially it has just one page
> > + */
> > + rqstp->rq_resused = 1;
> > + resv->iov_base = page_address(rqstp->rq_respages[0]);
> > + resv->iov_len = 0;
> > + rqstp->rq_res.pages = rqstp->rq_respages+1;
> > + rqstp->rq_res.len = 0;
> > + rqstp->rq_res.page_base = 0;
> > + rqstp->rq_res.page_len = 0;
> > + rqstp->rq_res.buflen = PAGE_SIZE;
> > + rqstp->rq_res.tail[0].iov_base = NULL;
> > + rqstp->rq_res.tail[0].iov_len = 0;
> > +
> > + return 0;
> > +}
>
> I think this is a repeat of a question from yesterday--I'm not sure why
> we bothered to pull this out into separate function when the
> implementations all end up being identical, except for the one line for
> the record length in the tcp case?
>

Yeah, you're right. Maybe it shouldn't be prepare_buffer, but rather
"prepare_header". Then you'd keep the buffer code common and just have
the one line function for the TCP record len.
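
Roughly (untested sketch; the hook names are illustrative, not from the
patch, and svc_putnl() is assumed to be the record-marker reservation
that svc_process() does for TCP today):

/* Common reply-buffer setup stays in generic code; only the
 * per-transport header hook differs. */
static int svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp)
{
	struct kvec *resv = &rqstp->rq_res.head[0];

	/* reserve space for the TCP record marker; the length is
	 * filled in at sendto time */
	svc_putnl(resv, 0);
	return 0;
}

static int svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
	return 0;	/* RDMA has no record marker */
}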


> > +
> > +/*
> > + * This request cannot be handled right now. Allocate a structure to
> > + * keep it's state pending completion processing. To accomplish this, the
> > + * function creates an svc_rdma_op_ctxt that looks like a receive completion and
> > + * enqueues it on the svc_sock's deferred request list. When*
> > + * svc_rdma_recvfrom is subsequently called, it first checks if there is a
> > + * deferred RPC and if there is:
> > + * - Takes the deferred request off the deferred request queue
> > + * - Extracts the svc_rdma_op_ctxt from the deferred request structure
> > + * - Frees the deferred request structure
> > + * - Skips the ib_cq_poll call and processes the svc_rdma_op_ctxt as if it had
> > + * just come out of an WR pulled from the CQ.
> > + */
> > +static struct cache_deferred_req *
> > +svc_rdma_defer(struct cache_req *req)
> > +{
> > + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> > + struct svcxprt_rdma *xprt;
> > + struct svc_rdma_deferred_req *dr;
> > +
> > + dprintk("svcrdma: deferring request on \n"
> > + " rqstp=%p\n"
> > + " rqstp->rq_arg.len=%d\n",
> > + rqstp,
> > + rqstp->rq_arg.len);
> > +
> > + /* if more than a page, give up FIXME */
> > + if (rqstp->rq_arg.page_len)
> > + return NULL;
> > + BUG_ON(rqstp->rq_deferred);
> > + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> > + retry:
> > + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> > + if (!dr) {
> > + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> > + goto retry;
> > + }
>
> Why not return NULL, as svc_defer() does?
>

I think you're right. I'll fix this.
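
i.e. just (sketch):

	dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
	if (!dr)
		return NULL;	/* give up on the deferral, as svc_defer() does */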


> --b.



2007-05-18 19:42:34

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, May 18, 2007 at 02:36:28PM -0500, Tom Tucker wrote:
> On Fri, 2007-05-18 at 15:24 -0400, J. Bruce Fields wrote:
> > I assume these massive printk's are meant to be dprintk's (if they're
> > meant to be left in at all).
>
> It shows up once per mount. I thought it was useful, but perhaps not...

Especially for a server with a few clients, it'd be a lot of noise in
the logs, for something that's a normal occurrence.

But if you think it'd be helpful for debugging, then leave it as a
dprintk....

--b.


2007-05-18 20:07:48

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, 2007-05-18 at 15:07 -0400, Trond Myklebust wrote:
[...snip...]
> + xprt->sc_ctxt_max);
> > +
> > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
>
> Why do you need an irqsafe spinlock to protect this list?
>

The svc_rdma_put_context function is called from interrupt context. This
function adds free contexts back to this list.

> > + while (xprt->sc_ctxt_cnt < target) {
> > + xprt->sc_ctxt_cnt ++;
> > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > +
> > + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> > +
> > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
>
> You've now dropped the spinlock. How can you know that the condition
> xprt->sc_ctxt_cnt <= target is still valid?
>

I increment the sc_ctxt_cnt with the lock held. If I'm at the limit, the
competing thread will find it equal -- correct?

> > + if (ctxt) {
> > + at_least_one = 1;
> > + ctxt->next = xprt->sc_ctxt_head;
> > + xprt->sc_ctxt_head = ctxt;
> > + } else {
> > + /* kmalloc failed...give up for now */
> > + xprt->sc_ctxt_cnt --;
> > + break;
> > + }
> > + }
> > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > +
> > + return at_least_one;
> > +}
> > +
> > +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
> > +{
> > + struct svc_rdma_op_ctxt *ctxt;
> > + unsigned long flags;
> > +
> > + while (1) {
> > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> > + if (unlikely(xprt->sc_ctxt_head == NULL)) {
> > + /* Try to bump my cache. */
> > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > +
> > + if (rdma_bump_context_cache(xprt))
> > + continue;
> > +
> > + printk(KERN_INFO "svcrdma: sleeping waiting for context "
> > + "memory on xprt=%p\n",
> > + xprt);
> > + schedule_timeout_uninterruptible(msecs_to_jiffies(500));
>
> (HZ >> 1)
> However this is rather naughty: you are tying up an nfsd thread that
> could be put to doing useful work elsewhere.
>

True, it could be processing requests for another client.

Here was my dilemma. I can predict the worst-case number of contexts
needed so that I'm guaranteed never to fail. The problem is that this is
a very big number, and I wanted a memory footprint driven by load as
opposed to worst case. This led to the code above.

As an intermediate approach, I believe that I need about 4 contexts to
be guaranteed I can process a request. I could attempt to acquire them
all (at the top of svc_rdma_recvfrom) and cache them in the rqstp
structure. If I couldn't get them, I would just return 0 (EAGAIN). This
would be marginally wasteful on a given request, but would avoid ever
sleeping for a context.
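
Something like this untested sketch, where svc_rdma_get_context_nowait()
would be a new non-sleeping variant of svc_rdma_get_context() and
rq_rdma_ctxts a small array that would have to be added to the rqstp:

#define RPCRDMA_CTXT_RESERVE 4	/* contexts needed to process one request */

static int svc_rdma_reserve_ctxts(struct svc_rqst *rqstp,
				  struct svcxprt_rdma *xprt)
{
	int i;

	for (i = 0; i < RPCRDMA_CTXT_RESERVE; i++) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = svc_rdma_get_context_nowait(xprt);
		if (!ctxt) {
			/* give back what we took; caller returns 0
			 * so the nfsd thread can go do other work */
			while (i--)
				svc_rdma_put_context(rqstp->rq_rdma_ctxts[i], 0);
			return 0;
		}
		rqstp->rq_rdma_ctxts[i] = ctxt;
	}
	return 1;
}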

Comments?

> > + continue;
> > + }
> > + ctxt = xprt->sc_ctxt_head;
> > + xprt->sc_ctxt_head = ctxt->next;
> > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > + ctxt->xprt = xprt;
> > + INIT_LIST_HEAD(&ctxt->dto_q);
> > + break;
> > + }
> > + ctxt->count = 0;
> > + return ctxt;
> > +}
> > +
> > +void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
> > +{
> > + unsigned long flags;
> > + struct svcxprt_rdma *xprt;
> > + int i;
> > +
> > + BUG_ON(!ctxt);
> > + xprt = ctxt->xprt;
> > + if (free_pages) {
> > + for (i=0; i < ctxt->count; i++)
> > + put_page(ctxt->pages[i]);
> > + }
> > +
> > + for (i=0; i < ctxt->count; i++) {
> > + dma_unmap_single(xprt->sc_cm_id->device->dma_device,
> > + ctxt->sge[i].addr,
> > + ctxt->sge[i].length,
> > + ctxt->direction);
> > + }
> > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> > + ctxt->next = xprt->sc_ctxt_head;
> > + xprt->sc_ctxt_head = ctxt;
> > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > +}
> > +
> > +/* ib_cq event handler */
> > +static void cq_event_handler(struct ib_event *event, void *context)
> > +{
> > + struct svcxprt_rdma *xprt = (struct svcxprt_rdma *)context;
> > + printk(KERN_INFO "svcrdma: received CQ event id=%d, context=%p\n",
> > + event->event, context);
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > +}
> > +
> > +/* QP event handler */
> > +static void qp_event_handler(struct ib_event *event, void *context)
> > +{
> > + struct svcxprt_rdma *xprt = context;
> > +
> > + switch (event->event) {
> > + /* These are considered benign events */
> > + case IB_EVENT_PATH_MIG:
> > + case IB_EVENT_COMM_EST:
> > + case IB_EVENT_SQ_DRAINED:
> > + case IB_EVENT_QP_LAST_WQE_REACHED:
> > + printk(KERN_INFO "svcrdma: QP event %d received for QP=%p\n",
> > + event->event, event->element.qp);
> > + break;
> > + /* These are considered fatal events */
> > + case IB_EVENT_PATH_MIG_ERR:
> > + case IB_EVENT_QP_FATAL:
> > + case IB_EVENT_QP_REQ_ERR:
> > + case IB_EVENT_QP_ACCESS_ERR:
> > + case IB_EVENT_DEVICE_FATAL:
> > + default:
> > + printk(KERN_ERR "svcrdma: QP ERROR event %d received for QP=%p, "
> > + "closing transport\n",
> > + event->event, event->element.qp);
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + break;
> > + }
> > +}
> > +
> > +/*
> > + * Data Transfer Operation Tasklet
> > + *
> > + * Walks a list of transports with I/O pending, removing entries as
> > + * they are added to the server's I/O pending list.
> > + */
> > +static void dto_tasklet_func(unsigned long data)
> > +{
> > + struct svcxprt_rdma *xprt;
> > + unsigned long flags;
> > +
> > + spin_lock_irqsave(&dto_lock, flags);
> > + while (!list_empty(&dto_xprt_q)) {
> > + xprt = list_entry(dto_xprt_q.next, struct svcxprt_rdma, sc_dto_q);
> > + list_del_init(&xprt->sc_dto_q);
> > + spin_unlock_irqrestore(&dto_lock, flags);
> > + if (0==test_bit(SK_DEAD, &xprt->sc_xprt.sk_flags)) {
> > + /* Serialize with svc_rdma_recvfrom which will also
> > + * enqueue the transport
> > + */
> > + set_bit(SK_DATA, &xprt->sc_xprt.sk_flags);
> > + svc_sock_enqueue(&xprt->sc_xprt);
> > + }
> > + spin_lock_irqsave(&dto_lock, flags);
> > + }
> > + spin_unlock_irqrestore(&dto_lock, flags);
> > +}
> > +
> > +/*
> > + * rq_cq_reap - Process the RQ CQ.
> > + *
> > + * Take all completing WC off the CQE and enqueue the associated DTO context
> > + * on the dto_q for the transport.
> > + */
> > +static void
> > +rq_cq_reap(struct svcxprt_rdma *xprt)
> > +{
> > + int ret;
> > + struct ib_wc wc;
> > + struct svc_rdma_op_ctxt *ctxt = NULL;
> > + unsigned long flags;
> > +
> > + rdma_stat_rq_poll ++;
>
> Is this a global variable? How are you ensuring atomicity above?
>

It's global and it's really debug code. It should be removed along with
a few of the others. The remaining counters should use atomic_inc().
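
e.g., keeping the counter names from the patch (sketch):

static atomic_t rdma_stat_rq_poll = ATOMIC_INIT(0);
...
	atomic_inc(&rdma_stat_rq_poll);	/* in rq_cq_reap() */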

> > +
> > + while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
> > + ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
> > + ctxt->wc_status = wc.status;
> > + ctxt->byte_len = wc.byte_len;
> > + if (wc.status != IB_WC_SUCCESS) {
> > + DBG_DUMP_WC(__FUNCTION__, &wc);
> > + /* Close the transport */
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + svc_rdma_put_context(ctxt, 1);
> > + continue;
> > + }
> > + spin_lock_irqsave(&xprt->sc_rq_dto_lock, flags);
> > + list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
> > + spin_unlock_irqrestore(&xprt->sc_rq_dto_lock, flags);
> > + }
> > +
> > + if (ctxt)
> > + rdma_stat_rq_prod ++;
> > +}
> > +
> > +/*
> > + * Receive Queue Completion Handler - potentially called on interrupt context.
> > + *
> > + * svc_sock_enqueue and the remainder of the svc core assumes
> > + * uses _bh locks. Since the rq_comp_handler is called on interrupt
> > + * context, we need to refer the handling of the I/O to a tasklet
> > + */
> > +static void
> > +rq_comp_handler(struct ib_cq *cq, void *cq_context)
> > +{
> > + struct svcxprt_rdma *xprt = cq_context;
> > + unsigned long flags;
> > +
> > + ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
> > + rq_cq_reap(xprt);
> > +
> > + /*
> > + * If this transport is not already on the DTO transport queue,
> > + * add it
> > + */
> > + spin_lock_irqsave(&dto_lock, flags);
> > + if (list_empty(&xprt->sc_dto_q))
> > + list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
> > + spin_unlock_irqrestore(&dto_lock, flags);
> > + tasklet_schedule(&dto_tasklet);
> > +}
> > +
> > +/*
> > + * Send Queue Completion Handler - potentially called on interrupt context.
> > + *
> > + * - Purges the CQ
> > + * - Wakes up threads waiting on SQ WR space
> > + * - Wakes up threads waiting on the ORD throttle
> > + * - Wakes up threads waiting for an RDMA_READ to complete.
> > + */
> > +static void
> > +sq_cq_reap(struct svcxprt_rdma *xprt)
> > +{
> > + struct svc_rdma_op_ctxt *ctxt = NULL;
> > + struct ib_wc wc;
> > + struct ib_cq *cq = xprt->sc_sq_cq;
> > + int ret;
> > +
> > + rdma_stat_sq_poll ++;
> > +
> > + while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> > + ctxt = (struct svc_rdma_op_ctxt*)(unsigned long)wc.wr_id;
> > + xprt = ctxt->xprt;
> > +
> > + if (wc.status != IB_WC_SUCCESS) {
> > + /* Close the transport */
> > + DBG_DUMP_WC(__FUNCTION__, &wc);
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + }
> > +
> > + /* Decrement used SQ WR count */
> > + atomic_dec(&xprt->sc_sq_count);
> > + wake_up(&xprt->sc_send_wait);
> > +
> > + switch (ctxt->wr_op) {
> > + case IB_WR_SEND:
> > + case IB_WR_RDMA_WRITE:
> > + svc_rdma_put_context(ctxt,1);
> > + break;
> > +
> > + case IB_WR_RDMA_READ:
> > + if (svcrdma_read_throttle) {
> > + atomic_dec(&xprt->sc_read_count);
> > + wake_up(&xprt->sc_read_wait);
> > + }
> > + /*
> > + * Set the the RDMA_DONE flag in the context and
> > + * wakeup any waiters.
> > + */
> > + set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
> > + wake_up(&ctxt->read_wait);
> > + break;
> > +
> > + default:
> > + printk(KERN_ERR "svcrdma: unexpected completion type, "
> > + "opcode=%d, status=%d\n",
> > + wc.opcode, wc.status);
> > + break;
> > + }
> > + }
> > +
> > + if (ctxt)
> > + rdma_stat_sq_prod ++;
> > +}
> > +
> > +void svc_sq_reap(struct svcxprt_rdma *xprt)
> > +{
> > + sq_cq_reap(xprt);
> > +}
> > +
> > +void svc_rq_reap(struct svcxprt_rdma *xprt)
> > +{
> > + rq_cq_reap(xprt);
> > +}
> > +
> > +static void
> > +sq_comp_handler(struct ib_cq *cq, void *cq_context)
> > +{
> > + ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
> > + sq_cq_reap(cq_context);
> > +}
> > +
> > +static void
> > +create_context_cache(struct svcxprt_rdma *xprt,
> > + int ctxt_count, int ctxt_bump, int ctxt_max)
> > +{
> > + struct svc_rdma_op_ctxt *ctxt;
> > + int i;
> > +
> > + xprt->sc_ctxt_max = ctxt_max;
> > + xprt->sc_ctxt_bump = ctxt_bump;
> > + xprt->sc_ctxt_cnt = 0;
> > + xprt->sc_ctxt_head = NULL;
> > + for (i=0; i < ctxt_count; i++) {
> > + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> > + if (ctxt) {
> > + ctxt->next = xprt->sc_ctxt_head;
> > + xprt->sc_ctxt_head = ctxt;
> > + xprt->sc_ctxt_cnt ++;
> > + }
> > + }
> > +}
> > +
> > +static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
> > +{
> > + struct svc_rdma_op_ctxt *next;
> > + if (!ctxt)
> > + return;
> > +
> > + do {
> > + next = ctxt->next;
> > + kfree(ctxt);
> > + ctxt = next;
> > + } while (next);
> > +}
> > +
> > +static struct svcxprt_rdma *rdma_create_xprt(int listener)
> > +{
> > + struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
> > +
> > + if (!cma_xprt)
> > + return NULL;
> > +
> > + INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
> > + INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
> > + INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
> > + init_waitqueue_head(&cma_xprt->sc_send_wait);
> > + init_waitqueue_head(&cma_xprt->sc_read_wait);
> > +
> > + spin_lock_init(&cma_xprt->sc_lock);
> > + spin_lock_init(&cma_xprt->sc_read_lock);
> > + spin_lock_init(&cma_xprt->sc_ctxt_lock);
> > + spin_lock_init(&cma_xprt->sc_rq_dto_lock);
> > +
> > + cma_xprt->sc_ord = svcrdma_ord;
> > +
> > + cma_xprt->sc_max_req_size = svcrdma_max_req_size;
> > + cma_xprt->sc_max_requests = svcrdma_max_requests;
> > + cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
> > + atomic_set(&cma_xprt->sc_sq_count,0);
> > + atomic_set(&cma_xprt->sc_read_count,0);
> > +
> > + if (!listener) {
> > + int reqs = cma_xprt->sc_max_requests;
> > + create_context_cache(cma_xprt,
> > + reqs << 1, /* starting size */
> > + reqs, /* bump amount */
> > + reqs +
> > + cma_xprt->sc_sq_depth +
> > + RPCRDMA_MAX_THREADS); /* max */
> > +
> > + if (!cma_xprt->sc_ctxt_head) {
> > + kfree(cma_xprt);
> > + return NULL;
> > + }
> > + }
> > +
> > + return cma_xprt;
> > +}
> > +
> > +static void svc_rdma_put(struct svc_sock *xprt)
> > +{
> > + struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
> > +
> > + if (atomic_dec_and_test(&xprt->sk_inuse)) {
> > + BUG_ON(! test_bit(SK_DEAD, &xprt->sk_flags));
> > +
> > + printk("svcrdma: Destroying transport %p, cm_id=%p, "
> > + "sk_flags=%lx\n",
> > + xprt, rdma->sc_cm_id, xprt->sk_flags);
> > +
> > + rdma_disconnect(rdma->sc_cm_id);
> > + rdma_destroy_id(rdma->sc_cm_id);
> > + rdma_destroy_xprt(rdma);
> > + }
> > +}
> > +
> > +struct page *svc_rdma_get_page(void)
> > +{
> > + struct page *page;
> > +
> > + while ((page = alloc_page(GFP_KERNEL))==NULL) {
> > + /* If we can't get memory, wait a bit and try again */
> > + printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 jiffies.\n");
> > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> HZ
> See comment above about tying up threads. Also note that you are
> probably better off using __GFP_NOFAIL instead of the loop.
>

I thought that the __GFP_NOFAIL pool was a precious resource and that an
out-of-memory condition was one of the places you'd actually want to sleep.
I basically copied this approach from the svc_recv implementation in
svcsock.c.

> > + }
> > + return page;
> > +}
> > +
> > +int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
> > +{
> > + struct ib_recv_wr recv_wr, *bad_recv_wr;
> > + struct svc_rdma_op_ctxt *ctxt;
> > + struct page *page;
> > + unsigned long pa;
> > + int sge_no;
> > + int buflen;
> > + int ret;
> > +
> > + ctxt = svc_rdma_get_context(xprt);
> > + buflen = 0;
> > + ctxt->direction = DMA_FROM_DEVICE;
> > + for (sge_no=0; buflen < xprt->sc_max_req_size; sge_no++) {
> > + BUG_ON(sge_no >= xprt->sc_max_sge);
> > + page = svc_rdma_get_page();
> > + ctxt->pages[sge_no] = page;
> > + pa = ib_dma_map_page(xprt->sc_cm_id->device,
> > + page, 0, PAGE_SIZE,
> > + DMA_FROM_DEVICE);
> > + ctxt->sge[sge_no].addr = pa;
> > + ctxt->sge[sge_no].length = PAGE_SIZE;
> > + ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
> > + buflen += PAGE_SIZE;
> > + }
> > + ctxt->count = sge_no;
> > + recv_wr.next = NULL;
> > + recv_wr.sg_list = &ctxt->sge[0];
> > + recv_wr.num_sge = ctxt->count;
> > + recv_wr.wr_id = (u64)(unsigned long)ctxt;
> > +
> > + ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
> > + return ret;
> > +}
> > +
> > +
> > +/*
> > + * This function handles the CONNECT_REQUEST event on a listening
> > + * endpoint. It is passed the cma_id for the _new_ connection. The context in
> > + * this cma_id is inherited from the listening cma_id and is the svc_sock
> > + * structure for the listening endpoint.
> > + *
> > + * This function creates a new xprt for the new connection and enqueues it on
> > + * the accept queue for the listent xprt. When the listen thread is kicked, it
> > + * will call the recvfrom method on the listen xprt which will accept the new
> > + * connection.
> > + */
> > +static void handle_connect_req(struct rdma_cm_id *new_cma_id)
> > +{
> > + struct svcxprt_rdma *listen_xprt = new_cma_id->context;
> > + struct svcxprt_rdma *newxprt;
> > +
> > + /* Create a new transport */
> > + newxprt = rdma_create_xprt(0);
> > + if (!newxprt) {
> > + dprintk("svcrdma: failed to create new transport\n");
> > + return;
> > + }
> > + newxprt->sc_cm_id = new_cma_id;
> > + new_cma_id->context = newxprt;
> > + dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
> > + newxprt, newxprt->sc_cm_id, listen_xprt);
> > +
> > + /* Initialize the new transport */
> > + newxprt->sc_xprt.sk_server = listen_xprt->sc_xprt.sk_server;
> > + newxprt->sc_xprt.sk_lastrecv = get_seconds();
> > + newxprt->sc_xprt.sk_delete = svc_rdma_delete;
> > + newxprt->sc_xprt.sk_recvfrom = svc_rdma_recvfrom;
> > + newxprt->sc_xprt.sk_sendto = svc_rdma_sendto;
> > + newxprt->sc_xprt.sk_put = svc_rdma_put;
> > + newxprt->sc_xprt.sk_prep_reply_buf = svc_rdma_prep_reply_buf;
> > + newxprt->sc_xprt.sk_defer = svc_rdma_defer;
> > + newxprt->sc_xprt.sk_revisit = svc_rdma_revisit;
> > + newxprt->sc_xprt.sk_pool = NULL;
> > +
> > + atomic_set(&newxprt->sc_xprt.sk_inuse, 1);
> > + set_bit(SK_TEMP, &newxprt->sc_xprt.sk_flags);
> > + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_ready);
> > + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_list);
> > + INIT_LIST_HEAD(&newxprt->sc_xprt.sk_deferred);
> > + spin_lock_init(&newxprt->sc_xprt.sk_defer_lock);
> > + mutex_init(&newxprt->sc_xprt.sk_mutex);
> > +
> > + /* Enqueue the new transport on the accept queue of the listening
> > + * transport */
> > + spin_lock_bh(&listen_xprt->sc_lock);
> > + list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
> > + spin_unlock_bh(&listen_xprt->sc_lock);
>
> Why do you need bh-safe spinlocks here?
>

This list is appended to by a callback running in another context in the
rdma_cm. I actually think that this is a thread and not a tasklet. I
think this is a case of blindly copying the svcsock locking style.

> > +
> > + listen_xprt->sc_xprt.sk_pool = NULL;
> > + set_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> > + svc_sock_enqueue(&listen_xprt->sc_xprt);
> > +}
> > +
> > +/*
> > + * Handles events generated on the listening endpoint. These events will be
> > + * either be incoming connect requests or adapter removal events.
> > + * @param cma_id The CMA ID for the listening endpoint
> > + * @event the event being delivered.
> > + */
> > +static int
> > +rdma_listen_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
> > +{
> > + struct svcxprt_rdma *xprt = cma_id->context;
> > + int ret = 0;
> > +
> > + switch (event->event) {
> > + case RDMA_CM_EVENT_CONNECT_REQUEST:
> > + dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, event=%d\n",
> > + cma_id, cma_id->context, event->event);
> > + handle_connect_req(cma_id);
> > + break;
> > +
> > + case RDMA_CM_EVENT_ESTABLISHED:
> > + /* Accept complete */
> > + dprintk("svcrdma: Connection completed on LISTEN xprt=%p, cm_id=%p\n",
> > + xprt, cma_id);
> > + break;
> > +
> > + case RDMA_CM_EVENT_DEVICE_REMOVAL:
> > + dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
> > + xprt, cma_id);
> > + if (xprt)
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + break;
> > +
> > + default:
> > + dprintk("svcrdma: Unexpected event on listening endpoint %p, event=%d\n",
> > + cma_id, event->event);
> > + break;
> > + }
> > +
> > + return ret;
> > +}
> > +
> > +static int
> > +rdma_cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
> > +{
> > + struct svcxprt_rdma *xprt = cma_id->context;
> > + int ret = 0;
> > +
> > + switch (event->event) {
> > + case RDMA_CM_EVENT_ESTABLISHED:
> > + /* Accept complete */
> > + dprintk("svcrdma: Connection completed on DTO xprt=%p, cm_id=%p\n",
> > + xprt, cma_id);
> > + break;
> > +
> > + case RDMA_CM_EVENT_DISCONNECTED:
> > + dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
> > + xprt, cma_id);
> > + if (xprt) {
> > + xprt->sc_xprt.sk_pool = NULL;
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + svc_sock_enqueue(&xprt->sc_xprt);
> > + }
> > + break;
> > +
> > + case RDMA_CM_EVENT_DEVICE_REMOVAL:
> > + dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, event=%d\n",
> > + cma_id, cma_id->context, event->event);
> > + if (xprt) {
> > + xprt->sc_xprt.sk_pool = NULL;
> > + set_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags);
> > + svc_sock_enqueue(&xprt->sc_xprt);
> > + }
> > + break;
> > +
> > + default:
> > + dprintk("svcrdma: Unexpected event on DTO endpoint %p, event=%d\n",
> > + cma_id, event->event);
> > + break;
> > + }
> > +
> > + return ret;
> > +}
> > +
> > +/*
> > + * Create a listening RDMA service endpoint
> > + * @param serv the RPC service this instance will belong to
> > + * @param protocol the protocol for the instance
> > + * @param sa the address to bind the local interface to
> > + * @return 0 on success, negative value for errors
> > + */
> > +int svc_rdma_create_listen(struct svc_serv *serv, int protocol,
> > + struct sockaddr *sa)
> > +{
> > + struct rdma_cm_id *listen_id;
> > + struct svcxprt_rdma *cma_xprt;
> > + struct svc_sock *xprt;
> > + int ret;
> > +
> > + dprintk("svcrdma: Creating RDMA socket\n");
> > +
> > + cma_xprt = rdma_create_xprt(1);
> > + if (!cma_xprt)
> > + return -ENOMEM;
> > +
> > + xprt = &cma_xprt->sc_xprt;
> > + xprt->sk_delete = svc_rdma_delete;
> > + xprt->sk_recvfrom = svc_rdma_accept;
> > + xprt->sk_put = svc_rdma_put;
> > + xprt->sk_prep_reply_buf = svc_rdma_prep_reply_buf;
> > + xprt->sk_server = serv;
> > + xprt->sk_lastrecv = get_seconds();
> > + INIT_LIST_HEAD(&xprt->sk_ready);
> > + INIT_LIST_HEAD(&xprt->sk_list);
> > + INIT_LIST_HEAD(&xprt->sk_deferred);
> > + spin_lock_init(&xprt->sk_defer_lock);
> > + mutex_init(&xprt->sk_mutex);
> > + xprt->sk_pool = NULL;
> > + atomic_set(&xprt->sk_inuse, 1);
> > + spin_lock_bh(&serv->sv_lock);
> > + list_add(&xprt->sk_list, &serv->sv_permsocks);
> > + spin_unlock_bh(&serv->sv_lock);
> > + clear_bit(SK_BUSY, &xprt->sk_flags);
> > +
> > + /*
> > + * We shouldn't receive any events (except device removal) on
> > + * the id until we submit the listen request. Any events that
> > + * we do receive will get logged as errors and ignored
> > + */
> > + listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
> > + if (IS_ERR(listen_id)) {
> > + ret = PTR_ERR(listen_id);
> > + rdma_destroy_xprt(cma_xprt);
> > + dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
> > + return ret;
> > + }
> > + ret = rdma_bind_addr(listen_id, sa);
> > + if (ret) {
> > + ret = PTR_ERR(listen_id);
> > + rdma_destroy_xprt(cma_xprt);
> > + rdma_destroy_id(listen_id);
> > + dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
> > + return ret;
> > + }
> > + cma_xprt->sc_cm_id = listen_id;
> > +
> > + /* The xprt is ready to process events at this point */
> > + ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
> > + if (ret) {
> > + ret = PTR_ERR(listen_id);
> > + rdma_destroy_id(listen_id);
> > + rdma_destroy_xprt(cma_xprt);
> > + dprintk("svcrdma: rdma_listen failed = %d\n", ret);
> > + return ret;
> > + }
> > +
> > + return 0;
> > +}
> > +
> > +/*
> > + * This is the sk_recvfrom function for listening endpoints. It's purpose is
> > + * to accept incoming connections. The CMA callback handler has already
> > + * created a new transport and attached the new CMA ID.
> > + *
> > + * There is a queue of pending connections hung on the listening
> > + * transport. This queue contains the new svc_sock structure. This function
> > + * takes svc_sock structures off the accept_q and completes the
> > + * connection.
> > + */
> > +static int
> > +svc_rdma_accept(struct svc_rqst *rqstp)
> > +{
> > + struct svc_sock *xprt = rqstp->rq_sock;
> > + struct svcxprt_rdma *listen_xprt;
> > + struct svcxprt_rdma *newxprt;
> > + struct rdma_conn_param conn_param;
> > + struct ib_qp_init_attr qp_attr;
> > + struct ib_device_attr devattr;
> > + int ret;
> > + int i;
> > +
> > + listen_xprt = (struct svcxprt_rdma*)xprt;
> > + if (list_empty(&listen_xprt->sc_accept_q)) {
> > + printk(KERN_INFO
> > + "svcrdma: woke-up with no pending connection!\n");
> > + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> > + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> > + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> > + return 0;
> > + }
> > +
> > + /* Get the next entry off the accept list */
> > + spin_lock_bh(&listen_xprt->sc_lock);
> > + newxprt = list_entry(listen_xprt->sc_accept_q.next,
> > + struct svcxprt_rdma, sc_accept_q);
> > + list_del_init(&newxprt->sc_accept_q);
> > + spin_unlock_bh(&listen_xprt->sc_lock);
> > +
> > + dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
> > + newxprt, newxprt->sc_cm_id);
> > +
> > + ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
> > + if (ret) {
> > + printk(KERN_ERR
> > + "svcrdma: could not query device attributes on "
> > + "device %p, rc=%d\n",
> > + newxprt->sc_cm_id->device, ret);
> > + goto errout;
> > + }
> > +
> > + /* Qualify the transport resource defaults with the
> > + * capabilities of this particular device */
> > + newxprt->sc_max_sge = min((size_t)devattr.max_sge,
> > + (size_t)RPCSVC_MAXPAGES);
> > + newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
> > + (size_t)svcrdma_max_requests);
> > + newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
> > +
> > + newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
> > + (size_t)svcrdma_ord);
> > + spin_lock_bh(&rqstp->rq_server->sv_lock);
> > + list_add(&newxprt->sc_xprt.sk_list, &rqstp->rq_server->sv_tempsocks);
> > + rqstp->rq_server->sv_tmpcnt ++;
> > + spin_unlock_bh(&rqstp->rq_server->sv_lock);
> > +
> > + newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
> > + if (IS_ERR(newxprt->sc_pd)) {
> > + printk(KERN_ERR
> > + "svcrdma: error creating PD for connect request\n");
> > + ret = PTR_ERR(newxprt->sc_pd);
> > + goto errout;
> > + }
> > + newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
> > + sq_comp_handler,
> > + cq_event_handler,
> > + newxprt,
> > + newxprt->sc_sq_depth);
> > + if (IS_ERR(newxprt->sc_sq_cq)) {
> > + printk(KERN_ERR
> > + "svcrdma: error creating SQ CQ for connect request\n");
> > + ret = PTR_ERR(newxprt->sc_sq_cq);
> > + goto errout;
> > + }
> > + newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
> > + rq_comp_handler,
> > + cq_event_handler,
> > + newxprt,
> > + newxprt->sc_max_requests);
> > + if (IS_ERR(newxprt->sc_rq_cq)) {
> > + printk(KERN_ERR
> > + "svcrdma: error creating RQ CQ for connect request\n");
> > + ret = PTR_ERR(newxprt->sc_rq_cq);
> > + goto errout;
> > + }
> > +
> > + memset(&qp_attr, 0, sizeof qp_attr);
> > + qp_attr.event_handler = qp_event_handler;
> > + qp_attr.qp_context = newxprt;
> > + qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
> > + qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
> > + qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
> > + qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
> > + qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
> > + qp_attr.qp_type = IB_QPT_RC;
> > + qp_attr.send_cq = newxprt->sc_sq_cq;
> > + qp_attr.recv_cq = newxprt->sc_rq_cq;
> > + printk("newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
> > + "cm_id->device=%p, sc_pd->device=%p\n"
> > + "qp_attr.cap.max_send_wr = %d\n"
> > + "qp_attr.cap.max_recv_wr = %d\n"
> > + "qp_attr.cap.max_send_sge = %d\n"
> > + "qp_attr.cap.max_recv_sge = %d\n",
> > + newxprt->sc_cm_id, newxprt->sc_pd,
> > + newxprt->sc_cm_id->device, newxprt->sc_pd->device,
> > + qp_attr.cap.max_send_wr,
> > + qp_attr.cap.max_recv_wr,
> > + qp_attr.cap.max_send_sge,
> > + qp_attr.cap.max_recv_sge);
> > +
> > + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
> > + if (ret) {
> > + /*
> > + * XXX: This is a hack. We need a xx_request_qp interface
> > + * that will adjust the qp_attr's with a best-effort
> > + * number
> > + */
> > + qp_attr.cap.max_send_sge -= 2;
> > + qp_attr.cap.max_recv_sge -= 2;
> > + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
> > + if (ret) {
> > + printk(KERN_ERR "svcrdma: failed to create QP, ret=%d\n", ret);
> > + goto errout;
> > + }
> > + newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
> > + newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
> > + newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
> > + newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
> > + }
> > + newxprt->sc_qp = newxprt->sc_cm_id->qp;
> > + DBG_DUMP_QP(__FUNCTION__, newxprt->sc_qp, &qp_attr);
> > +
> > + /* Register all of physical memory */
> > + newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
> > + IB_ACCESS_LOCAL_WRITE |
> > + IB_ACCESS_REMOTE_WRITE);
> > + if (IS_ERR(newxprt->sc_phys_mr)) {
> > + ret = PTR_ERR(newxprt->sc_phys_mr);
> > + printk(KERN_ERR
> > + "svcrdma: Failed to create DMA MR ret=%d\n", ret);
> > + goto errout;
> > + }
> > +
> > + /* Post receive buffers */
> > + for (i=0; i < newxprt->sc_max_requests; i++)
> > + if ((ret = svc_rdma_post_recv(newxprt))) {
> > + printk(KERN_ERR
> > + "svcrdma: failure posting receive buffers\n");
> > + goto errout;
> > + }
> > +
> > + /* Swap out the handler */
> > + newxprt->sc_cm_id->event_handler = rdma_cma_handler;
> > +
> > + /* We will get a getattr request from the client before we see
> > + * the connect complete event because DTO's run on tasklets,
> > + * and connection events run on threads
> > + */
> > + clear_bit(SK_BUSY, &newxprt->sc_xprt.sk_flags);
> > +
> > + /* Accept Connection */
> > + memset(&conn_param, 0, sizeof conn_param);
> > + conn_param.responder_resources = 0;
> > + conn_param.initiator_depth = newxprt->sc_ord;
> > + ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
> > + if (ret) {
> > + printk(KERN_ERR
> > + "svcrdma: failed to accept new connection, ret=%d\n",
> > + ret);
> > + goto errout;
> > + }
> > +
> > + printk("svcrdma: new connection %p accepted with the following "
> > + "attributes:\n"
> > + "\tlocal_ip : %d.%d.%d.%d\n"
> > + "\tlocal_port : %d\n"
> > + "\tremote_ip : %d.%d.%d.%d\n"
> > + "\tremote_port : %d\n"
> > + "\tmax_sge : %d\n"
> > + "\tsq_depth : %d\n"
> > + "\tmax_requests : %d\n"
> > + "\tread throttle : %s\n"
> > + "\tord : %d\n",
> > + newxprt,
> > + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.src_addr)->sin_addr.s_addr),
> > + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.src_addr)->sin_port),
> > + NIPQUAD(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.dst_addr)->sin_addr.s_addr),
> > + ntohs(((struct sockaddr_in*)&newxprt->sc_cm_id->
> > + route.addr.dst_addr)->sin_port),
> > + newxprt->sc_max_sge,
> > + newxprt->sc_sq_depth,
> > + newxprt->sc_max_requests,
> > + (svcrdma_read_throttle?"TRUE":"FALSE"),
> > + newxprt->sc_ord);
> > +
> > + spin_lock_bh(&listen_xprt->sc_lock);
> > + if (list_empty(&listen_xprt->sc_accept_q))
> > + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> > + spin_unlock_bh(&listen_xprt->sc_lock);
> > + listen_xprt->sc_xprt.sk_pool = NULL;
> > + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> > + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> > + svc_sock_enqueue(&listen_xprt->sc_xprt);
> > +
> > + ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> > + ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
> > + return ret;
> > +
> > + errout:
> > + printk(KERN_ERR "svcrdma: failure accepting new connection rc=%d.\n",
> > + ret);
> > + BUG_ON(test_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags)==0);
> > + clear_bit(SK_BUSY, &listen_xprt->sc_xprt.sk_flags);
> > + clear_bit(SK_CONN, &listen_xprt->sc_xprt.sk_flags);
> > + rdma_destroy_id(newxprt->sc_cm_id);
> > + rdma_destroy_xprt(newxprt);
> > + return 0; /* ret; */
> > +}
> > +
> > +static void svc_rdma_delete(struct svc_sock *xprt)
> > +{
> > + struct svc_serv *serv = xprt->sk_server;
> > +
> > + spin_lock_bh(&serv->sv_lock);
> > + if (!test_and_set_bit(SK_DETACHED, &xprt->sk_flags))
> > + list_del_init(&xprt->sk_list);
> > +
> > + if (!test_and_set_bit(SK_DEAD, &xprt->sk_flags)) {
> > + BUG_ON(atomic_read(&xprt->sk_inuse)<2);
> > + atomic_dec(&xprt->sk_inuse);
> > + if (test_bit(SK_TEMP, &xprt->sk_flags))
> > + serv->sv_tmpcnt--;
> > + }
> > + spin_unlock_bh(&serv->sv_lock);
> > +}
> > +
> > +static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
> > +{
> > + if (xprt->sc_qp)
> > + ib_destroy_qp(xprt->sc_qp);
> > +
> > + if (xprt->sc_sq_cq)
> > + ib_destroy_cq(xprt->sc_sq_cq);
> > +
> > + if (xprt->sc_rq_cq)
> > + ib_destroy_cq(xprt->sc_rq_cq);
> > +
> > + if (xprt->sc_pd)
> > + ib_dealloc_pd(xprt->sc_pd);
> > +
> > + destroy_context_cache(xprt->sc_ctxt_head);
> > +
> > + if (xprt->sc_xprt.sk_info_authunix != NULL)
> > + svcauth_unix_info_release(xprt->sc_xprt.sk_info_authunix);
> > +
> > + kfree(xprt);
> > +}
> > +
> > +int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
> > +{
> > + struct ib_send_wr *bad_wr;
> > + int ret;
> > +
> > + if (test_bit(SK_CLOSE, &xprt->sc_xprt.sk_flags))
> > + return 0;
> > +
> > + BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
> > + BUG_ON(((struct svc_rdma_op_ctxt*)(unsigned long)wr->wr_id)->wr_op !=
> > + wr->opcode);
> > + /* If the SQ is full, wait until an SQ entry is available */
> > + while (1) {
> > + spin_lock_bh(&xprt->sc_lock);
> > + if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
> > + spin_unlock_bh(&xprt->sc_lock);
> > + rdma_stat_sq_starve ++;
> > + /* First see if we can opportunistically reap some SQ WR */
> > + sq_cq_reap(xprt);
> > +
> > + /* Wait until SQ WR available if SQ still full*/
> > + wait_event(xprt->sc_send_wait,
> > + atomic_read(&xprt->sc_sq_count) < xprt->sc_sq_depth);
> > + continue;
> > + }
> > + /* Bumped used SQ WR count and post */
> > + ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
> > + if (!ret)
> > + atomic_inc(&xprt->sc_sq_count);
> > + else {
> > + printk(KERN_ERR "svcrdma: failed to post SQ WR rc=%d, "
> > + "sc_sq_count=%d, sc_sq_depth=%d\n",
> > + ret, atomic_read(&xprt->sc_sq_count),
> > + xprt->sc_sq_depth);
> > + }
> > + spin_unlock_bh(&xprt->sc_lock);
> > + break;
> > + }
> > +
> > + return ret;
> > +}
> > +
> > +int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
> > + enum rpcrdma_errcode err)
> > +{
> > + struct ib_send_wr err_wr;
> > + struct ib_sge sge;
> > + struct page *p;
> > + struct svc_rdma_op_ctxt *ctxt;
> > + u32 *va;
> > + int length;
> > + int ret;
> > +
> > + p = svc_rdma_get_page();
> > + va = page_address(p);
> > +
> > + /* XDR encode error */
> > + length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
> > +
> > + /* Prepare SGE for local address */
> > + sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
> > + p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
> > + sge.lkey = xprt->sc_phys_mr->lkey;
> > + sge.length = length;
> > +
> > + ctxt = svc_rdma_get_context(xprt);
> > + ctxt->count = 1;
> > + ctxt->pages[0] = p;
> > +
> > + /* Prepare SEND WR */
> > + memset(&err_wr, 0, sizeof err_wr);
> > + ctxt->wr_op = IB_WR_SEND;
> > + err_wr.wr_id = (unsigned long)ctxt;
> > + err_wr.sg_list = &sge;
> > + err_wr.num_sge = 1;
> > + err_wr.opcode = IB_WR_SEND;
> > + err_wr.send_flags = IB_SEND_SIGNALED;
> > +
> > + /* Post It */
> > + ret = svc_rdma_send(xprt, &err_wr);
> > + if (ret) {
> > + dprintk("svcrdma: Error posting send = %d\n", ret);
> > + svc_rdma_put_context(ctxt,1);
> > + }
> > +
> > + return ret;
> > +}
> > +
> > +/*
> > + * Setup the reply buffer for the svc_process function to write the
> > + * RPC into.
> > + */
> > +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
> > +{
> > + struct kvec *resv = &rqstp->rq_res.head[0];
> > +
> > + /* setup response xdr_buf.
> > + * Initially it has just one page
> > + */
> > + rqstp->rq_resused = 1;
> > + resv->iov_base = page_address(rqstp->rq_respages[0]);
> > + resv->iov_len = 0;
> > + rqstp->rq_res.pages = rqstp->rq_respages+1;
> > + rqstp->rq_res.len = 0;
> > + rqstp->rq_res.page_base = 0;
> > + rqstp->rq_res.page_len = 0;
> > + rqstp->rq_res.buflen = PAGE_SIZE;
> > + rqstp->rq_res.tail[0].iov_base = NULL;
> > + rqstp->rq_res.tail[0].iov_len = 0;
> > +
> > + return 0;
> > +}
> > +
> > +/*
> > + * This request cannot be handled right now. Allocate a structure to
> > + * keep it's state pending completion processing. To accomplish this, the
> > + * function creates an svc_rdma_op_ctxt that looks like a receive completion and
> > + * enqueues it on the svc_sock's deferred request list. When*
> > + * svc_rdma_recvfrom is subsequently called, it first checks if there is a
> > + * deferred RPC and if there is:
> > + * - Takes the deferred request off the deferred request queue
> > + * - Extracts the svc_rdma_op_ctxt from the deferred request structure
> > + * - Frees the deferred request structure
> > + * - Skips the ib_cq_poll call and processes the svc_rdma_op_ctxt as if it had
> > + * just come out of an WR pulled from the CQ.
> > + */
> > +static struct cache_deferred_req *
> > +svc_rdma_defer(struct cache_req *req)
> > +{
> > + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> > + struct svcxprt_rdma *xprt;
> > + struct svc_rdma_deferred_req *dr;
> > +
> > + dprintk("svcrdma: deferring request on \n"
> > + " rqstp=%p\n"
> > + " rqstp->rq_arg.len=%d\n",
> > + rqstp,
> > + rqstp->rq_arg.len);
> > +
> > + /* if more than a page, give up FIXME */
> > + if (rqstp->rq_arg.page_len)
> > + return NULL;
> > + BUG_ON(rqstp->rq_deferred);
> > + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> > + retry:
> > + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> > + if (!dr) {
> > + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> > + goto retry;
> > + }
> > + dr->req.handle.owner = rqstp->rq_server;
> > + dr->req.prot = rqstp->rq_prot;
> > + dr->req.addr = rqstp->rq_addr;
> > + dr->req.daddr = rqstp->rq_daddr;
> > + dr->req.argslen = rqstp->rq_arg.len >> 2;
> > + dr->arg_page = rqstp->rq_pages[0];
> > + dr->arg_len = rqstp->rq_arg.len;
> > + rqstp->rq_pages[0] = svc_rdma_get_page();
> > +
> > + atomic_inc(&rqstp->rq_sock->sk_inuse);
> > + dr->req.svsk = rqstp->rq_sock;
> > + dr->req.handle.revisit = rqstp->rq_sock->sk_revisit;
> > +
> > + return &dr->req.handle;
> > +}
> > +
> > +/*
> > + * This is called by the cache code when it either gets an answer from
> > + * a user-mode daemon or gives up...as indicated by 'too_many'
> > + */
> > +static void svc_rdma_revisit(struct cache_deferred_req *dreq, int too_many)
> > +{
> > + struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
> > + struct svc_serv *serv = dreq->owner;
> > + struct svc_sock *svsk;
> > +
> > + if (unlikely(too_many)) {
> > + printk(KERN_INFO "svcrdma: giving up on deferred request "
> > + "on svc_sock=%p, too many outstanding\n", dr->svsk);
> > + dr->svsk->sk_put(dr->svsk);
> > + kfree(dr);
> > + return;
> > + }
> > + svsk = dr->svsk;
> > + dprintk("svcrdma: revisit deferred RPC on xprt=%p\n", svsk);
> > + dr->svsk = NULL;
> > + spin_lock_bh(&serv->sv_lock);
> > + list_add(&dr->handle.recent, &svsk->sk_deferred);
> > + spin_unlock_bh(&serv->sv_lock);
> > + svsk->sk_pool = NULL;
> > + set_bit(SK_DEFERRED, &svsk->sk_flags);
> > + svc_sock_enqueue(svsk);
> > + svsk->sk_put(svsk);
> > +}
> > +
> >
> >



2007-05-18 21:18:05

by Trond Myklebust

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, 2007-05-18 at 15:07 -0500, Tom Tucker wrote:
> On Fri, 2007-05-18 at 15:07 -0400, Trond Myklebust wrote:
> [...snip...]
> > + xprt->sc_ctxt_max);
> > > +
> > > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> >
> > Why do you need an irqsafe spinlock to protect this list?
> >
>
> The svc_rdma_put_context function is called from interrupt context. This
> function adds free contexts back to this list.

Why not move the calls to rq_cq_reap() and sq_cq_reap() into the
tasklet? That way you can get away with only a bh-safe lock instead of
having to shut down interrupts again.

> > > + while (xprt->sc_ctxt_cnt < target) {
> > > + xprt->sc_ctxt_cnt ++;
> > > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > > +
> > > + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> > > +
> > > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> >
> > You've now dropped the spinlock. How can you know that the condition
> > xprt->sc_ctxt_cnt <= target is still valid?
> >
>
> I increment the sc_ctxt_cnt with the lock held. If I'm at the limit, the
> competing thread will find it equal -- correct?

Fair enough.

> > > + if (ctxt) {
> > > + at_least_one = 1;
> > > + ctxt->next = xprt->sc_ctxt_head;
> > > + xprt->sc_ctxt_head = ctxt;
> > > + } else {
> > > + /* kmalloc failed...give up for now */
> > > + xprt->sc_ctxt_cnt --;
> > > + break;
> > > + }
> > > + }
> > > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > > +
> > > + return at_least_one;
> > > +}
> > > +
> > > +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
> > > +{
> > > + struct svc_rdma_op_ctxt *ctxt;
> > > + unsigned long flags;
> > > +
> > > + while (1) {
> > > + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
> > > + if (unlikely(xprt->sc_ctxt_head == NULL)) {
> > > + /* Try to bump my cache. */
> > > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > > +
> > > + if (rdma_bump_context_cache(xprt))
> > > + continue;
> > > +
> > > + printk(KERN_INFO "svcrdma: sleeping waiting for context "
> > > + "memory on xprt=%p\n",
> > > + xprt);
> > > + schedule_timeout_uninterruptible(msecs_to_jiffies(500));
> >
> > (HZ >> 1)
> > However this is rather naughty: you are tying up an nfsd thread that
> > could be put to doing useful work elsewhere.
> >
>
> True, it could be processing requests for another client.
>
> Here was my dilemma. I can actually predict what the worst case number
> is so that I'm guaranteed not to fail. The problem is that this is a
> very big number and I wanted to have a footprint driven by load as
> opposed to worst-case. This led to the code above.
>
> As an intermediate approach, I believe that I need about 4 contexts to
> be guaranteed I can process a request. I could attempt to acquire them
> all (at the top of svc_rdma_recvfrom) and cache them in the rqstp
> structure. If I couldn't get them, I would just return 0 (EAGAIN). This
> would be marginally wasteful on a given request, but would avoid ever
> sleeping for a context.
>
> Comments?

How about just keeping a fixed-size pool, and deferring handling more
RDMA requests until you have enough free contexts in your pool to
guarantee that you can complete the request? That would be analogous to
the way the existing RPC server socket code handles the issue of socket
send buffers.
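
For what it's worth, a rough sketch of that kind of gate (the helper
names and the per-request count of 4 are invented for illustration, not
existing code):

	/* Don't start on a request unless a full complement of contexts
	 * is available, analogous to the socket send-buffer space check.
	 */
	#define SVC_RDMA_CTXT_PER_REQ	4

	static int svc_rdma_has_ctxt_space(struct svcxprt_rdma *xprt)
	{
		return svc_rdma_free_contexts(xprt) >= SVC_RDMA_CTXT_PER_REQ;
	}

The recvfrom path would then return 0 and leave the request queued
whenever this check fails, instead of sleeping for memory.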

> > > + continue;
> > > + }
> > > + ctxt = xprt->sc_ctxt_head;
> > > + xprt->sc_ctxt_head = ctxt->next;
> > > + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
> > > + ctxt->xprt = xprt;
> > > + INIT_LIST_HEAD(&ctxt->dto_q);
> > > + break;
> > > + }
> > > + ctxt->count = 0;
> > > + return ctxt;
> > > +}

<snip>

> > > +
> > > +struct page *svc_rdma_get_page(void)
> > > +{
> > > + struct page *page;
> > > +
> > > + while ((page = alloc_page(GFP_KERNEL))==NULL) {
> > > + /* If we can't get memory, wait a bit and try again */
> > > + printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 jiffies.\n");
> > > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> > HZ
> > See comment above about tying up threads. Also note that you are
> > probably better off using __GFP_NOFAIL instead of the loop.
> >
>
> I thought that the __GFP_NOFAIL pool was a precious resource and that a
> memory out condition was one of the places you'd actually want to sleep.
> I basically copied this approach from the svc_recv implementation in
> svcsock.c

AFAIK, __GFP_NOFAIL just means keep retrying until the allocation
succeeds. It is not tied to a specific resource.
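
For reference, the loop would then collapse to a single call; this is
just a sketch of the substitution being discussed, not part of the
patch:

	struct page *svc_rdma_get_page(void)
	{
		/* __GFP_NOFAIL makes the page allocator retry internally
		 * until it succeeds, so the retry/printk loop goes away.
		 */
		return alloc_page(GFP_KERNEL | __GFP_NOFAIL);
	}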





2007-05-19 04:27:36

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core




On 5/18/07 4:17 PM, "Trond Myklebust" <[email protected]> wrote:

> On Fri, 2007-05-18 at 15:07 -0500, Tom Tucker wrote:
>> On Fri, 2007-05-18 at 15:07 -0400, Trond Myklebust wrote:
>> [...snip...]
>>> + xprt->sc_ctxt_max);
>>>> +
>>>> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
>>>
>>> Why do you need an irqsafe spinlock to protect this list?
>>>
>>
>> The svc_rdma_put_context function is called from interrupt context. This
>> function adds free contexts back to this list.
>
> Why not move the calls to rq_cq_reap() and sq_cq_reap() into the
> tasklet? That way you can get away with only a bh-safe lock instead of
> having to shut down interrupts again.
>

Yes, this could be done. The current irq lock is there to accommodate the SQ
CQ side. The RQ CQ processing is already done in a tasklet because it has to
call svc_sock_enqueue which takes _bh locks.

I would need an independent dto_q protected by irqsave locks if I wanted to
avoid unnecessary calls to ib_req_notify_cq. This call is very expensive (1+
us).

In the end I think we'd swap one irqsave lock per context (WR) for one
irqsave lock per interrupt and transport. That's probably a pretty good
trade-off.
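
As a sketch of that shape (sc_dto_q is already in the patch; the global
dto_lock/dto_xprt_q names and the tasklet body here are illustrative
only), the interrupt handler would take the irqsave lock once per
completion event, and the tasklet would do all the reaping under _bh
locks:

	/* tasklet (bh) context: drain the transport list and reap both
	 * CQs here, so the context cache only needs a bh-safe lock.
	 */
	static void dto_tasklet_func(unsigned long data)
	{
		struct svcxprt_rdma *xprt;
		unsigned long flags;

		spin_lock_irqsave(&dto_lock, flags);
		while (!list_empty(&dto_xprt_q)) {
			xprt = list_entry(dto_xprt_q.next,
					  struct svcxprt_rdma, sc_dto_q);
			list_del_init(&xprt->sc_dto_q);
			spin_unlock_irqrestore(&dto_lock, flags);

			rq_cq_reap(xprt);	/* already bh work today */
			sq_cq_reap(xprt);	/* moved out of irq context */

			spin_lock_irqsave(&dto_lock, flags);
		}
		spin_unlock_irqrestore(&dto_lock, flags);
	}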

>>>> + while (xprt->sc_ctxt_cnt < target) {
>>>> + xprt->sc_ctxt_cnt ++;
>>>> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
>>>> +
>>>> + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
>>>> +
>>>> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
>>>
>>> You've now dropped the spinlock. How can you know that the condition
>>> xprt->sc_ctxt_cnt <= target is still valid?
>>>
>>
>> I increment the sc_ctxt_cnt with the lock held. If I'm at the limit, the
>> competing thread will find it equal -- correct?
>
> Fair enough.
>
>>>> + if (ctxt) {
>>>> + at_least_one = 1;
>>>> + ctxt->next = xprt->sc_ctxt_head;
>>>> + xprt->sc_ctxt_head = ctxt;
>>>> + } else {
>>>> + /* kmalloc failed...give up for now */
>>>> + xprt->sc_ctxt_cnt --;
>>>> + break;
>>>> + }
>>>> + }
>>>> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
>>>> +
>>>> + return at_least_one;
>>>> +}
>>>> +
>>>> +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
>>>> +{
>>>> + struct svc_rdma_op_ctxt *ctxt;
>>>> + unsigned long flags;
>>>> +
>>>> + while (1) {
>>>> + spin_lock_irqsave(&xprt->sc_ctxt_lock, flags);
>>>> + if (unlikely(xprt->sc_ctxt_head == NULL)) {
>>>> + /* Try to bump my cache. */
>>>> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
>>>> +
>>>> + if (rdma_bump_context_cache(xprt))
>>>> + continue;
>>>> +
>>>> + printk(KERN_INFO "svcrdma: sleeping waiting for context "
>>>> + "memory on xprt=%p\n",
>>>> + xprt);
>>>> + schedule_timeout_uninterruptible(msecs_to_jiffies(500));
>>>
>>> (HZ >> 1)
>>> However this is rather naughty: you are tying up an nfsd thread that
>>> could be put to doing useful work elsewhere.
>>>
>>
>> True, it could be processing requests for another client.
>>
>> Here was my dilemma. I can actually predict what the worst case number
>> is so that I'm guaranteed not to fail. The problem is that this is a
>> very big number and I wanted to have a footprint driven by load as
>> opposed to worst-case. This led to the code above.
>>
>> As an intermediate approach, I believe that I need about 4 contexts to
>> be guaranteed I can process a request. I could attempt to acquire them
>> all (at the top of svc_rdma_recvfrom) and cache them in the rqstp
>> structure. If I couldn't get them, I would just return 0 (EAGAIN). This
>> would be marginally wasteful on a given request, but would avoid ever
>> sleeping for a context.
>>
>> Comments?
>
> How about just keeping a fixed size pool, and deferring handling more
> RDMA requests until you have enough free contexts in your pool to
> guarantee that you can complete the request? That would be analogous to
> the way the existing RPC server socket code handles the issue of socket
> send buffers.

Yes, I think we're saying the same thing with respect to the deferral
mechanism; however, I'm proposing attempting to grow the pool when below the
limit, but deferring instead of blocking if I can't get the memory. I'm
worried about the size of the RDMA transport's memory footprint with lots of
clients/mounts.
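
A minimal sketch of what I have in mind, with a hypothetical
non-blocking getter (the _nowait name and the reserve helper are made
up for illustration):

	static int svc_rdma_reserve_ctxts(struct svcxprt_rdma *xprt,
					  struct svc_rdma_op_ctxt **ctxts,
					  int n)
	{
		int i;

		for (i = 0; i < n; i++) {
			/* returns NULL instead of sleeping */
			ctxts[i] = svc_rdma_get_context_nowait(xprt);
			if (!ctxts[i]) {
				/* give back what we took; recvfrom then
				 * returns 0 and the request is retried */
				while (--i >= 0)
					svc_rdma_put_context(ctxts[i], 0);
				return 0;
			}
		}
		return 1;
	}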

>
>>>> + continue;
>>>> + }
>>>> + ctxt = xprt->sc_ctxt_head;
>>>> + xprt->sc_ctxt_head = ctxt->next;
>>>> + spin_unlock_irqrestore(&xprt->sc_ctxt_lock, flags);
>>>> + ctxt->xprt = xprt;
>>>> + INIT_LIST_HEAD(&ctxt->dto_q);
>>>> + break;
>>>> + }
>>>> + ctxt->count = 0;
>>>> + return ctxt;
>>>> +}
>
> <snip>
>
>>>> +
>>>> +struct page *svc_rdma_get_page(void)
>>>> +{
>>>> + struct page *page;
>>>> +
>>>> + while ((page = alloc_page(GFP_KERNEL))==NULL) {
>>>> + /* If we can't get memory, wait a bit and try again */
>>>> + printk(KERN_INFO "svcrdma: out of memory...retrying in 1000
>>>> jiffies.\n");
>>>> + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
>>> HZ
>>> See comment above about tying up threads. Also note that you are
>>> probably better off using __GFP_NOFAIL instead of the loop.
>>>
>>
>> I thought that the __GFP_NOFAIL pool was a precious resource and that a
>> memory out condition was one of the places you'd actually want to sleep.
>> I basically copied this approach from the svc_recv implementation in
>> svcsock.c
>
> AFAIK, __GFP_NOFAIL just means keep retrying until the allocation
> succeeds. It is not tied to a specific resource.
>

Ah, you're right. But I think the retry is attached to a timeout
(HZ/50) or fires whenever any backing-store device exits its write congestion
callback. Basically, the retries will occur a lot more aggressively. Maybe
this is OK? It will certainly make the code look better.

BTW, the code at the top of svc_recv does the same thing. Should it be
changed too?

>
>




2007-05-23 14:09:18

by Greg Banks

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Fri, May 18, 2007 at 02:36:28PM -0500, Tom Tucker wrote:
> On Fri, 2007-05-18 at 15:24 -0400, J. Bruce Fields wrote:
> > On Fri, May 18, 2007 at 12:45:52PM -0500, Tom Tucker wrote:
> >
> > > +/*
> > > + * Setup the reply buffer for the svc_process function to write the
> > > + * RPC into.
> > > + */
> > > +static int svc_rdma_prep_reply_buf(struct svc_rqst *rqstp)
> > > +{
> > > + struct kvec *resv = &rqstp->rq_res.head[0];
> > > +
> > > + /* setup response xdr_buf.
> > > + * Initially it has just one page
> > > + */
> > > + rqstp->rq_resused = 1;
> > > + resv->iov_base = page_address(rqstp->rq_respages[0]);
> > > + resv->iov_len = 0;
> > > + rqstp->rq_res.pages = rqstp->rq_respages+1;
> > > + rqstp->rq_res.len = 0;
> > > + rqstp->rq_res.page_base = 0;
> > > + rqstp->rq_res.page_len = 0;
> > > + rqstp->rq_res.buflen = PAGE_SIZE;
> > > + rqstp->rq_res.tail[0].iov_base = NULL;
> > > + rqstp->rq_res.tail[0].iov_len = 0;
> > > +
> > > + return 0;
> > > +}
> >
> > I think this is a repeat of a question from yesterday--I'm not sure why
> > we bothered to pull this out into separate function when the
> > implementations all end up being identical, except for the one line for
> > the record length in the tcp case?
> >
>
> Yeah, you're right. Maybe it shouldn't be prepare_buffer, but rather
> "prepare_header". Then you'd keep the buffer code common and just have
> the one line function for the TCP record len.

Ok, I'll change my patch to push all those initialisation lines back
into svc_process() where they came from, make svc_tcp_prepare_reply()
do just the 1 line it needs to, and have no method for UDP.
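
So the TCP method would shrink to something like this (a sketch only;
it assumes the record marker is reserved with svc_putnl() the same way
svc_process() does today):

	static int svc_tcp_prepare_reply(struct svc_rqst *rqstp)
	{
		/* reserve 4 bytes for the TCP record marker; the real
		 * length is filled in by the sendto path at the end */
		svc_putnl(&rqstp->rq_res.head[0], 0);
		return 0;
	}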

Tom, just curious, but back in November there was actually a difference
between the sockets and RDMA versions; the latter didn't have this
line:

rqstp->rq_res.buflen = PAGE_SIZE;

> > > +static struct cache_deferred_req *
> > > +svc_rdma_defer(struct cache_req *req)
> > > +{
> > > + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> > > + struct svcxprt_rdma *xprt;
> > > + struct svc_rdma_deferred_req *dr;
> > > +
> > > + dprintk("svcrdma: deferring request on \n"
> > > + " rqstp=%p\n"
> > > + " rqstp->rq_arg.len=%d\n",
> > > + rqstp,
> > > + rqstp->rq_arg.len);
> > > +
> > > + /* if more than a page, give up FIXME */
> > > + if (rqstp->rq_arg.page_len)
> > > + return NULL;
> > > + BUG_ON(rqstp->rq_deferred);
> > > + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> > > + retry:
> > > + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> > > + if (!dr) {
> > > + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> > > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> > > + goto retry;
> > > + }
> >
> > Why not return NULL, as svc_defer() does?
> >
>
> I think you're right. I'll fix this.

Why have svc_rdma_defer() at all? The generic code should be able
to do call deferral and revisiting. After all, it just needs to
shuffle bits aside from the xdr_buf and put them back later; it's not
a performance path; and the logic is convoluted and a pain to test.
To illustrate how difficult this logic is, note that svc_rdma_revisit()
leaks a page when called with too_many=1.

Like handling SK_CLOSE, it's logic that should be in the generic
code rather than replicated in three svc_foo_recvfrom() routines.

Looking at the code, it seems to me that the reason for doing your
own defer/revisit logic is to preserve the RDMA chunking header
which lives before the RPC call header. This header is skipped
by svc_rdma_xdr_decode_req(), but a pointer to it is magically
reconstructed by svc_rdma_sendto() so it can traverse the write chunks
or reply chunks therein. Am I correct?

So the thing that RDMA is doing differently from the other transports
is that it needs to go back and look at its transport-specific
on-the-wire header again after having earlier skipped over it, and that
this header needs to be preserved across a defer & revisit. Right?

So perhaps a better approach would be to slightly tweak the existing
generic defer & revisit logic to allow a transport to specify how
many bytes of transport header need to be preserved. Then the
transport-specific code needs very little code to support defer &
revisit, and doesn't duplicate multiple complex and subtle functions
which mess with svc_sock internals.
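
To make that concrete, the save side might look roughly like this (the
xprt_hlen field and the helper name are invented for this sketch; it
assumes the transport header sits immediately in front of the RPC call
header in the same buffer, as it does for RPC/RDMA):

	static void svc_defer_save_xprt_hdr(struct svc_deferred_req *dr,
					    struct svc_rqst *rqstp,
					    size_t xprt_hlen)
	{
		dr->xprt_hlen = xprt_hlen;
		memcpy(dr->args,
		       (char *)rqstp->rq_arg.head[0].iov_base - xprt_hlen,
		       xprt_hlen + rqstp->rq_arg.head[0].iov_len);
	}

The revisit side would copy the same region back in front of the
rebuilt rq_arg before re-queueing the request.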

I have 4 untested patches which attempt to do this, I'll post them
in a moment.


Greg.
--
Greg Banks, R&D Software Engineer, SGI Australian Software Group.
Apparently, I'm Bedevere. Which MPHG character are you?
I don't speak for SGI.


2007-05-23 14:43:34

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core


> Tom, just curious but in the November there was actually a difference
> between the sockets and RDMA versions, the latter didn't have this
> line:
>
> rqstp->rq_res.buflen = PAGE_SIZE;

I changed the defer logic quite a bit. In fact, I don't think it worked
last November when you got the code originally. The latest code is at
linux-nfs.org/~tomtucker/linux-nfs-2.6.git

>
> > > > +static struct cache_deferred_req *
> > > > +svc_rdma_defer(struct cache_req *req)
> > > > +{
> > > > + struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
> > > > + struct svcxprt_rdma *xprt;
> > > > + struct svc_rdma_deferred_req *dr;
> > > > +
> > > > + dprintk("svcrdma: deferring request on \n"
> > > > + " rqstp=%p\n"
> > > > + " rqstp->rq_arg.len=%d\n",
> > > > + rqstp,
> > > > + rqstp->rq_arg.len);
> > > > +
> > > > + /* if more than a page, give up FIXME */
> > > > + if (rqstp->rq_arg.page_len)
> > > > + return NULL;
> > > > + BUG_ON(rqstp->rq_deferred);
> > > > + xprt = (struct svcxprt_rdma*)rqstp->rq_sock;
> > > > + retry:
> > > > + dr = kmalloc(sizeof(struct svc_rdma_deferred_req), GFP_KERNEL);
> > > > + if (!dr) {
> > > > + printk(KERN_INFO "svcrdma: sleeping waiting for memory\n");
> > > > + schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
> > > > + goto retry;
> > > > + }
> > >
> > > Why not return NULL, as svc_defer() does?
> > >
> >
> > I think you're right. I'll fix this.
>
> Why have svc_rdma_defer() at all? The generic code should be able
> to do call deferral and revisiting. After all, it just needs to
> shuffle bits aside from the xdr_buf and put them back later; it's not
> a performance path; and the logic is convoluted and a pain to test.
> To illustrate how difficult this logic is, note that svc_rdma_revisit()
> leaks a page when called with too_many=1.
>
> Like handling SK_CLOSE, it's logic that should be in the generic
> code rather than replicated in three svc_foo_recvfrom() routines.
>
> Looking at the code, it seems to me that the reason for doing your
> own defer/revisit logic is to preserve the RDMA chunking header
> which lives before the RPC call header. This header is skipped
> by svc_rdma_xdr_decode_req(), but a pointer to it is magically
> reconstructed by svc_rdma_sendto() so it can traverse the write chunks
> or reply chunks therein. Am I correct?
>

You are absolutely correct.

> So the thing that RDMA is doing differently from the other transports
> is that it needs to go back and look at its transport-specific
> on-the-wire header again after having earlier skipped over it, and that
> this header needs to be preserved across a defer & revisit. Right?
>

Yes again.

> So perhaps a better approach would be to slightly tweak the existing
> generic defer & revisit logic to allow a transport to specify how
> many bytes of transport header need to be preserved. Then the
> transport-specific code needs very little code to support defer &
> revisit, and doesn't duplicate multiple complex and subtle functions
> which mess with svc_sock internals.

The svcsock defer logic only supports a simple RPC without a page list.
The logic even has a comment that says "FIX ME". So I assumed that in
the fullness of time, any RPC could be deferred. If this is in fact the
case, then I believe it is better to have a separate method. If it is
not the case, then the defer logic can be made generic.

> I have 4 untested patches which attempt to do this, I'll post them
> in a moment.

> Greg.



2007-05-23 14:56:01

by Greg Banks

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, May 23, 2007 at 09:43:30AM -0500, Tom Tucker wrote:
>
> > Tom, just curious but in the November there was actually a difference
> > between the sockets and RDMA versions, the latter didn't have this
> > line:
> >
> > rqstp->rq_res.buflen = PAGE_SIZE;
>
> I changed the defer logic quite a bit. In fact, I don't think it worked
> last November when you got the code originally. The latest code is at
> linux-nfs.org/~tomtucker/linux-nfs-2.6.git

Thanks. I have a checkout from there as of several days ago but
I haven't ported all my patches to use it yet.

> > So perhaps a better approach would be to slightly tweak the existing
> > generic defer & revisit logic to allow a transport to specify how
> > many bytes of transport header need to be preserved. Then the
> > transport-specific code needs very little code to support defer &
> > revisit, and doesn't duplicate multiple complex and subtle functions
> > which mess with svc_sock internals.
>
> The svcsock defer logic only supports a simple RPC without a page list.
> The logic even has a comment that says "FIX ME". So I assumed that in
> the fullness of time, any RPC could be deferred. If this is in fact the
> case, then I believe it is better to have a separate method. If it is
> not the case, then the defer logic can be made generic.

I'm not sure what you mean here.

Currently neither svc_defer() nor svc_rdma_defer() support calls longer
than just the head, meaning that WRITE RPCs (only) are dropped and
retried instead of deferred. This doesn't seem to be too much of a
problem; deferral happens so infrequently anyway.

If for some reason svc_defer() were to have the "FIXME" actually
implemented, WRITE RPCs would magically start to be deferred rather
than dropped, for the sockets case and presumably also for the
RDMA case. Is this a problem?

Greg.
--
Greg Banks, R&D Software Engineer, SGI Australian Software Group.
Apparently, I'm Bedevere. Which MPHG character are you?
I don't speak for SGI.


2007-05-23 15:03:27

by Trond Myklebust

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Thu, 2007-05-24 at 00:55 +1000, Greg Banks wrote:
> Currently neither svc_defer() nor svc_rdma_defer() support calls longer
> than just the head, meaning that WRITE RPCs (only) are dropped and
> retried instead of deferred. This doesn't seem to be too much of a
> problem; deferral happens so infrequently anyway.

That still isn't a good tactic.

On NFSv4, dropping a request requires the server to also drop the
connection. Unless you are completely out of resources, you are
inevitably better off having the server return an NFS4ERR_DELAY or
something like that.

Trond



2007-05-23 15:03:46

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Thu, 2007-05-24 at 00:55 +1000, Greg Banks wrote:
> On Wed, May 23, 2007 at 09:43:30AM -0500, Tom Tucker wrote:
> >
> > > Tom, just curious but in the November there was actually a difference
> > > between the sockets and RDMA versions, the latter didn't have this
> > > line:
> > >
> > > rqstp->rq_res.buflen = PAGE_SIZE;
> >
> > I changed the defer logic quite a bit. In fact, I don't think it worked
> > last November when you got the code originally. The latest code is at
> > linux-nfs.org/~tomtucker/linux-nfs-2.6.git
>
> Thanks. I have a checkout from there as of several days ago but
> I haven't ported all my patches to use it yet.
>
> > > So perhaps a better approach would be to slightly tweak the existing
> > > generic defer & revisit logic to allow a transport to specify how
> > > many bytes of transport header need to be preserved. Then the
> > > transport-specific code needs very little code to support defer &
> > > revisit, and doesn't duplicate multiple complex and subtle functions
> > > which mess with svc_sock internals.
> >
> > The svcsock defer logic only supports a simple RPC without a page list.
> > The logic even has a comment that says "FIX ME". So I assumed that in
> > the fullness of time, any RPC could be deferred. If this is in fact the
> > case, then I believe it is better to have a separate method. If it is
> > not the case, then the defer logic can be made generic.
>
> I'm not sure what you mean here.
>
> Currently neither svc_defer() nor svc_rdma_defer() support calls longer
> than just the head, meaning that WRITE RPCs (only) are dropped and
> retried instead of deferred. This doesn't seem to be too much of a
> problem; deferral happens so infrequently anyway.
>
> If for some reason the svc_defer() was to have the "FIXME" actually
> implemented, WRITE RPCs would magically start to be deferred rather
> than dropped, for the sockets case and presumably also for the
> RDMA case. Is this a problem?
>

That's what I'm asking. I don't think it is, and if not, then I concur
we can normalize the defer logic.


> Greg.



2007-05-23 15:12:09

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core


Ah, this is a very good point. So then I think we need a transport-specific
deferral mechanism.

On Wed, 2007-05-23 at 11:03 -0400, Trond Myklebust wrote:
> On Thu, 2007-05-24 at 00:55 +1000, Greg Banks wrote:
> > Currently neither svc_defer() nor svc_rdma_defer() support calls longer
> > than just the head, meaning that WRITE RPCs (only) are dropped and
> > retried instead of deferred. This doesn't seem to be too much of a
> > problem; deferral happens so infrequently anyway.
>
> That still isn't a good tactic.
>
> On NFSv4, dropping a request requires the server to also drop the
> connection. Unless you are completely out of resources, you are
> inevitably better off having the server return an NFS4ERR_DELAY or
> something like that.
>
> Trond
>



2007-05-23 15:37:41

by Trond Myklebust

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, 2007-05-23 at 10:12 -0500, Tom Tucker wrote:
> Ah, this is a very good point. So then I think we need a transport
> specific deferral mechanism.

No. I'm not sure that justifies a transport-specific mechanism. It
rather calls for a cleverer algorithm for deferring. Both NFSv3 and
NFSv4 have generic error messages that state 'I'm busy now, please try
again later'. As I said earlier, returning those errors is inevitably
more efficient than dropping.
Even for NFSv3 over TCP, dropping a single WRITE request will typically
cause a 60-second flat holdup instead of the more desirable retry +
exponential backoff.

Trond



2007-05-23 16:02:48

by Tom Tucker

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, 2007-05-23 at 11:37 -0400, Trond Myklebust wrote:
> On Wed, 2007-05-23 at 10:12 -0500, Tom Tucker wrote:
> > Ah, this is a very good point. So then I think we need a transport
> > specific deferral mechanism.
>
> No. I'm not sure that justifies a transport specific mechanism. It
> rather calls for a more clever algorithm for deferring. Both NFSv3 and
> NFSv4 have generic error messages that state 'I'm busy now, please try
> again later'. As I said earlier, returning those errors are inevitably
> more efficient than dropping.
> Even for NFSv3 over TCP, dropping a single WRITE request will typically
> cause a 60 second flat holdup instead of the more desirable retry +
> exponential backoff.

Understood. My concern is very simple. I don't want to copy the data, and
to avoid this I need to have a way to save off the data for later
processing in a transport-specific way. All of my data is sitting in
this rdma_context structure.

The current svcsock approach just does a kmalloc and a memcpy. This is
not a good approach for a 1MB NFS WRITE.

An approach that allows a transport to give a "data cookie" to the
deferral mechanism for later recovery would allow all the complexity to
be centralized, but still allow the transport to keep the data around in
its own way.
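
Roughly the kind of interface I mean (all of the naming here is
hypothetical, just to show the shape):

	/* Hypothetical addition to the generic deferral record: instead
	 * of copying the data, stash an opaque transport cookie plus a
	 * release callback for the give-up case.
	 */
	struct svc_deferred_xprt_data {
		void	*cookie;		/* e.g. the svc_rdma_op_ctxt */
		void	(*release)(void *cookie);
	};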

>
> Trond
>



2007-05-23 16:29:18

by Greg Banks

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, May 23, 2007 at 11:03:06AM -0400, Trond Myklebust wrote:
> On Thu, 2007-05-24 at 00:55 +1000, Greg Banks wrote:
> > Currently neither svc_defer() nor svc_rdma_defer() support calls longer
> > than just the head, meaning that WRITE RPCs (only) are dropped and
> > retried instead of deferred. This doesn't seem to be too much of a
> > problem; deferral happens so infrequently anyway.
>
> That still isn't a good tactic.
>
> On NFSv4, dropping a request requires the server to also drop the
> connection.

<pokes around in RFC3530>

Egads, you're right, there it is in section 3.1.1. Oh I see, it's
new since RFC 3010, the last nfs4 document I read. I need to get up
to date!

A quick look at the source seems to indicate that this is currently
broken. Neither nfsd4_proc_compound() nor nfsd_dispatch() does anything
special for the (nfserr_dropit, vers=4) corner. And I can see at least
one path where nfserr_dropit can percolate back up to nfsd_dispatch().

Does the client obey RFC3530 and not retry? If so, we should expect
to see file corruption on pure WRITE workloads on NFSv4 whenever any
of the server's caches expires.

One more thing to fix.

> Unless you are completely out of resources, you are
> inevitably better off having the server return an NFS4ERR_DELAY or
> something like that.

So we could translate nfserr_dropit -> nfserr_jukebox ?
I notice that nfs4_new_open() does that.
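
Something along these lines in nfsd_dispatch(), perhaps? Sketch only,
written from memory rather than against the tree:

	nfserr = proc->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
	/* for v4, turn a drop into a retryable error instead of
	 * letting nfserr_dropit force a connection break */
	if (nfserr == nfserr_dropit && rqstp->rq_vers == 4)
		nfserr = nfserr_jukebox;	/* i.e. NFS4ERR_DELAY */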

Greg.
--
Greg Banks, R&D Software Engineer, SGI Australian Software Group.
Apparently, I'm Bedevere. Which MPHG character are you?
I don't speak for SGI.


2007-05-23 16:36:03

by Greg Banks

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, May 23, 2007 at 11:02:44AM -0500, Tom Tucker wrote:
> On Wed, 2007-05-23 at 11:37 -0400, Trond Myklebust wrote:
> > On Wed, 2007-05-23 at 10:12 -0500, Tom Tucker wrote:
> > > Ah, this is a very good point. So then I think we need a transport
> > > specific deferral mechanism.
> >
> > No. I'm not sure that justifies a transport specific mechanism. It
> > rather calls for a more clever algorithm for deferring. Both NFSv3 and
> > NFSv4 have generic error messages that state 'I'm busy now, please try
> > again later'. As I said earlier, returning those errors are inevitably
> > more efficient than dropping.
> > Even for NFSv3 over TCP, dropping a single WRITE request will typically
> > cause a 60 second flat holdup instead of the more desirable retry +
> > exponential backoff.
>
> Understood. My concern is very simple. I don't want to copy the data and
> to avoid this I need to have a way to save off the data for later
> processing in a transport specific way. All of my data is sitting in
> this rdma_context structure.
>
> The current svcsock approach just does a kmalloc and a memcpy. This is
> not a good approach for a 1MB NFS WRITE.

Sure, but this is true regardless of the transport.

> An approach that allows a transport to give a "data cookie" to the
> deferral mechanism for later recovery would allow all the complexity to
> be centralized, but still allow the transport to keep the data around in
> it's own way.

I'm still confused. Which data are you referring to? And which
rdma_context structure? I don't see any symbol like that in the
source.

Greg.
--
Greg Banks, R&D Software Engineer, SGI Australian Software Group.
Apparently, I'm Bedevere. Which MPHG character are you?
I don't speak for SGI.


2007-05-23 18:07:52

by Trond Myklebust

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Thu, 2007-05-24 at 02:29 +1000, Greg Banks wrote:
> On Wed, May 23, 2007 at 11:03:06AM -0400, Trond Myklebust wrote:
> > On NFSv4, dropping a request requires the server to also drop the
> > connection.
>
> <pokes around in RFC3530>
>
> Egads, you're right, there it is in section 3.1.1. Oh I see, it's
> new since RFC 3010, the last nfs4 document I read. I need to get up
> to date!
>
> A quick look at the source seems to indicate that this is currently
> broken. Neither nfsd4_proc_compound() nor nfsd_dispatch() do anything
> special for the (nfserr_dropit, vers=4) corner. And I can see at least
> one path where nfserr_dropit can percolate back up to nfsd_dispatch().
>
> Does the client obey RFC3530 and not retry? If so we should expect
> to see file corruption doing pure WRITE workloads on NFSv4 when any
> of the server caches expires.

The client will eventually retry (after the 60 second delay), but the
convention is that it must break the connection before it does so.

> > Unless you are completely out of resources, you are
> > inevitably better off having the server return an NFS4ERR_DELAY or
> > something like that.
>
> So we could translate nfserr_dropit -> nfserr_jukebox ?
> I notice that nfs4_new_open() does that.

Yes. For NFSv3 and NFSv4 that is infinitely better than dropping the
request, and will result in faster client recovery.

Cheers
Trond



2007-05-23 18:19:27

by Talpey, Thomas

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

At 12:29 PM 5/23/2007, Greg Banks wrote:
>So we could translate nfserr_dropit -> nfserr_jukebox ?
>I notice that nfs4_new_open() does that.

If WRITEs start returning nfserr_jukebox because there happens to
be a large-ish number of them in progress at the server, then that
server's throughput is going to be in a world of hurt. Clients will be
forced to back off in herds, and herds don't back off well.

I feel strongly that we need a good, workable defer mechanism
that actually defers. Yes, it's maybe hard. But it's important!

Tom.



2007-05-23 18:37:27

by Trond Myklebust

[permalink] [raw]
Subject: Re: [RFC,PATCH 11/15] knfsd: RDMA transport core

On Wed, 2007-05-23 at 14:19 -0400, Talpey, Thomas wrote:
> At 12:29 PM 5/23/2007, Greg Banks wrote:
> >So we could translate nfserr_dropit -> nfserr_jukebox ?
> >I notice that nfs4_new_open() does that.
>
> If WRITEs start returning nfserr_jukebox because there happens to
> be a large-ish number of them in progress at the server, then that
> server's throughput is going to be in a world of hurt. Clients will be
> forced to back off in herds, and herds don't back off well.

> I feel strongly that we need a good, workable defer mechanism
> that actually defers. Yes, it's maybe hard. But it's important!

Unless you have a way of capping the number of requests that are
deferred, you can quickly end up turning this into a resource issue.


For the case of UDP, dropping requests is acceptable since it is an
unreliable transport, and so clients are expected to have low
retransmission timeouts.

On TCP sockets you can probably set a per-socket limit on the number of
deferrals. As soon as you hit that number, just stop handling any
further requests on that particular socket (i.e. leave any further data
queued in the socket buffer and let the server thread go to work on
another socket) until a sufficient number of deferrals have been cleared
out.
I assume that you could devise a similar scheme with RDMA pretty much by
substituting the word 'slot' for 'socket' in the previous paragraph,
right?
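
In other words something like this, I imagine (sketch only; all of
these names are invented for illustration and don't exist in the
current code):

/*
 * Per-transport cap on outstanding deferrals.  Once the cap is hit the
 * receive path simply stops servicing that socket/slot -- data stays
 * queued in the socket buffer or the RDMA receive queue -- instead of
 * reading requests only to drop them.
 */
struct svc_defer_limit {
	atomic_t deferred_count;	/* deferrals currently outstanding */
	int max_deferred;		/* cap, e.g. 16 per socket or slot */
};

static inline int svc_may_defer(struct svc_defer_limit *dl)
{
	return atomic_read(&dl->deferred_count) < dl->max_deferred;
}

/* call just after a request has actually been deferred */
static inline void svc_defer_start(struct svc_defer_limit *dl)
{
	atomic_inc(&dl->deferred_count);
}

/* call when a deferred request is revisited or discarded, then
 * re-enqueue the transport so the threads look at it again */
static inline void svc_defer_done(struct svc_defer_limit *dl)
{
	atomic_dec(&dl->deferred_count);
}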

Cheers
Trond

