From: "Labiaga, Ricardo" Subject: Re: [RFC 04/11] nfsd41: sunrpc: Added rpc server-side backchannel handling Date: Wed, 20 May 2009 11:40:15 -0700 Message-ID: References: <4A13C0B0.4080207@panasas.com> Mime-Version: 1.0 Content-Type: text/plain; charset="US-ASCII" Cc: , , , Rahul Iyer , Mike Sager , Marc Eshel , Andy Adamson , Alexandros Batsakis To: Benny Halevy Return-path: Received: from mx2.netapp.com ([216.240.18.37]:14636 "EHLO mx2.netapp.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755219AbZETSth (ORCPT ); Wed, 20 May 2009 14:49:37 -0400 In-Reply-To: <4A13C0B0.4080207@panasas.com> Sender: linux-nfs-owner@vger.kernel.org List-ID: On 5/20/09 1:34 AM, "Benny Halevy" wrote: > On May. 20, 2009, 6:00 +0300, Ricardo Labiaga > wrote: >> Signed-off-by: Rahul Iyer >> Signed-off-by: Mike Sager >> Signed-off-by: Marc Eshel >> Signed-off-by: Benny Halevy >> >> When the call direction is a reply, copy the xid and call direction into the >> req->rq_private_buf.head[0].iov_base otherwise rpc_verify_header returns >> rpc_garbage. >> >> Signed-off-by: Andy Adamson >> Signed-off-by: Benny Halevy >> [get rid of CONFIG_NFSD_V4_1] >> Signed-off-by: Benny Halevy >> >> [sunrpc: refactoring of svc_tcp_recvfrom] >> Signed-off-by: Alexandros Batsakis >> Signed-off-by: Ricardo Labiaga >> --- >> include/linux/sunrpc/clnt.h | 1 + >> include/linux/sunrpc/svcsock.h | 1 + >> include/linux/sunrpc/xprt.h | 2 + >> net/sunrpc/clnt.c | 1 + >> net/sunrpc/svcsock.c | 102 +++++++++++++-- >> net/sunrpc/xprt.c | 41 ++++++- >> net/sunrpc/xprtsock.c | 278 >> +++++++++++++++++++++++++++++++++++++++- >> 7 files changed, 405 insertions(+), 21 deletions(-) >> >> diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h >> index c39a210..cf9a8ec 100644 >> --- a/include/linux/sunrpc/clnt.h >> +++ b/include/linux/sunrpc/clnt.h >> @@ -110,6 +110,7 @@ struct rpc_create_args { >> rpc_authflavor_t authflavor; >> unsigned long flags; >> char *client_name; >> + struct svc_sock *bc_sock; /* NFSv4.1 backchannel */ >> }; >> >> /* Values for "flags" field */ >> diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h >> index 8271631..19228f4 100644 >> --- a/include/linux/sunrpc/svcsock.h >> +++ b/include/linux/sunrpc/svcsock.h >> @@ -28,6 +28,7 @@ struct svc_sock { >> /* private TCP part */ >> u32 sk_reclen; /* length of record */ >> u32 sk_tcplen; /* current read length */ >> + struct rpc_xprt *sk_bc_xprt; /* NFSv4.1 backchannel xprt */ >> }; >> >> /* >> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h >> index 1758d9f..063a6a7 100644 >> --- a/include/linux/sunrpc/xprt.h >> +++ b/include/linux/sunrpc/xprt.h >> @@ -174,6 +174,7 @@ struct rpc_xprt { >> spinlock_t reserve_lock; /* lock slot table */ >> u32 xid; /* Next XID value to use */ >> struct rpc_task * snd_task; /* Task blocked in send */ >> + struct svc_sock *bc_sock; /* NFSv4.1 backchannel */ >> struct list_head recv; >> >> struct { >> @@ -197,6 +198,7 @@ struct xprt_create { >> struct sockaddr * srcaddr; /* optional local address */ >> struct sockaddr * dstaddr; /* remote peer address */ >> size_t addrlen; >> + struct svc_sock *bc_sock; /* NFSv4.1 backchannel */ >> }; >> >> struct xprt_class { >> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c >> index 5abab09..3dc847f 100644 >> --- a/net/sunrpc/clnt.c >> +++ b/net/sunrpc/clnt.c >> @@ -266,6 +266,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) >> .srcaddr = args->saddress, >> .dstaddr = args->address, >> .addrlen = args->addrsize, >> + 
>> + .bc_sock = args->bc_sock,
>> };
>> char servername[48];
>>
>> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
>> index b739111..90c9a75 100644
>> --- a/net/sunrpc/svcsock.c
>> +++ b/net/sunrpc/svcsock.c
>> @@ -49,6 +49,7 @@
>> #include
>> #include
>> #include
>> +#include
>>
>> #define RPCDBG_FACILITY RPCDBG_SVCXPRT
>>
>> @@ -895,6 +896,57 @@ static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
>> return -EAGAIN;
>> }
>>
>> +static int svc_process_calldir(struct svc_sock *svsk, struct svc_rqst *rqstp,
>> + struct rpc_rqst **reqpp, struct kvec *vec)
>> +{
>> + struct rpc_rqst *req = NULL;
>> + u32 *p;
>> + u32 xid;
>> + u32 calldir;
>> + int len;
>> +
>> + len = svc_recvfrom(rqstp, vec, 1, 8);
>> + if (len < 0)
>> + goto error;
>> +
>> + p = (u32 *)rqstp->rq_arg.head[0].iov_base;
>> + xid = *p++;
>> + calldir = *p;
>> +
>> + if (calldir == 0) {
>> + /* REQUEST is the most common case */
>> + vec[0] = rqstp->rq_arg.head[0];
>> + } else {
>> + /* REPLY */
>> + if (svsk->sk_bc_xprt)
>> + req = xprt_lookup_rqst(svsk->sk_bc_xprt, xid);
>> +
>> + if (!req) {
>> + printk(KERN_NOTICE
>> + "%s: Got unrecognized reply: "
>> + "calldir 0x%x sk_bc_xprt %p xid %08x\n",
>> + __func__, ntohl(calldir),
>> + svsk->sk_bc_xprt, xid);
>> + vec[0] = rqstp->rq_arg.head[0];
>> + goto out;
>> + }
>> +
>> + memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
>> + sizeof(struct xdr_buf));
>> + /* copy the xid and call direction */
>> + memcpy(req->rq_private_buf.head[0].iov_base,
>> + rqstp->rq_arg.head[0].iov_base, 8);
>> + vec[0] = req->rq_private_buf.head[0];
>> + }
>> + out:
>> + vec[0].iov_base += 8;
>> + vec[0].iov_len -= 8;
>> + len = svsk->sk_reclen - 8;
>> + error:
>> + *reqpp = req;
>> + return len;
>> +}
>> +
>> /*
>> * Receive data from a TCP socket.
>> */
>> @@ -906,6 +958,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>> int len;
>> struct kvec *vec;
>> int pnum, vlen;
>> + struct rpc_rqst *req = NULL;
>>
>> dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
>> svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
>> @@ -919,9 +972,27 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>> vec = rqstp->rq_vec;
>> vec[0] = rqstp->rq_arg.head[0];
>> vlen = PAGE_SIZE;
>> +
>> + /*
>> + * We have enough data for the whole tcp record. Let's try and read the
>> + * first 8 bytes to get the xid and the call direction. We can use this
>> + * to figure out if this is a call or a reply to a callback. If
>> + * sk_reclen is < 8 (xid and calldir), then this is a malformed packet.
>> + * In that case, don't bother with the calldir and just read the data.
>> + * It will be rejected in svc_process.
>> + */
>> + if (len >= 8) {
>> + len = svc_process_calldir(svsk, rqstp, &req, vec);
>> + if (len < 0)
>> + goto err_again;
>> + vlen -= 8;
>> + }
>> +
>> pnum = 1;
>> while (vlen < len) {
>> - vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
>> + vec[pnum].iov_base = (req) ?
>> + page_address(req->rq_private_buf.pages[pnum - 1]) :
>> + page_address(rqstp->rq_pages[pnum]);
>> vec[pnum].iov_len = PAGE_SIZE;
>> pnum++;
>> vlen += PAGE_SIZE;
>> @@ -931,8 +1002,18 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>> /* Now receive data */
>> len = svc_recvfrom(rqstp, vec, pnum, len);
>> if (len < 0)
>> - goto error;
>> + goto err_again;
>
> This seems to belong to the previous patch
> as well as the last hunk in this file (@@ -957,21 +1039,19 @@).

Thanks for catching this. I'll move it to the previous patch and resubmit both.
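For anyone trying the series out, the caller side is expected to hand the established fore channel's svc_sock to rpc_create() through the new bc_sock field. A rough, illustrative sketch only; the callback program, timeout, and address variables below are placeholders and not part of this patch:

    /* Sketch: create a callback rpc_clnt that reuses the fore channel socket */
    struct rpc_create_args args = {
            .protocol   = XPRT_TRANSPORT_TCP,
            .address    = (struct sockaddr *)&clp_addr,     /* placeholder */
            .addrsize   = sizeof(clp_addr),
            .timeout    = &cb_timeparms,                    /* placeholder */
            .program    = &cb_program,                      /* placeholder */
            .version    = 0,                                /* placeholder */
            .authflavor = RPC_AUTH_UNIX,
            .flags      = RPC_CLNT_CREATE_NOPING,
            .bc_sock    = svsk,     /* svc_sock of the fore channel connection */
    };
    struct rpc_clnt *cb_client = rpc_create(&args);

    if (IS_ERR(cb_client))
            return PTR_ERR(cb_client);

With bc_sock set, xs_setup_tcp() reuses the fore channel's socket and points the xprt at bc_tcp_ops, and xprt_create_transport() marks the xprt connected, so the callback path never initiates a connection of its own.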
- ricardo

> Benny
>
>> +
>> + /*
>> + * Account for the 8 bytes we read earlier
>> + */
>> + len += 8;
>>
>> + if (req) {
>> + xprt_complete_rqst(req->rq_task, len);
>> + len = 0;
>> + goto out;
>> + }
>> dprintk("svc: TCP complete record (%d bytes)\n", len);
>> rqstp->rq_arg.len = len;
>> rqstp->rq_arg.page_base = 0;
>> @@ -946,6 +1027,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>> rqstp->rq_xprt_ctxt = NULL;
>> rqstp->rq_prot = IPPROTO_TCP;
>>
>> +out:
>> /* Reset TCP read info */
>> svsk->sk_reclen = 0;
>> svsk->sk_tcplen = 0;
>> @@ -957,21 +1039,19 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>>
>> return len;
>>
>> - err_delete:
>> - set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
>> - return -EAGAIN;
>> -
>> - error:
>> +err_again:
>> if (len == -EAGAIN) {
>> dprintk("RPC: TCP recvfrom got EAGAIN\n");
>> svc_xprt_received(&svsk->sk_xprt);
>> - } else {
>> + return len;
>> + }
>> +error:
>> + if (len != -EAGAIN) {
>> printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
>> svsk->sk_xprt.xpt_server->sv_name, -len);
>> - goto err_delete;
>> + set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
>> }
>> -
>> - return len;
>> + return -EAGAIN;
>> }
>>
>> /*
>> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
>> index a0bfe53..03f175e 100644
>> --- a/net/sunrpc/xprt.c
>> +++ b/net/sunrpc/xprt.c
>> @@ -1015,6 +1015,27 @@ void xprt_release(struct rpc_task *task)
>> spin_unlock(&xprt->reserve_lock);
>> }
>>
>> +/*
>> + * The autoclose function for the back channel
>> + *
>> + * The callback channel should never close the channel,
>> + * let the forechannel do that.
>> + */
>> +static void bc_autoclose(struct work_struct *work)
>> +{
>> + return;
>> +}
>> +
>> +
>> +/*
>> + * The autodisconnect routine for the back channel. We never disconnect
>> + */
>> +static void
>> +bc_init_autodisconnect(unsigned long data)
>> +{
>> + return;
>> +}
>> +
>> /**
>> * xprt_create_transport - create an RPC transport
>> * @args: rpc transport creation arguments
>> @@ -1051,9 +1072,16 @@ found:
>>
>> INIT_LIST_HEAD(&xprt->free);
>> INIT_LIST_HEAD(&xprt->recv);
>> - INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
>> - setup_timer(&xprt->timer, xprt_init_autodisconnect,
>> - (unsigned long)xprt);
>> + if (args->bc_sock) {
>> + INIT_WORK(&xprt->task_cleanup, bc_autoclose);
>> + setup_timer(&xprt->timer, bc_init_autodisconnect,
>> + (unsigned long)xprt);
>> + } else {
>> + INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
>> + setup_timer(&xprt->timer, xprt_init_autodisconnect,
>> + (unsigned long)xprt);
>> + }
>> +
>> xprt->last_used = jiffies;
>> xprt->cwnd = RPC_INITCWND;
>> xprt->bind_index = 0;
>> @@ -1073,6 +1101,13 @@ found:
>> dprintk("RPC: created transport %p with %u slots\n", xprt,
>> xprt->max_reqs);
>>
>> + /*
>> + * Since we don't want connections for the backchannel, we set
>> + * the xprt status to connected
>> + */
>> + if (args->bc_sock)
>> + xprt_set_connected(xprt);
>> +
>> return xprt;
>> }
>>
>> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
>> index d40ff50..067d205 100644
>> --- a/net/sunrpc/xprtsock.c
>> +++ b/net/sunrpc/xprtsock.c
>> @@ -32,6 +32,7 @@
>> #include
>> #include
>> #include
>> +#include
>> #include
>> #include
>>
>> @@ -1966,6 +1967,219 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
>> xprt->stat.bklog_u);
>> }
>>
>> +/*
>> + * The connect worker for the backchannel
>> + * This should never be called as we should never need to connect
>> + */
>> +static void bc_connect_worker(struct work_struct *work)
>> +{
>> + BUG();
>> +}
>> +
>> +/*
>> + * The set_port routine of the rpc_xprt_ops. This is related to the portmapper
>> + * and should never be called
>> + */
>> +
>> +static void bc_set_port(struct rpc_xprt *xprt, unsigned short port)
>> +{
>> + BUG();
>> +}
>> +
>> +/*
>> + * The connect routine for the backchannel rpc_xprt ops
>> + * Again, should never be called!
>> + */
>> +
>> +static void bc_connect(struct rpc_task *task)
>> +{
>> + BUG();
>> +}
>> +
>> +struct rpc_buffer {
>> + size_t len;
>> + char data[];
>> +};
>> +/*
>> + * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
>> + * we allocate pages instead of doing a kmalloc like rpc_malloc is because we want
>> + * to use the server side send routines.
>> + */
>> +void *bc_malloc(struct rpc_task *task, size_t size)
>> +{
>> + struct page *page;
>> + struct rpc_buffer *buf;
>> +
>> + BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
>> + page = alloc_page(GFP_KERNEL);
>> +
>> + if (!page)
>> + return NULL;
>> +
>> + buf = page_address(page);
>> + buf->len = PAGE_SIZE;
>> +
>> + return buf->data;
>> +}
>> +
>> +/*
>> + * Free the space allocated in the bc_alloc routine
>> + */
>> +void bc_free(void *buffer)
>> +{
>> + struct rpc_buffer *buf;
>> +
>> + if (!buffer)
>> + return;
>> +
>> + buf = container_of(buffer, struct rpc_buffer, data);
>> + free_pages((unsigned long)buf, get_order(buf->len));
>> +}
>> +
>> +/*
>> + * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
>> + * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
>> + */
>> +static int bc_sendto(struct rpc_rqst *req)
>> +{
>> + int total_len;
>> + int len;
>> + int size;
>> + int result;
>> + struct xdr_buf *xbufp = &req->rq_snd_buf;
>> + struct page **pages = xbufp->pages;
>> + unsigned int flags = MSG_MORE;
>> + unsigned int pglen = xbufp->page_len;
>> + size_t base = xbufp->page_base;
>> + struct rpc_xprt *xprt = req->rq_xprt;
>> + struct sock_xprt *transport =
>> + container_of(xprt, struct sock_xprt, xprt);
>> + struct socket *sock = transport->sock;
>> +
>> + total_len = xbufp->len;
>> +
>> + /*
>> + * Set up the rpc header and record marker stuff
>> + */
>> + xs_encode_tcp_record_marker(xbufp);
>> +
>> + /*
>> + * The RPC message is divided into 3 pieces:
>> + * - The header: This is what most of the smaller RPC messages consist
>> + * of. Often the whole message is in this.
>> + *
>> + * - xdr->pages: This is a list of pages that contain data, for
>> + * example in a write request or while using rpcsec gss
>> + *
>> + * - The tail: This is the rest of the rpc message
>> + *
>> + * First we send the header, then the pages and then finally the tail.
>> + * The code borrows heavily from svc_sendto.
>> + */
>> +
>> + /*
>> + * Send the head
>> + */
>> + if (total_len == xbufp->head[0].iov_len)
>> + flags = 0;
>> +
>> + len = sock->ops->sendpage(sock, virt_to_page(xbufp->head[0].iov_base),
>> + (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK,
>> + xbufp->head[0].iov_len, flags);
>> +
>> + if (len != xbufp->head[0].iov_len)
>> + goto out;
>> +
>> + /*
>> + * send page data
>> + *
>> + * Check the amount of data to be sent. If it is less than the
>> + * remaining page, then send it else send the current page
>> + */
>> +
>> + size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
>> + while (pglen > 0) {
>> + if (total_len == size)
>> + flags = 0;
>> + result = sock->ops->sendpage(sock, *pages, base, size, flags);
>> + if (result > 0)
>> + len += result;
>> + if (result != size)
>> + goto out;
>> + total_len -= size;
>> + pglen -= size;
>> + size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
>> + base = 0;
>> + pages++;
>> + }
>> + /*
>> + * send tail
>> + */
>> + if (xbufp->tail[0].iov_len) {
>> + result = sock->ops->sendpage(sock,
>> + xbufp->tail[0].iov_base,
>> + (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK,
>> + xbufp->tail[0].iov_len,
>> + 0);
>> +
>> + if (result > 0)
>> + len += result;
>> + }
>> +out:
>> + if (len != xbufp->len)
>> + printk(KERN_NOTICE "Error sending entire callback!\n");
>> +
>> + return len;
>> +}
>> +
>> +/*
>> + * The send routine. Borrows from svc_send
>> + */
>> +static int bc_send_request(struct rpc_task *task)
>> +{
>> + struct rpc_rqst *req = task->tk_rqstp;
>> + struct rpc_xprt *bc_xprt = req->rq_xprt;
>> + struct svc_xprt *xprt;
>> + struct svc_sock *svsk;
>> + u32 len;
>> +
>> + dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
>> + /*
>> + * Get the server socket associated with this callback xprt
>> + */
>> + svsk = bc_xprt->bc_sock;
>> + xprt = &svsk->sk_xprt;
>> +
>> + mutex_lock(&xprt->xpt_mutex);
>> + if (test_bit(XPT_DEAD, &xprt->xpt_flags))
>> + len = -ENOTCONN;
>> + else
>> + len = bc_sendto(req);
>> + mutex_unlock(&xprt->xpt_mutex);
>> +
>> + return 0;
>> +
>> +}
>> +
>> +/*
>> + * The close routine. Since this is client initiated, we do nothing
>> + */
>> +
>> +static void bc_close(struct rpc_xprt *xprt)
>> +{
>> + return;
>> +}
>> +
>> +/*
>> + * The xprt destroy routine. Again, because this connection is client
>> + * initiated, we do nothing
>> + */
>> +
>> +static void bc_destroy(struct rpc_xprt *xprt)
>> +{
>> + return;
>> +}
>> +
>> static struct rpc_xprt_ops xs_udp_ops = {
>> .set_buffer_size = xs_udp_set_buffer_size,
>> .reserve_xprt = xprt_reserve_xprt_cong,
>> @@ -1999,6 +2213,24 @@ static struct rpc_xprt_ops xs_tcp_ops = {
>> .print_stats = xs_tcp_print_stats,
>> };
>>
>> +/*
>> + * The rpc_xprt_ops for the server backchannel
>> + */
>> +
>> +static struct rpc_xprt_ops bc_tcp_ops = {
>> + .reserve_xprt = xprt_reserve_xprt,
>> + .release_xprt = xprt_release_xprt,
>> + .set_port = bc_set_port,
>> + .connect = bc_connect,
>> + .buf_alloc = bc_malloc,
>> + .buf_free = bc_free,
>> + .send_request = bc_send_request,
>> + .set_retrans_timeout = xprt_set_retrans_timeout_def,
>> + .close = bc_close,
>> + .destroy = bc_destroy,
>> + .print_stats = xs_tcp_print_stats,
>> +};
>> +
>> static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
>> unsigned int slot_table_size)
>> {
>> @@ -2131,13 +2363,29 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
>> xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
>> xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
>>
>> - xprt->bind_timeout = XS_BIND_TO;
>> - xprt->connect_timeout = XS_TCP_CONN_TO;
>> - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
>> - xprt->idle_timeout = XS_IDLE_DISC_TO;
>> + if (args->bc_sock) {
>> + /* backchannel */
>> + xprt_set_bound(xprt);
>> + INIT_DELAYED_WORK(&transport->connect_worker,
>> + bc_connect_worker);
>> + xprt->bind_timeout = 0;
>> + xprt->connect_timeout = 0;
>> + xprt->reestablish_timeout = 0;
>> + xprt->idle_timeout = (~0);
>>
>> - xprt->ops = &xs_tcp_ops;
>> - xprt->timeout = &xs_tcp_default_timeout;
>> + /*
>> + * The backchannel uses the same socket connection as the
>> + * forechannel
>> + */
>> + xprt->bc_sock = args->bc_sock;
>> + xprt->bc_sock->sk_bc_xprt = xprt;
>> + transport->sock = xprt->bc_sock->sk_sock;
>> + transport->inet = xprt->bc_sock->sk_sk;
>> +
>> + xprt->ops = &bc_tcp_ops;
>> +
>> + goto next;
>> + }
>>
>> switch (addr->sa_family) {
>> case AF_INET:
>> @@ -2145,13 +2393,29 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
>> xprt_set_bound(xprt);
>>
>> INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
>> - xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
>> break;
>> case AF_INET6:
>> if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
>> xprt_set_bound(xprt);
>>
>> INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
>> + break;
>> + }
>> + xprt->bind_timeout = XS_BIND_TO;
>> + xprt->connect_timeout = XS_TCP_CONN_TO;
>> + xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
>> + xprt->idle_timeout = XS_IDLE_DISC_TO;
>> +
>> + xprt->ops = &xs_tcp_ops;
>> +
>> +next:
>> + xprt->timeout = &xs_tcp_default_timeout;
>> +
>> + switch (addr->sa_family) {
>> + case AF_INET:
>> + xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
>> + break;
>> + case AF_INET6:
>> xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
>> break;
>> default: