2008-06-13 22:50:44

by J.Bruce Fields

[permalink] [raw]
Subject: [RFC] new client gssd upcall

The client needs a new more estensible text-based upcall to gssd, to
make it easier to support some features that are needed for new krb5
enctypes, for callbacks, etc.

We will have to continue to support the old upcall for backwards
compatibility with older gssd's. To simplify upgrades, as well as
testing and debugging, it would help if we can upgrade gssd without
having to choose at boot (or module-load) time whether we want the new
or the old upcall.

That means that when gssd opens an rpc upcall pipe, we'd like to just
start using whichever version it chose immediately--ideally without
having to wait the 30 seconds that it would normally take for upcalls
already queued on the wrong upcall pipe to time out.

The following patches do that by queueing the upcalls on whichever of
the two upcall pipes (the new one and the old one) that somebody holds
open. If nobody's opened anything yet, then upcalls are provisionally
queued on the new pipe. I add a pipe_open method to allow the gss code
to keep track of which pipe is open, to prevent anyone from attempting
to open both pipes at once, and to cancel any upcalls that got queued to
the wrong pipe.

I did some minimal testing with the old upcall, but haven't tested the
new upcall yet. (Olga, could you do that?) So this is just an RFC at
this point.

--b.


2008-06-17 20:52:01

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Sat, Jun 14, 2008 at 02:16:27PM -0400, Trond Myklebust wrote:
> On Sat, 2008-06-14 at 13:45 -0400, J. Bruce Fields wrote:
> > > NACK. I deliberately ripped out the old struct gss_auth spinlock in
> > > order to allow multiple gss_auth per inode (I believe Kevin was asking
> > > for this).
> >
> > OK, makes sense. So what will be the scope of a cred lookup--an rpc
> > client?
>
> That should normally be the case, but why do you need to change the
> locking here in the first place? Is there ever going to be a case where
> the same rpc_cred has upcalls on several different pipes? I can't see
> how that could be justified.

If you allow changing the upcall version over the life of the kernel,
then it's difficult to avoid. You can insist that noone have both the
new and old version opened simultaneously, for example, but it's harder
to know when there are no longer messages sitting around that have been
unhashed but are still lying around waiting for a process to wake up and
examine their results.

--b.

2008-06-17 21:35:40

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Tue, 2008-06-17 at 16:51 -0400, J. Bruce Fields wrote:
> On Sat, Jun 14, 2008 at 02:16:27PM -0400, Trond Myklebust wrote:
> > On Sat, 2008-06-14 at 13:45 -0400, J. Bruce Fields wrote:
> > > > NACK. I deliberately ripped out the old struct gss_auth spinlock in
> > > > order to allow multiple gss_auth per inode (I believe Kevin was asking
> > > > for this).
> > >
> > > OK, makes sense. So what will be the scope of a cred lookup--an rpc
> > > client?
> >
> > That should normally be the case, but why do you need to change the
> > locking here in the first place? Is there ever going to be a case where
> > the same rpc_cred has upcalls on several different pipes? I can't see
> > how that could be justified.
>
> If you allow changing the upcall version over the life of the kernel,
> then it's difficult to avoid. You can insist that noone have both the
> new and old version opened simultaneously, for example, but it's harder
> to know when there are no longer messages sitting around that have been
> unhashed but are still lying around waiting for a process to wake up and
> examine their results.

AFAIK, there are two cases when the dying rpc.gssd closes the pipe:

1. gss_cred->gc_upcall is set. In this case, the gss_cred has a
full reference to the gss_msg, so it has no trouble locating the
message and figuring out if it needs to resend (rpc_purge_list()
will ensure that the error field is set to EAGAIN) or if the
call completed before gssd died. If you are worried about the
fact that gss_upcall_callback uses gss_msg->auth to access the
inode in order to do the locking, then just add a pointer to the
inode to gss_msg: it is not as if a gss_msg can migrate from one
upcall queue to another.
2. gss_cred->gc_upcall is not set. In this case the call to
rpc_purge_list() in rpc_pipe_release() will ensure that the
message gets immediately released.

In other words, I can't see that keeping the current behaviour will
cause you to have more than one upcall at a time even if you do restart
rpc.gssd.

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-06-17 22:06:53

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Tue, Jun 17, 2008 at 05:34:47PM -0400, Trond Myklebust wrote:
> On Tue, 2008-06-17 at 16:51 -0400, J. Bruce Fields wrote:
> > On Sat, Jun 14, 2008 at 02:16:27PM -0400, Trond Myklebust wrote:
> > > On Sat, 2008-06-14 at 13:45 -0400, J. Bruce Fields wrote:
> > > > > NACK. I deliberately ripped out the old struct gss_auth spinlock in
> > > > > order to allow multiple gss_auth per inode (I believe Kevin was asking
> > > > > for this).
> > > >
> > > > OK, makes sense. So what will be the scope of a cred lookup--an rpc
> > > > client?
> > >
> > > That should normally be the case, but why do you need to change the
> > > locking here in the first place? Is there ever going to be a case where
> > > the same rpc_cred has upcalls on several different pipes? I can't see
> > > how that could be justified.
> >
> > If you allow changing the upcall version over the life of the kernel,
> > then it's difficult to avoid. You can insist that noone have both the
> > new and old version opened simultaneously, for example, but it's harder
> > to know when there are no longer messages sitting around that have been
> > unhashed but are still lying around waiting for a process to wake up and
> > examine their results.
>
> AFAIK, there are two cases when the dying rpc.gssd closes the pipe:
>
> 1. gss_cred->gc_upcall is set. In this case, the gss_cred has a
> full reference to the gss_msg, so it has no trouble locating the
> message and figuring out if it needs to resend (rpc_purge_list()
> will ensure that the error field is set to EAGAIN) or if the
> call completed before gssd died. If you are worried about the
> fact that gss_upcall_callback uses gss_msg->auth to access the
> inode in order to do the locking, then just add a pointer to the
> inode to gss_msg: it is not as if a gss_msg can migrate from one
> upcall queue to another.
> 2. gss_cred->gc_upcall is not set. In this case the call to
> rpc_purge_list() in rpc_pipe_release() will ensure that the
> message gets immediately released.
>
> In other words, I can't see that keeping the current behaviour will
> cause you to have more than one upcall at a time even if you do restart
> rpc.gssd.

Yeah, I think that's more or less exactly the sort of case I was worried
about--so let me see if that does the job. Thanks!--b.

2008-06-13 22:50:44

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 3/5] rpc: move in_downcall list to gss code

This list is only used by the auth_gss code anyway.

Signed-off-by: J. Bruce Fields <[email protected]>
---
include/linux/sunrpc/rpc_pipe_fs.h | 1 -
net/sunrpc/auth_gss/auth_gss.c | 33 +++++++++++++++++++--------------
net/sunrpc/rpc_pipe.c | 1 -
3 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/include/linux/sunrpc/rpc_pipe_fs.h b/include/linux/sunrpc/rpc_pipe_fs.h
index 51b977a..867bea8 100644
--- a/include/linux/sunrpc/rpc_pipe_fs.h
+++ b/include/linux/sunrpc/rpc_pipe_fs.h
@@ -23,7 +23,6 @@ struct rpc_inode {
void *private;
struct list_head pipe;
struct list_head in_upcall;
- struct list_head in_downcall;
int pipelen;
int nreaders;
int nwriters;
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 28ca1c9..5e82a69 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -83,6 +83,7 @@ struct gss_auth {
struct kref kref;
struct rpc_auth rpc_auth;
spinlock_t lock;
+ struct list_head in_downcall;
struct gss_api_mech *mech;
enum rpc_gss_svc service;
struct rpc_clnt *client;
@@ -259,10 +260,11 @@ gss_release_msg(struct gss_upcall_msg *gss_msg)
}

static struct gss_upcall_msg *
-__gss_find_upcall(struct rpc_inode *rpci, uid_t uid)
+__gss_find_upcall(struct gss_auth *gss_auth, uid_t uid)
{
struct gss_upcall_msg *pos;
- list_for_each_entry(pos, &rpci->in_downcall, list) {
+
+ list_for_each_entry(pos, &gss_auth->in_downcall, list) {
if (pos->uid != uid)
continue;
atomic_inc(&pos->count);
@@ -280,15 +282,13 @@ __gss_find_upcall(struct rpc_inode *rpci, uid_t uid)
static inline struct gss_upcall_msg *
gss_add_msg(struct gss_auth *gss_auth, struct gss_upcall_msg *gss_msg)
{
- struct inode *inode = gss_auth->dentry->d_inode;
- struct rpc_inode *rpci = RPC_I(inode);
struct gss_upcall_msg *old;

spin_lock(&gss_auth->lock);
- old = __gss_find_upcall(rpci, gss_msg->uid);
+ old = __gss_find_upcall(gss_auth, gss_msg->uid);
if (old == NULL) {
atomic_inc(&gss_msg->count);
- list_add(&gss_msg->list, &rpci->in_downcall);
+ list_add(&gss_msg->list, &gss_auth->in_downcall);
} else
gss_msg = old;
spin_unlock(&gss_auth->lock);
@@ -484,6 +484,14 @@ gss_pipe_upcall(struct file *filp, struct rpc_pipe_msg *msg,
return mlen;
}

+static struct gss_auth *gss_auth_from_dentry(struct dentry *dentry)
+{
+ struct inode *inode = dentry->d_inode;
+ struct rpc_clnt *clnt = RPC_I(inode)->private;
+
+ return container_of(clnt->cl_auth, struct gss_auth, rpc_auth);
+}
+
#define MSG_BUF_MAXSIZE 1024

static ssize_t
@@ -492,10 +500,7 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)
const void *p, *end;
void *buf;
struct gss_upcall_msg *gss_msg;
- struct inode *inode = filp->f_path.dentry->d_inode;
- struct rpc_clnt *clnt = RPC_I(inode)->private;
- struct gss_auth *gss_auth = container_of(clnt->cl_auth,
- struct gss_auth, rpc_auth);
+ struct gss_auth *gss_auth = gss_auth_from_dentry(filp->f_path.dentry);
struct gss_cl_ctx *ctx;
uid_t uid;
ssize_t err = -EFBIG;
@@ -507,7 +512,6 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)
if (!buf)
goto out;

- clnt = RPC_I(inode)->private;
err = -EFAULT;
if (copy_from_user(buf, src, mlen))
goto err;
@@ -527,7 +531,7 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)
err = -ENOENT;
/* Find a matching upcall */
spin_lock(&gss_auth->lock);
- gss_msg = __gss_find_upcall(RPC_I(inode), uid);
+ gss_msg = __gss_find_upcall(gss_auth, uid);
if (gss_msg == NULL) {
spin_unlock(&gss_auth->lock);
goto err_put_ctx;
@@ -568,9 +572,9 @@ gss_pipe_release(struct inode *inode)
struct gss_upcall_msg *gss_msg;

spin_lock(&gss_auth->lock);
- while (!list_empty(&rpci->in_downcall)) {
+ while (!list_empty(&gss_auth->in_downcall)) {

- gss_msg = list_entry(rpci->in_downcall.next,
+ gss_msg = list_entry(gss_auth->in_downcall.next,
struct gss_upcall_msg, list);
gss_msg->msg.errno = -EPIPE;
atomic_inc(&gss_msg->count);
@@ -635,6 +639,7 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
goto err_put_mech;
auth = &gss_auth->rpc_auth;
spin_lock_init(&gss_auth->lock);
+ INIT_LIST_HEAD(&gss_auth->in_downcall);
auth->au_cslack = GSS_CRED_SLACK >> 2;
auth->au_rslack = GSS_VERF_SLACK >> 2;
auth->au_ops = &authgss_ops;
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 5a9b0e7..41d968f 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -906,7 +906,6 @@ init_once(struct kmem_cache * cachep, void *foo)
rpci->nreaders = 0;
rpci->nwriters = 0;
INIT_LIST_HEAD(&rpci->in_upcall);
- INIT_LIST_HEAD(&rpci->in_downcall);
INIT_LIST_HEAD(&rpci->pipe);
rpci->pipelen = 0;
init_waitqueue_head(&rpci->waitq);
--
1.5.5.rc1


2008-06-13 22:50:43

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 1/5] rpc: remove unnecessary assignment

We're just about to kfree() gss_auth, so there's no point to setting any
of its fields.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 1 -
1 files changed, 0 insertions(+), 1 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index cc12d5f..b42ec85 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -665,7 +665,6 @@ static void
gss_free(struct gss_auth *gss_auth)
{
rpc_unlink(gss_auth->dentry);
- gss_auth->dentry = NULL;
gss_mech_put(gss_auth->mech);

kfree(gss_auth);
--
1.5.5.rc1


2008-06-13 22:50:44

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 4/5] rpc: add an rpc_pipe_open method

We want to transition to a new gssd upcall which is text-based and more
easily extensible.

To simplify upgrades, as well as testing and debugging, it will help if
we can upgrade gssd (to a version which understands the new upcall)
without having to choose at boot (or module-load) time whether we want
the new or the old upcall.

That means that when gssd opens an rpc upcall pipe, we'd like to just
start using whichever version it chose immediately--ideally without
having to wait the 30 seconds that it would normally take for upcalls
already queued on the wrong upcall pipe to time out.

So we add an rpc_pipe open method, which we will use to let the gssd
code know that a new gssd may have started up.

Signed-off-by: J. Bruce Fields <[email protected]>
---
include/linux/sunrpc/rpc_pipe_fs.h | 1 +
net/sunrpc/rpc_pipe.c | 9 ++++++++-
2 files changed, 9 insertions(+), 1 deletions(-)

diff --git a/include/linux/sunrpc/rpc_pipe_fs.h b/include/linux/sunrpc/rpc_pipe_fs.h
index 867bea8..439bef3 100644
--- a/include/linux/sunrpc/rpc_pipe_fs.h
+++ b/include/linux/sunrpc/rpc_pipe_fs.h
@@ -15,6 +15,7 @@ struct rpc_pipe_ops {
ssize_t (*upcall)(struct file *, struct rpc_pipe_msg *, char __user *, size_t);
ssize_t (*downcall)(struct file *, const char __user *, size_t);
void (*release_pipe)(struct inode *);
+ int (*open_pipe)(struct inode *);
void (*destroy_msg)(struct rpc_pipe_msg *);
};

diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 41d968f..f7781fc 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -169,9 +169,15 @@ static int
rpc_pipe_open(struct inode *inode, struct file *filp)
{
struct rpc_inode *rpci = RPC_I(inode);
- int res = -ENXIO;
+ int res;

mutex_lock(&inode->i_mutex);
+ if (rpci->ops->open_pipe) {
+ res = rpci->ops->open_pipe(inode);
+ if (res)
+ goto out;
+ }
+ res = -ENXIO;
if (rpci->ops != NULL) {
if (filp->f_mode & FMODE_READ)
rpci->nreaders ++;
@@ -179,6 +185,7 @@ rpc_pipe_open(struct inode *inode, struct file *filp)
rpci->nwriters ++;
res = 0;
}
+out:
mutex_unlock(&inode->i_mutex);
return res;
}
--
1.5.5.rc1


2008-06-13 22:50:44

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

The i_lock uses in auth_gss.c are to protect the gc_upcall and gc_ctx
fields of the gss cred; so we can use a separate per-auth spinlock for
those instead of the i_lock. This simplifies the locking in the case
where we have two different pipes (one the current pipe, one the pipe
for a new more flexible upcall), either of which might be used for gss
upcalls.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 56 +++++++++++++++++++++------------------
1 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index b42ec85..28ca1c9 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -82,6 +82,7 @@ static const struct rpc_credops gss_nullops;
struct gss_auth {
struct kref kref;
struct rpc_auth rpc_auth;
+ spinlock_t lock;
struct gss_api_mech *mech;
enum rpc_gss_svc service;
struct rpc_clnt *client;
@@ -108,7 +109,7 @@ gss_put_ctx(struct gss_cl_ctx *ctx)
/* gss_cred_set_ctx:
* called by gss_upcall_callback and gss_create_upcall in order
* to set the gss context. The actual exchange of an old context
- * and a new one is protected by the inode->i_lock.
+ * and a new one is protected by the gss_auth->lock.
*/
static void
gss_cred_set_ctx(struct rpc_cred *cred, struct gss_cl_ctx *ctx)
@@ -283,14 +284,14 @@ gss_add_msg(struct gss_auth *gss_auth, struct gss_upcall_msg *gss_msg)
struct rpc_inode *rpci = RPC_I(inode);
struct gss_upcall_msg *old;

- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
old = __gss_find_upcall(rpci, gss_msg->uid);
if (old == NULL) {
atomic_inc(&gss_msg->count);
list_add(&gss_msg->list, &rpci->in_downcall);
} else
gss_msg = old;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
return gss_msg;
}

@@ -307,14 +308,13 @@ static void
gss_unhash_msg(struct gss_upcall_msg *gss_msg)
{
struct gss_auth *gss_auth = gss_msg->auth;
- struct inode *inode = gss_auth->dentry->d_inode;

if (list_empty(&gss_msg->list))
return;
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
if (!list_empty(&gss_msg->list))
__gss_unhash_msg(gss_msg);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
}

static void
@@ -323,16 +323,16 @@ gss_upcall_callback(struct rpc_task *task)
struct gss_cred *gss_cred = container_of(task->tk_msg.rpc_cred,
struct gss_cred, gc_base);
struct gss_upcall_msg *gss_msg = gss_cred->gc_upcall;
- struct inode *inode = gss_msg->auth->dentry->d_inode;
+ struct gss_auth *gss_auth = gss_msg->auth;

- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
if (gss_msg->ctx)
gss_cred_set_ctx(task->tk_msg.rpc_cred, gss_msg->ctx);
else
task->tk_status = gss_msg->msg.errno;
gss_cred->gc_upcall = NULL;
rpc_wake_up_status(&gss_msg->rpc_waitqueue, gss_msg->msg.errno);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
gss_release_msg(gss_msg);
}

@@ -391,7 +391,6 @@ gss_refresh_upcall(struct rpc_task *task)
struct gss_cred *gss_cred = container_of(cred,
struct gss_cred, gc_base);
struct gss_upcall_msg *gss_msg;
- struct inode *inode = gss_auth->dentry->d_inode;
int err = 0;

dprintk("RPC: %5u gss_refresh_upcall for uid %u\n", task->tk_pid,
@@ -401,7 +400,7 @@ gss_refresh_upcall(struct rpc_task *task)
err = PTR_ERR(gss_msg);
goto out;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
if (gss_cred->gc_upcall != NULL)
rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL);
else if (gss_msg->ctx != NULL) {
@@ -416,7 +415,7 @@ gss_refresh_upcall(struct rpc_task *task)
rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback);
} else
err = gss_msg->msg.errno;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
gss_release_msg(gss_msg);
out:
dprintk("RPC: %5u gss_refresh_upcall for uid %u result %d\n",
@@ -427,7 +426,6 @@ out:
static inline int
gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
{
- struct inode *inode = gss_auth->dentry->d_inode;
struct rpc_cred *cred = &gss_cred->gc_base;
struct gss_upcall_msg *gss_msg;
DEFINE_WAIT(wait);
@@ -441,11 +439,11 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
}
for (;;) {
prepare_to_wait(&gss_msg->waitqueue, &wait, TASK_INTERRUPTIBLE);
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
if (gss_msg->ctx != NULL || gss_msg->msg.errno < 0) {
break;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
if (signalled()) {
err = -ERESTARTSYS;
goto out_intr;
@@ -456,7 +454,7 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
gss_cred_set_ctx(cred, gss_msg->ctx);
else
err = gss_msg->msg.errno;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
out_intr:
finish_wait(&gss_msg->waitqueue, &wait);
gss_release_msg(gss_msg);
@@ -493,9 +491,11 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)
{
const void *p, *end;
void *buf;
- struct rpc_clnt *clnt;
struct gss_upcall_msg *gss_msg;
struct inode *inode = filp->f_path.dentry->d_inode;
+ struct rpc_clnt *clnt = RPC_I(inode)->private;
+ struct gss_auth *gss_auth = container_of(clnt->cl_auth,
+ struct gss_auth, rpc_auth);
struct gss_cl_ctx *ctx;
uid_t uid;
ssize_t err = -EFBIG;
@@ -526,14 +526,14 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)

err = -ENOENT;
/* Find a matching upcall */
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
gss_msg = __gss_find_upcall(RPC_I(inode), uid);
if (gss_msg == NULL) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
goto err_put_ctx;
}
list_del_init(&gss_msg->list);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);

p = gss_fill_context(p, end, ctx, gss_msg->auth->mech);
if (IS_ERR(p)) {
@@ -545,9 +545,9 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen)
err = mlen;

err_release_msg:
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
__gss_unhash_msg(gss_msg);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
gss_release_msg(gss_msg);
err_put_ctx:
gss_put_ctx(ctx);
@@ -562,9 +562,12 @@ static void
gss_pipe_release(struct inode *inode)
{
struct rpc_inode *rpci = RPC_I(inode);
+ struct rpc_clnt *clnt = rpci->private;
+ struct gss_auth *gss_auth = container_of(clnt->cl_auth,
+ struct gss_auth, rpc_auth);
struct gss_upcall_msg *gss_msg;

- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
while (!list_empty(&rpci->in_downcall)) {

gss_msg = list_entry(rpci->in_downcall.next,
@@ -572,11 +575,11 @@ gss_pipe_release(struct inode *inode)
gss_msg->msg.errno = -EPIPE;
atomic_inc(&gss_msg->count);
__gss_unhash_msg(gss_msg);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
gss_release_msg(gss_msg);
- spin_lock(&inode->i_lock);
+ spin_lock(&gss_auth->lock);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&gss_auth->lock);
}

static void
@@ -631,6 +634,7 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
if (gss_auth->service == 0)
goto err_put_mech;
auth = &gss_auth->rpc_auth;
+ spin_lock_init(&gss_auth->lock);
auth->au_cslack = GSS_CRED_SLACK >> 2;
auth->au_rslack = GSS_VERF_SLACK >> 2;
auth->au_ops = &authgss_ops;
--
1.5.5.rc1


2008-06-13 22:50:44

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 5/5] rpc: add new gssd upcall pipe

This adds the new text-based gss upcall. We only allow an open from
either the new pipe or the old one, and use the pipe_open method to
enforce this.

If no pipes are open, we provisionally queue messages for the new pipe;
a subsequent open of the old pipe will cause all those messages to be
purged.

Signed-off-by: J. Bruce Fields <[email protected]>
---
include/linux/sunrpc/rpc_pipe_fs.h | 1 +
net/sunrpc/auth_gss/auth_gss.c | 144 ++++++++++++++++++++++++++++++++---
net/sunrpc/rpc_pipe.c | 27 +++++--
3 files changed, 153 insertions(+), 19 deletions(-)

diff --git a/include/linux/sunrpc/rpc_pipe_fs.h b/include/linux/sunrpc/rpc_pipe_fs.h
index 439bef3..4c0f4a0 100644
--- a/include/linux/sunrpc/rpc_pipe_fs.h
+++ b/include/linux/sunrpc/rpc_pipe_fs.h
@@ -42,6 +42,7 @@ RPC_I(struct inode *inode)
}

extern int rpc_queue_upcall(struct inode *, struct rpc_pipe_msg *);
+void rpc_pipe_cancel_all(struct inode *inode, int errno);

extern struct dentry *rpc_mkdir(char *, struct rpc_clnt *);
extern int rpc_rmdir(struct dentry *);
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 5e82a69..a5a56e8 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -87,7 +87,19 @@ struct gss_auth {
struct gss_api_mech *mech;
enum rpc_gss_svc service;
struct rpc_clnt *client;
- struct dentry *dentry;
+ /*
+ * There are two upcall pipes; dentry[1], named "gssd", is used
+ * for the new text-based upcall; dentry[0] is named after the
+ * mechanism ("krb5") and exists for backwards-compatibility
+ * with older gssd's.
+ */
+ struct dentry *dentry[2];
+ /*
+ * The upcall_version tracks which of the above upcalls we're
+ * using, 0 (old) or 1 (new). While userland has neither pipe
+ * open, upcall_version is set to -1:
+ */
+ int upcall_version;
};

static void gss_free_ctx(struct gss_cl_ctx *);
@@ -235,6 +247,7 @@ err:
return p;
}

+#define UPCALL_BUF_LEN 128

struct gss_upcall_msg {
atomic_t count;
@@ -245,6 +258,7 @@ struct gss_upcall_msg {
struct rpc_wait_queue rpc_waitqueue;
wait_queue_head_t waitqueue;
struct gss_cl_ctx *ctx;
+ char databuf[UPCALL_BUF_LEN];
};

static void
@@ -347,14 +361,52 @@ gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
atomic_set(&gss_msg->count, 1);
- gss_msg->msg.data = &gss_msg->uid;
- gss_msg->msg.len = sizeof(gss_msg->uid);
gss_msg->uid = uid;
gss_msg->auth = gss_auth;
}
return gss_msg;
}

+static void gss_encode_v0_msg(struct gss_upcall_msg *gss_msg)
+{
+ gss_msg->msg.data = &gss_msg->uid;
+ gss_msg->msg.len = sizeof(gss_msg->uid);
+}
+
+static void gss_encode_v1_msg(struct gss_upcall_msg *gss_msg)
+{
+ gss_msg->msg.len = sprintf(gss_msg->databuf, "mech=%s uid=%d\n",
+ gss_msg->auth->mech->gm_name,
+ gss_msg->uid);
+ gss_msg->msg.data = gss_msg->databuf;
+ BUG_ON(gss_msg->msg.len > UPCALL_BUF_LEN);
+}
+
+static void gss_encode_msg(struct gss_upcall_msg *gss_msg, int vers)
+{
+ if (vers == 0)
+ gss_encode_v0_msg(gss_msg);
+ else
+ gss_encode_v1_msg(gss_msg);
+}
+
+static int gss_queue_upcall(struct gss_auth *gss_auth, struct gss_upcall_msg *gss_msg)
+{
+ int res;
+ int vers;
+
+ spin_lock(&gss_auth->lock);
+ vers = gss_auth->upcall_version;
+ if (vers == -1) {
+ /* Neither pipe opened yet; default to the new one: */
+ vers = 1;
+ }
+ gss_encode_msg(gss_msg, vers);
+ res = rpc_queue_upcall(gss_auth->dentry[vers]->d_inode, &gss_msg->msg);
+ spin_unlock(&gss_auth->lock);
+ return res;
+}
+
static struct gss_upcall_msg *
gss_setup_upcall(struct rpc_clnt *clnt, struct gss_auth *gss_auth, struct rpc_cred *cred)
{
@@ -372,7 +424,7 @@ gss_setup_upcall(struct rpc_clnt *clnt, struct gss_auth *gss_auth, struct rpc_cr
return ERR_PTR(-ENOMEM);
gss_msg = gss_add_msg(gss_auth, gss_new);
if (gss_msg == gss_new) {
- int res = rpc_queue_upcall(gss_auth->dentry->d_inode, &gss_new->msg);
+ int res = gss_queue_upcall(gss_auth, gss_msg);
if (res) {
gss_unhash_msg(gss_new);
gss_msg = ERR_PTR(res);
@@ -562,6 +614,55 @@ out:
return err;
}

+static int gss_pipe_version(struct gss_auth *gss_auth, struct inode *inode)
+{
+ if (gss_auth->dentry[0]->d_inode == inode)
+ return 0;
+ if (gss_auth->dentry[1]->d_inode == inode)
+ return 1;
+ BUG();
+}
+
+static int
+gss_pipe_open(struct inode *inode)
+{
+ struct rpc_inode *rpci = RPC_I(inode);
+ struct rpc_clnt *clnt = rpci->private;
+ struct gss_auth *gss_auth = container_of(clnt->cl_auth,
+ struct gss_auth, rpc_auth);
+ int vers = gss_pipe_version(gss_auth, inode);
+ int ret;
+
+ spin_lock(&gss_auth->lock);
+ if (gss_auth->upcall_version >= 0) {
+ /*
+ * If pipe is already opened, permit only opens of the same
+ * version:
+ */
+ if (vers == gss_auth->upcall_version)
+ ret = 0;
+ else
+ ret = -EBUSY;
+ spin_unlock(&gss_auth->lock);
+ return ret;
+ }
+
+ /*
+ * Neither pipe was opened previously, so this open is the first
+ * hint we've gotten as to which upcall gssd intends to use:
+ */
+ gss_auth->upcall_version = vers;
+ spin_unlock(&gss_auth->lock);
+ if (vers == 0) {
+ /*
+ * We default to trying the new upcall, so may have
+ * already attempted some upcalls there; cancel them:
+ */
+ rpc_pipe_cancel_all(gss_auth->dentry[1]->d_inode, -EAGAIN);
+ }
+ return 0;
+}
+
static void
gss_pipe_release(struct inode *inode)
{
@@ -572,6 +673,8 @@ gss_pipe_release(struct inode *inode)
struct gss_upcall_msg *gss_msg;

spin_lock(&gss_auth->lock);
+ if (rpci->nreaders == 0)
+ gss_auth->upcall_version = -1;
while (!list_empty(&gss_auth->in_downcall)) {

gss_msg = list_entry(gss_auth->in_downcall.next,
@@ -646,21 +749,34 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
auth->au_flavor = flavor;
atomic_set(&auth->au_count, 1);
kref_init(&gss_auth->kref);
-
- gss_auth->dentry = rpc_mkpipe(clnt->cl_dentry, gss_auth->mech->gm_name,
- clnt, &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
- if (IS_ERR(gss_auth->dentry)) {
- err = PTR_ERR(gss_auth->dentry);
+ gss_auth->upcall_version = -1;
+
+ gss_auth->dentry[0] = rpc_mkpipe(clnt->cl_dentry,
+ gss_auth->mech->gm_name,
+ clnt, &gss_upcall_ops,
+ RPC_PIPE_WAIT_FOR_OPEN);
+ if (IS_ERR(gss_auth->dentry[0])) {
+ err = PTR_ERR(gss_auth->dentry[0]);
goto err_put_mech;
}
+ gss_auth->dentry[1] = rpc_mkpipe(clnt->cl_dentry,
+ "gssd",
+ clnt, &gss_upcall_ops,
+ RPC_PIPE_WAIT_FOR_OPEN);
+ if (IS_ERR(gss_auth->dentry[1])) {
+ err = PTR_ERR(gss_auth->dentry[1]);
+ goto err_unlink_pipe_0;
+ }

err = rpcauth_init_credcache(auth);
if (err)
- goto err_unlink_pipe;
+ goto err_unlink_pipe_1;

return auth;
-err_unlink_pipe:
- rpc_unlink(gss_auth->dentry);
+err_unlink_pipe_1:
+ rpc_unlink(gss_auth->dentry[1]);
+err_unlink_pipe_0:
+ rpc_unlink(gss_auth->dentry[0]);
err_put_mech:
gss_mech_put(gss_auth->mech);
err_free:
@@ -673,7 +789,8 @@ out_dec:
static void
gss_free(struct gss_auth *gss_auth)
{
- rpc_unlink(gss_auth->dentry);
+ rpc_unlink(gss_auth->dentry[1]);
+ rpc_unlink(gss_auth->dentry[0]);
gss_mech_put(gss_auth->mech);

kfree(gss_auth);
@@ -1362,6 +1479,7 @@ static struct rpc_pipe_ops gss_upcall_ops = {
.downcall = gss_pipe_downcall,
.destroy_msg = gss_pipe_destroy_msg,
.release_pipe = gss_pipe_release,
+ .open_pipe = gss_pipe_open,
};

/*
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index f7781fc..bbefc3a 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -53,13 +53,18 @@ static void rpc_purge_list(struct rpc_inode *rpci, struct list_head *head,
wake_up(&rpci->waitq);
}

-static void
-rpc_timeout_upcall_queue(struct work_struct *work)
+/**
+ * rpc_pipe_cancel_all
+ * @inode: inode of upcall pipe to cancel all upcalls on
+ * @errno: errno (as a negative integer) to return on all pending upcalls
+ *
+ * Call with an inode created by rpc_mkpipe() to remove all queued
+ * upcalls. Allowed only on pipes not yet opened for read.
+ */
+void rpc_pipe_cancel_all(struct inode *inode, int errno)
{
LIST_HEAD(free_list);
- struct rpc_inode *rpci =
- container_of(work, struct rpc_inode, queue_timeout.work);
- struct inode *inode = &rpci->vfs_inode;
+ struct rpc_inode *rpci = RPC_I(inode);
void (*destroy_msg)(struct rpc_pipe_msg *);

spin_lock(&inode->i_lock);
@@ -73,7 +78,17 @@ rpc_timeout_upcall_queue(struct work_struct *work)
rpci->pipelen = 0;
}
spin_unlock(&inode->i_lock);
- rpc_purge_list(rpci, &free_list, destroy_msg, -ETIMEDOUT);
+ rpc_purge_list(rpci, &free_list, destroy_msg, errno);
+}
+
+static void
+rpc_timeout_upcall_queue(struct work_struct *work)
+{
+ struct rpc_inode *rpci =
+ container_of(work, struct rpc_inode, queue_timeout.work);
+ struct inode *inode = &rpci->vfs_inode;
+
+ rpc_pipe_cancel_all(inode, -ETIMEDOUT);
}

/**
--
1.5.5.rc1


2008-06-14 15:58:57

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> The i_lock uses in auth_gss.c are to protect the gc_upcall and gc_ctx
> fields of the gss cred; so we can use a separate per-auth spinlock for
> those instead of the i_lock. This simplifies the locking in the case
> where we have two different pipes (one the current pipe, one the pipe
> for a new more flexible upcall), either of which might be used for gss
> upcalls.

NACK. I deliberately ripped out the old struct gss_auth spinlock in
order to allow multiple gss_auth per inode (I believe Kevin was asking
for this).

Trond

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-06-14 16:01:35

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 3/5] rpc: move in_downcall list to gss code

On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> This list is only used by the auth_gss code anyway.
>
> Signed-off-by: J. Bruce Fields <[email protected]>

NACK. This too reintroduces a forced one-to-one relationship between the
inode and the gss_auth.

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-06-14 16:07:20

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 5/5] rpc: add new gssd upcall pipe

On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> This adds the new text-based gss upcall. We only allow an open from
> either the new pipe or the old one, and use the pipe_open method to
> enforce this.
>
> If no pipes are open, we provisionally queue messages for the new pipe;
> a subsequent open of the old pipe will cause all those messages to be
> purged.
>
> Signed-off-by: J. Bruce Fields <[email protected]>
> ---

I'm not happy about allowing the gss_auth layer to control timeouts and
cancel upcalls either: that caused us way too much grief and bugs in
earlier revisions of rpc_pipefs.

Why not rather disallow queueing (i.e. put the task to sleep, and maybe
print out a reminder on the console every minute or so) until you are
notified by the rpc_pipefs layer that someone has opened the file for
reading?

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-06-14 17:36:13

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 5/5] rpc: add new gssd upcall pipe

On Sat, Jun 14, 2008 at 12:07:18PM -0400, Trond Myklebust wrote:
> On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> > This adds the new text-based gss upcall. We only allow an open from
> > either the new pipe or the old one, and use the pipe_open method to
> > enforce this.
> >
> > If no pipes are open, we provisionally queue messages for the new pipe;
> > a subsequent open of the old pipe will cause all those messages to be
> > purged.
> >
> > Signed-off-by: J. Bruce Fields <[email protected]>
> > ---
>
> I'm not happy about allowing the gss_auth layer to control timeouts and
> cancel upcalls either: that caused us way too much grief and bugs in
> earlier revisions of rpc_pipefs.

Understood. In this case the only cancel occurs before any open has
come--it seems pretty simple. But perhaps I'm missing something--I'll
think about it some more.

> Why not rather disallow queueing (i.e. put the task to sleep, and maybe
> print out a reminder on the console every minute or so) until you are
> notified by the rpc_pipefs layer that someone has opened the file for
> reading?

I have another version of these patches that did that (in fact, I just
turned off the RPC_PIPE_WAIT_FOR_OPEN stuff and relied entirely on
timeouts in gss code). I'll take another look at that.

--b.

2008-06-14 17:45:30

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Sat, Jun 14, 2008 at 11:58:55AM -0400, Trond Myklebust wrote:
> On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> > The i_lock uses in auth_gss.c are to protect the gc_upcall and gc_ctx
> > fields of the gss cred; so we can use a separate per-auth spinlock for
> > those instead of the i_lock. This simplifies the locking in the case
> > where we have two different pipes (one the current pipe, one the pipe
> > for a new more flexible upcall), either of which might be used for gss
> > upcalls.
>
> NACK. I deliberately ripped out the old struct gss_auth spinlock in
> order to allow multiple gss_auth per inode (I believe Kevin was asking
> for this).

OK, makes sense. So what will be the scope of a cred lookup--an rpc
client?

I wish I had a better picture of what the data structures here will
eventually look like.

--b.

2008-06-14 18:40:44

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Sat, 2008-06-14 at 13:45 -0400, J. Bruce Fields wrote:
> > NACK. I deliberately ripped out the old struct gss_auth spinlock in
> > order to allow multiple gss_auth per inode (I believe Kevin was asking
> > for this).
>
> OK, makes sense. So what will be the scope of a cred lookup--an rpc
> client?

That should normally be the case, but why do you need to change the
locking here in the first place? Is there ever going to be a case where
the same rpc_cred has upcalls on several different pipes? I can't see
how that could be justified.

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-06-16 14:29:33

by Jeff Layton

[permalink] [raw]
Subject: Re: [RFC] new client gssd upcall

On Fri, 13 Jun 2008 18:50:37 -0400
"J. Bruce Fields" <[email protected]> wrote:

> The client needs a new more estensible text-based upcall to gssd, to
> make it easier to support some features that are needed for new krb5
> enctypes, for callbacks, etc.
>
> We will have to continue to support the old upcall for backwards
> compatibility with older gssd's. To simplify upgrades, as well as
> testing and debugging, it would help if we can upgrade gssd without
> having to choose at boot (or module-load) time whether we want the new
> or the old upcall.
>
> That means that when gssd opens an rpc upcall pipe, we'd like to just
> start using whichever version it chose immediately--ideally without
> having to wait the 30 seconds that it would normally take for upcalls
> already queued on the wrong upcall pipe to time out.
>
> The following patches do that by queueing the upcalls on whichever of
> the two upcall pipes (the new one and the old one) that somebody holds
> open. If nobody's opened anything yet, then upcalls are provisionally
> queued on the new pipe. I add a pipe_open method to allow the gss code
> to keep track of which pipe is open, to prevent anyone from attempting
> to open both pipes at once, and to cancel any upcalls that got queued to
> the wrong pipe.
>
> I did some minimal testing with the old upcall, but haven't tested the
> new upcall yet. (Olga, could you do that?) So this is just an RFC at
> this point.
>
> --b.

Has any thought been given to moving all of the rpc_pipefs upcalls to use
the keyctl API that David Howells did? It seems like that would be better
suited to this sort of application than rpc_pipefs...

--
Jeff Layton <[email protected]>

2008-11-10 20:22:17

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, 2008-11-10 at 15:17 -0500, J. Bruce Fields wrote:
> On Mon, Nov 10, 2008 at 03:11:38PM -0500, Trond Myklebust wrote:
> > On Mon, 2008-11-10 at 15:07 -0500, J. Bruce Fields wrote:
> >
> > > Again, I'm dealing with that case by calling release_pipe() from
> > > rpc_close_pipes(), just as the current code does--the only change being
> > > to do that only when there are still opens:
> > >
> > > last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
> > > rpci->nreaders = 0;
> > > ...
> > > rpci->nwriters = 0;
> > > if (last_close && ops->release_pipe)
> > > ops->release_pipe(inode);
> >
> > Which means that if the kernel calls rpc_close_pipes() before gssd has
> > managed to close, then you _NEVER_ call ops->release_pipe()...
>
> So, I take "before gssd has managed to close" to mean that gssd is still
> holding the file open. Thus the statement
>
> last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
>
> evaluates to true; either nreaders or nwriters must be nonzero.
>
> (And note the open and close code that modifes nreaders and nwriters is
> all serialized with this code by the i_mutex.)
>
> --b.

Exactly... That is a very common situation that happens pretty much
every time you unmount. The kernel closes the pipe on its side, and
removes the dentry; it doesn't wait for gssd to close the pipe.

Trond

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-11-10 20:35:31

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 4/9] rpc: add an rpc_pipe_open method

On Sun, 2008-11-09 at 16:04 -0500, J. Bruce Fields wrote:
> We want to transition to a new gssd upcall which is text-based and more
> easily extensible.
>
> To simplify upgrades, as well as testing and debugging, it will help if
> we can upgrade gssd (to a version which understands the new upcall)
> without having to choose at boot (or module-load) time whether we want
> the new or the old upcall.
>
> We will do this by providing two different pipes: one named, as
> currently, after the mechanism (normally "krb5"), and supporting the
> old upcall. One named "gssd" and supporting the new upcall version.
>
> We allow gssd to indicate which version it supports by its choice of
> which pipe to open.
>
> As we have no interest in supporting *simultaneous* use of both
> versions, we'll forbid opening both pipes at the same time.
>
> So, add a new pipe_open callback to the rpc_pipefs api, which the gss
> code can use to track which pipes have been open, and to refuse opens of
> incompatible pipes.
>
> We only need this to be called on the first open of a given pipe.
>
> Signed-off-by: J. Bruce Fields <[email protected]>
> ---
> include/linux/sunrpc/rpc_pipe_fs.h | 1 +
> net/sunrpc/rpc_pipe.c | 21 ++++++++++++++-------
> 2 files changed, 15 insertions(+), 7 deletions(-)
>
> diff --git a/include/linux/sunrpc/rpc_pipe_fs.h b/include/linux/sunrpc/rpc_pipe_fs.h
> index 51b977a..cea764c 100644
> --- a/include/linux/sunrpc/rpc_pipe_fs.h
> +++ b/include/linux/sunrpc/rpc_pipe_fs.h
> @@ -15,6 +15,7 @@ struct rpc_pipe_ops {
> ssize_t (*upcall)(struct file *, struct rpc_pipe_msg *, char __user *, size_t);
> ssize_t (*downcall)(struct file *, const char __user *, size_t);
> void (*release_pipe)(struct inode *);
> + int (*open_pipe)(struct inode *);
> void (*destroy_msg)(struct rpc_pipe_msg *);
> };
>
> diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
> index 23a2b8f..4171ab7 100644
> --- a/net/sunrpc/rpc_pipe.c
> +++ b/net/sunrpc/rpc_pipe.c
> @@ -169,16 +169,23 @@ static int
> rpc_pipe_open(struct inode *inode, struct file *filp)
> {
> struct rpc_inode *rpci = RPC_I(inode);
> + int first_open = rpci->nreaders == 0 && rpci->nwriters == 0;
> int res = -ENXIO;
>
> mutex_lock(&inode->i_mutex);
> - if (rpci->ops != NULL) {
> - if (filp->f_mode & FMODE_READ)
> - rpci->nreaders ++;
> - if (filp->f_mode & FMODE_WRITE)
> - rpci->nwriters ++;
> - res = 0;
> + if (rpci->ops == NULL)
> + goto out;
> + if (first_open && rpci->ops->open_pipe) {
> + res = rpci->ops->open_pipe(inode);
> + if (res)
> + goto out;
> }
> + if (filp->f_mode & FMODE_READ)
> + rpci->nreaders++;
> + if (filp->f_mode & FMODE_WRITE)
> + rpci->nwriters++;
> + res = 0;
> +out:

BTW: This is racy. You have to set first_open _after_ you take the
inode->i_mutex.

> mutex_unlock(&inode->i_mutex);
> return res;
> }
> @@ -748,7 +755,7 @@ rpc_rmdir(struct dentry *dentry)
> * @name: name of pipe
> * @private: private data to associate with the pipe, for the caller's use
> * @ops: operations defining the behavior of the pipe: upcall, downcall,
> - * release_pipe, and destroy_msg.
> + * release_pipe, open_pipe, and destroy_msg.
> * @flags: rpc_inode flags
> *
> * Data is made available for userspace to read by calls to
--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-11-09 20:46:24

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 2/5] rpc: Use separate spinlock for cred locking in auth_gss.c

On Tue, Jun 17, 2008 at 05:34:47PM -0400, Trond Myklebust wrote:
> On Tue, 2008-06-17 at 16:51 -0400, J. Bruce Fields wrote:
> > On Sat, Jun 14, 2008 at 02:16:27PM -0400, Trond Myklebust wrote:
> > > On Sat, 2008-06-14 at 13:45 -0400, J. Bruce Fields wrote:
> > > > > NACK. I deliberately ripped out the old struct gss_auth spinlock in
> > > > > order to allow multiple gss_auth per inode (I believe Kevin was asking
> > > > > for this).
> > > >
> > > > OK, makes sense. So what will be the scope of a cred lookup--an rpc
> > > > client?
> > >
> > > That should normally be the case, but why do you need to change the
> > > locking here in the first place? Is there ever going to be a case where
> > > the same rpc_cred has upcalls on several different pipes? I can't see
> > > how that could be justified.
> >
> > If you allow changing the upcall version over the life of the kernel,
> > then it's difficult to avoid. You can insist that noone have both the
> > new and old version opened simultaneously, for example, but it's harder
> > to know when there are no longer messages sitting around that have been
> > unhashed but are still lying around waiting for a process to wake up and
> > examine their results.
>
> AFAIK, there are two cases when the dying rpc.gssd closes the pipe:
>
> 1. gss_cred->gc_upcall is set. In this case, the gss_cred has a
> full reference to the gss_msg, so it has no trouble locating the
> message and figuring out if it needs to resend (rpc_purge_list()
> will ensure that the error field is set to EAGAIN) or if the
> call completed before gssd died. If you are worried about the
> fact that gss_upcall_callback uses gss_msg->auth to access the
> inode in order to do the locking, then just add a pointer to the
> inode to gss_msg: it is not as if a gss_msg can migrate from one
> upcall queue to another.
> 2. gss_cred->gc_upcall is not set. In this case the call to
> rpc_purge_list() in rpc_pipe_release() will ensure that the
> message gets immediately released.
>
> In other words, I can't see that keeping the current behaviour will
> cause you to have more than one upcall at a time even if you do restart
> rpc.gssd.

OK, sorry for the delay. I believe there is still a race here in the
presence of two pipes that take upcalls, something like:

task 1 task 2
------ ------

Create upcall message on
old pipe using refresh_upcall

---- gssd closes old pipe, destroys message ----
---- new gssd opens new pipe ----

close of old pipe wakes up
this rpc task; upcall_callback
called.
gss_refresh__callback creates new
upcall on new pipe for the same cred;
sets gc_upcall.
gc_upcall := NULL
woken up later, upcall_callback
dereferences NULL gc_upcall.

The problem being that access to gc_upcall isn't correctly serialized,
since it's protected here by two different i_lock's (task 1 is using the
old pipe's i_lock, task 2 the new pipe's i_lock).

Anyway, rather than fooling with the basic data structures I figured
it's easiest just to add a separate lock which just controls the change
of version and to forbid changing the version while there are still any
upcalls lying around. I'll send patches.

--b.

2008-11-09 20:46:52

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 5/5] rpc: add new gssd upcall pipe

On Sat, Jun 14, 2008 at 12:07:18PM -0400, Trond Myklebust wrote:
> On Fri, 2008-06-13 at 18:50 -0400, J. Bruce Fields wrote:
> > This adds the new text-based gss upcall. We only allow an open from
> > either the new pipe or the old one, and use the pipe_open method to
> > enforce this.
> >
> > If no pipes are open, we provisionally queue messages for the new pipe;
> > a subsequent open of the old pipe will cause all those messages to be
> > purged.
> >
> > Signed-off-by: J. Bruce Fields <[email protected]>
> > ---
>
> I'm not happy about allowing the gss_auth layer to control timeouts and
> cancel upcalls either: that caused us way too much grief and bugs in
> earlier revisions of rpc_pipefs.
>
> Why not rather disallow queueing (i.e. put the task to sleep, and maybe
> print out a reminder on the console every minute or so) until you are
> notified by the rpc_pipefs layer that someone has opened the file for
> reading?

OK, done.

--b.

2008-11-09 21:04:45

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 2/9] rpc: factor out warning code from gss_pipe_destroy_msg

We'll want to call this from elsewhere soon. And this is a bit nicer
anyway.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 23 ++++++++++++++---------
1 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 07bb395..1cb4bcc 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -369,6 +369,18 @@ gss_setup_upcall(struct rpc_clnt *clnt, struct gss_auth *gss_auth, struct rpc_cr
return gss_msg;
}

+static void warn_gssd(void)
+{
+ static unsigned long ratelimit;
+ unsigned long now = jiffies;
+
+ if (time_after(now, ratelimit)) {
+ printk(KERN_WARNING "RPC: AUTH_GSS upcall timed out.\n"
+ "Please check user daemon is running.\n");
+ ratelimit = now + 15*HZ;
+ }
+}
+
static inline int
gss_refresh_upcall(struct rpc_task *task)
{
@@ -568,21 +580,14 @@ static void
gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
{
struct gss_upcall_msg *gss_msg = container_of(msg, struct gss_upcall_msg, msg);
- static unsigned long ratelimit;

if (msg->errno < 0) {
dprintk("RPC: gss_pipe_destroy_msg releasing msg %p\n",
gss_msg);
atomic_inc(&gss_msg->count);
gss_unhash_msg(gss_msg);
- if (msg->errno == -ETIMEDOUT) {
- unsigned long now = jiffies;
- if (time_after(now, ratelimit)) {
- printk(KERN_WARNING "RPC: AUTH_GSS upcall timed out.\n"
- "Please check user daemon is running!\n");
- ratelimit = now + 15*HZ;
- }
- }
+ if (msg->errno == -ETIMEDOUT)
+ warn_gssd();
gss_release_msg(gss_msg);
}
}
--
1.5.5.rc1


2008-11-09 21:04:46

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 7/9] rpc: use count of pipe openers to wait for first open

Introduce a global variable pipe_version which will eventually be used
to keep track of which version of the upcall gssd is using.

For now, though, it only keeps track of whether any pipe is open or not;
it is negative if not, zero if one is opened. We use this to wait for
the first gssd to open a pipe.

(Minor digression: note this waits only for the very first open of any
pipe, not for the first open of a pipe for a given auth; thus we still
need the RPC_PIPE_WAIT_FOR_OPEN behavior to wait for gssd to open new
pipes that pop up on subsequent mounts.)

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 64 ++++++++++++++++++++++++++++++++++++++--
1 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 45e6728..3046f2b 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -75,7 +75,12 @@ struct gss_auth {
struct dentry *dentry;
};

+/* pipe_version >= 0 if and only if someone has a pipe open. */
+static int pipe_version = -1;
static atomic_t pipe_users = ATOMIC_INIT(0);
+static DEFINE_SPINLOCK(pipe_version_lock);
+static struct rpc_wait_queue pipe_version_rpc_waitqueue;
+static DECLARE_WAIT_QUEUE_HEAD(pipe_version_waitqueue);

static void gss_free_ctx(struct gss_cl_ctx *);
static struct rpc_pipe_ops gss_upcall_ops;
@@ -234,12 +239,34 @@ struct gss_upcall_msg {
struct gss_cl_ctx *ctx;
};

+static int get_pipe_version(void)
+{
+ int ret;
+
+ spin_lock(&pipe_version_lock);
+ if (pipe_version >= 0) {
+ atomic_inc(&pipe_users);
+ ret = 0;
+ } else
+ ret = -EAGAIN;
+ spin_unlock(&pipe_version_lock);
+ return ret;
+}
+
+static void put_pipe_version(void)
+{
+ if (atomic_dec_and_lock(&pipe_users, &pipe_version_lock)) {
+ pipe_version = -1;
+ spin_unlock(&pipe_version_lock);
+ }
+}
+
static void
gss_release_msg(struct gss_upcall_msg *gss_msg)
{
if (!atomic_dec_and_test(&gss_msg->count))
return;
- atomic_dec(&pipe_users);
+ put_pipe_version();
BUG_ON(!list_empty(&gss_msg->list));
if (gss_msg->ctx != NULL)
gss_put_ctx(gss_msg->ctx);
@@ -330,11 +357,16 @@ static inline struct gss_upcall_msg *
gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
{
struct gss_upcall_msg *gss_msg;
+ int vers;

gss_msg = kzalloc(sizeof(*gss_msg), GFP_NOFS);
if (gss_msg == NULL)
return ERR_PTR(-ENOMEM);
- atomic_inc(&pipe_users);
+ vers = get_pipe_version();
+ if (vers < 0) {
+ kfree(gss_msg);
+ return ERR_PTR(vers);
+ }
INIT_LIST_HEAD(&gss_msg->list);
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
@@ -400,6 +432,14 @@ gss_refresh_upcall(struct rpc_task *task)
dprintk("RPC: %5u gss_refresh_upcall for uid %u\n", task->tk_pid,
cred->cr_uid);
gss_msg = gss_setup_upcall(task->tk_client, gss_auth, cred);
+ if (IS_ERR(gss_msg) == -EAGAIN) {
+ /* XXX: warning on the first, under the assumption we
+ * shouldn't normally hit this case on a refresh. */
+ warn_gssd();
+ task->tk_timeout = 15*HZ;
+ rpc_sleep_on(&pipe_version_rpc_waitqueue, task, NULL);
+ return 0;
+ }
if (IS_ERR(gss_msg)) {
err = PTR_ERR(gss_msg);
goto out;
@@ -437,7 +477,17 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
int err = 0;

dprintk("RPC: gss_upcall for uid %u\n", cred->cr_uid);
+retry:
gss_msg = gss_setup_upcall(gss_auth->client, gss_auth, cred);
+ if (PTR_ERR(gss_msg) == -EAGAIN) {
+ err = wait_event_interruptible_timeout(pipe_version_waitqueue,
+ pipe_version >= 0, 15*HZ);
+ if (err)
+ goto out;
+ if (pipe_version < 0)
+ warn_gssd();
+ goto retry;
+ }
if (IS_ERR(gss_msg)) {
err = PTR_ERR(gss_msg);
goto out;
@@ -562,7 +612,14 @@ out:
static int
gss_pipe_open(struct inode *inode)
{
+ spin_lock(&pipe_version_lock);
+ if (pipe_version < 0) {
+ pipe_version = 0;
+ rpc_wake_up(&pipe_version_rpc_waitqueue);
+ wake_up(&pipe_version_waitqueue);
+ }
atomic_inc(&pipe_users);
+ spin_unlock(&pipe_version_lock);
return 0;
}

@@ -586,7 +643,7 @@ gss_pipe_release(struct inode *inode)
}
spin_unlock(&inode->i_lock);

- atomic_dec(&pipe_users);
+ put_pipe_version();
}

static void
@@ -1372,6 +1429,7 @@ static int __init init_rpcsec_gss(void)
err = gss_svc_init();
if (err)
goto out_unregister;
+ rpc_init_wait_queue(&pipe_version_rpc_waitqueue, "gss pipe version");
return 0;
out_unregister:
rpcauth_unregister(&authgss_ops);
--
1.5.5.rc1


2008-11-09 21:04:47

by J.Bruce Fields

[permalink] [raw]
Subject: text-based gss upcall


So, this time there aren't any changes to the underlying
gss_auth/rpc_pipe locking, and the gss code doesn't attempt to cancel
rpc calls on its own. Instead we:

- keep a single global variable which determines which version (new
or old) of the upcall we use;
- keep a counter which tells us
count of gss pipes open + count of gss upcalls outstanding;
- permit switching versions only when that counter goes to zero;
- use pipe_open and pipe_release callbacks to maintain that
counter; and
- allow the gss code to wait on gssd to open a pipe before
deciding which version of upcall to make.

And the new locking change is one spinlock to protect the global pipe
version.

I've tested with an unmodified gssd, and checked that things still work
correctly when gssd is killed and restarted. I haven't tested with a
gssd that uses the new upcall yet.

Most of these patches are just trivial preparatory cleanup, and the
first two (maybe three) patches might be worth applying independently.

--b.

2008-11-09 21:04:46

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 6/9] rpc: track number of users of the gss upcall pipe

Keep a count of the number of pipes open plus the number of messages on
a pipe. This count isn't used yet.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 14 ++++++++++++++
1 files changed, 14 insertions(+), 0 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 7baa432..45e6728 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -75,6 +75,8 @@ struct gss_auth {
struct dentry *dentry;
};

+static atomic_t pipe_users = ATOMIC_INIT(0);
+
static void gss_free_ctx(struct gss_cl_ctx *);
static struct rpc_pipe_ops gss_upcall_ops;

@@ -237,6 +239,7 @@ gss_release_msg(struct gss_upcall_msg *gss_msg)
{
if (!atomic_dec_and_test(&gss_msg->count))
return;
+ atomic_dec(&pipe_users);
BUG_ON(!list_empty(&gss_msg->list));
if (gss_msg->ctx != NULL)
gss_put_ctx(gss_msg->ctx);
@@ -331,6 +334,7 @@ gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
gss_msg = kzalloc(sizeof(*gss_msg), GFP_NOFS);
if (gss_msg == NULL)
return ERR_PTR(-ENOMEM);
+ atomic_inc(&pipe_users);
INIT_LIST_HEAD(&gss_msg->list);
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
@@ -555,6 +559,13 @@ out:
return err;
}

+static int
+gss_pipe_open(struct inode *inode)
+{
+ atomic_inc(&pipe_users);
+ return 0;
+}
+
static void
gss_pipe_release(struct inode *inode)
{
@@ -574,6 +585,8 @@ gss_pipe_release(struct inode *inode)
spin_lock(&inode->i_lock);
}
spin_unlock(&inode->i_lock);
+
+ atomic_dec(&pipe_users);
}

static void
@@ -1342,6 +1355,7 @@ static struct rpc_pipe_ops gss_upcall_ops = {
.upcall = gss_pipe_upcall,
.downcall = gss_pipe_downcall,
.destroy_msg = gss_pipe_destroy_msg,
+ .open_pipe = gss_pipe_open,
.release_pipe = gss_pipe_release,
};

--
1.5.5.rc1


2008-11-09 21:04:45

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 1/9] rpc: remove unnecessary assignment

We're just about to kfree() gss_auth, so there's no point to setting any
of its fields.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 1 -
1 files changed, 0 insertions(+), 1 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 853a414..07bb395 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -650,7 +650,6 @@ static void
gss_free(struct gss_auth *gss_auth)
{
rpc_unlink(gss_auth->dentry);
- gss_auth->dentry = NULL;
gss_mech_put(gss_auth->mech);

kfree(gss_auth);
--
1.5.5.rc1


2008-11-09 21:04:45

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 3/9] rpc: minor gss_alloc_msg cleanup

I want to add a little more code here, so it'll be convenient to have
this flatter.

Also, I'll want to add another error condition, so it'll be more
convenient to return -ENOMEM than NULL in the error case. The only
caller is already converting NULL to -ENOMEM anyway.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 24 ++++++++++++------------
1 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 1cb4bcc..7baa432 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -329,16 +329,16 @@ gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
struct gss_upcall_msg *gss_msg;

gss_msg = kzalloc(sizeof(*gss_msg), GFP_NOFS);
- if (gss_msg != NULL) {
- INIT_LIST_HEAD(&gss_msg->list);
- rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
- init_waitqueue_head(&gss_msg->waitqueue);
- atomic_set(&gss_msg->count, 1);
- gss_msg->msg.data = &gss_msg->uid;
- gss_msg->msg.len = sizeof(gss_msg->uid);
- gss_msg->uid = uid;
- gss_msg->auth = gss_auth;
- }
+ if (gss_msg == NULL)
+ return ERR_PTR(-ENOMEM);
+ INIT_LIST_HEAD(&gss_msg->list);
+ rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
+ init_waitqueue_head(&gss_msg->waitqueue);
+ atomic_set(&gss_msg->count, 1);
+ gss_msg->msg.data = &gss_msg->uid;
+ gss_msg->msg.len = sizeof(gss_msg->uid);
+ gss_msg->uid = uid;
+ gss_msg->auth = gss_auth;
return gss_msg;
}

@@ -355,8 +355,8 @@ gss_setup_upcall(struct rpc_clnt *clnt, struct gss_auth *gss_auth, struct rpc_cr
uid = 0;

gss_new = gss_alloc_msg(gss_auth, uid);
- if (gss_new == NULL)
- return ERR_PTR(-ENOMEM);
+ if (IS_ERR(gss_new))
+ return gss_new;
gss_msg = gss_add_msg(gss_auth, gss_new);
if (gss_msg == gss_new) {
int res = rpc_queue_upcall(gss_auth->dentry->d_inode, &gss_new->msg);
--
1.5.5.rc1


2008-11-09 21:04:46

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 5/9] rpc: call release_pipe only on last close

I can't see any reason we particularly need to call this until the last
gssd closes the pipe.

Also, this allows to guarantee that open_pipe and release_pipe are
called strictly in pairs; open_pipe on the first open, release_pipe on
the last close. That'll make it very easy for the gss code to keep
track of which pipes gssd is using.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/rpc_pipe.c | 9 ++++++---
1 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 4171ab7..712e1ec 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -126,13 +126,14 @@ rpc_close_pipes(struct inode *inode)
{
struct rpc_inode *rpci = RPC_I(inode);
struct rpc_pipe_ops *ops;
+ int last_close;

mutex_lock(&inode->i_mutex);
ops = rpci->ops;
if (ops != NULL) {
LIST_HEAD(free_list);
-
spin_lock(&inode->i_lock);
+ last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
rpci->nreaders = 0;
list_splice_init(&rpci->in_upcall, &free_list);
list_splice_init(&rpci->pipe, &free_list);
@@ -141,7 +142,7 @@ rpc_close_pipes(struct inode *inode)
spin_unlock(&inode->i_lock);
rpc_purge_list(rpci, &free_list, ops->destroy_msg, -EPIPE);
rpci->nwriters = 0;
- if (ops->release_pipe)
+ if (last_close && ops->release_pipe)
ops->release_pipe(inode);
cancel_delayed_work_sync(&rpci->queue_timeout);
}
@@ -195,6 +196,7 @@ rpc_pipe_release(struct inode *inode, struct file *filp)
{
struct rpc_inode *rpci = RPC_I(inode);
struct rpc_pipe_msg *msg;
+ int last_close;

mutex_lock(&inode->i_mutex);
if (rpci->ops == NULL)
@@ -221,7 +223,8 @@ rpc_pipe_release(struct inode *inode, struct file *filp)
rpci->ops->destroy_msg, -EAGAIN);
}
}
- if (rpci->ops->release_pipe)
+ last_close = rpci->nwriters == 0 && rpci->nreaders == 0;
+ if (last_close && rpci->ops->release_pipe)
rpci->ops->release_pipe(inode);
out:
mutex_unlock(&inode->i_mutex);
--
1.5.5.rc1


2008-11-09 21:04:47

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 9/9] rpc: implement new upcall

Implement the new upcall. We decide which version of the upcall gssd
will use (new or old), by creating both pipes (the new one named "gssd",
the old one named after the mechanism (e.g., "krb5")), and then waiting
to see which version gssd actually opens.

We don't permit pipes of the two different types to be opened at once.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 95 +++++++++++++++++++++++++++++++++------
1 files changed, 80 insertions(+), 15 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 96c1bab..fbe04cb 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -72,7 +72,13 @@ struct gss_auth {
struct gss_api_mech *mech;
enum rpc_gss_svc service;
struct rpc_clnt *client;
- struct dentry *dentry;
+ /*
+ * There are two upcall pipes; dentry[1], named "gssd", is used
+ * for the new text-based upcall; dentry[0] is named after the
+ * mechanism (for example, "krb5") and exists for
+ * backwards-compatibility with older gssd's.
+ */
+ struct dentry *dentry[2];
};

/* pipe_version >= 0 if and only if someone has a pipe open. */
@@ -227,6 +233,7 @@ err:
return p;
}

+#define UPCALL_BUF_LEN 128

struct gss_upcall_msg {
atomic_t count;
@@ -238,6 +245,7 @@ struct gss_upcall_msg {
struct rpc_wait_queue rpc_waitqueue;
wait_queue_head_t waitqueue;
struct gss_cl_ctx *ctx;
+ char databuf[UPCALL_BUF_LEN];
};

static int get_pipe_version(void)
@@ -247,7 +255,7 @@ static int get_pipe_version(void)
spin_lock(&pipe_version_lock);
if (pipe_version >= 0) {
atomic_inc(&pipe_users);
- ret = 0;
+ ret = pipe_version;
} else
ret = -EAGAIN;
spin_unlock(&pipe_version_lock);
@@ -353,6 +361,29 @@ gss_upcall_callback(struct rpc_task *task)
gss_release_msg(gss_msg);
}

+static void gss_encode_v0_msg(struct gss_upcall_msg *gss_msg)
+{
+ gss_msg->msg.data = &gss_msg->uid;
+ gss_msg->msg.len = sizeof(gss_msg->uid);
+}
+
+static void gss_encode_v1_msg(struct gss_upcall_msg *gss_msg)
+{
+ gss_msg->msg.len = sprintf(gss_msg->databuf, "mech=%s uid=%d\n",
+ gss_msg->auth->mech->gm_name,
+ gss_msg->uid);
+ gss_msg->msg.data = gss_msg->databuf;
+ BUG_ON(gss_msg->msg.len > UPCALL_BUF_LEN);
+}
+
+static void gss_encode_msg(struct gss_upcall_msg *gss_msg)
+{
+ if (pipe_version == 0)
+ gss_encode_v0_msg(gss_msg);
+ else /* pipe_version == 1 */
+ gss_encode_v1_msg(gss_msg);
+}
+
static inline struct gss_upcall_msg *
gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
{
@@ -367,13 +398,12 @@ gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
kfree(gss_msg);
return ERR_PTR(vers);
}
- gss_msg->inode = RPC_I(gss_auth->dentry->d_inode);
+ gss_msg->inode = RPC_I(gss_auth->dentry[vers]->d_inode);
INIT_LIST_HEAD(&gss_msg->list);
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
atomic_set(&gss_msg->count, 1);
- gss_msg->msg.data = &gss_msg->uid;
- gss_msg->msg.len = sizeof(gss_msg->uid);
+ gss_encode_msg(gss_msg);
gss_msg->uid = uid;
gss_msg->auth = gss_auth;
return gss_msg;
@@ -613,18 +643,40 @@ out:
return err;
}

+static int gss_pipe_version(struct gss_auth *gss_auth, struct inode *inode)
+{
+ if (gss_auth->dentry[0]->d_inode == inode)
+ return 0;
+ if (gss_auth->dentry[1]->d_inode == inode)
+ return 1;
+ BUG();
+}
+
static int
gss_pipe_open(struct inode *inode)
{
+ struct rpc_inode *rpci = RPC_I(inode);
+ struct rpc_clnt *clnt = rpci->private;
+ struct gss_auth *gss_auth = container_of(clnt->cl_auth,
+ struct gss_auth, rpc_auth);
+ int new_version = gss_pipe_version(gss_auth, inode);
+ int ret = 0;
+
spin_lock(&pipe_version_lock);
if (pipe_version < 0) {
- pipe_version = 0;
+ /* First open of any gss pipe determines the version: */
+ pipe_version = new_version;
rpc_wake_up(&pipe_version_rpc_waitqueue);
wake_up(&pipe_version_waitqueue);
+ } else if (pipe_version != new_version) {
+ /* Trying to open a pipe of a different version */
+ ret = -EBUSY;
+ goto out;
}
atomic_inc(&pipe_users);
+out:
spin_unlock(&pipe_version_lock);
- return 0;
+ return ret;
}

static void
@@ -702,20 +754,32 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
atomic_set(&auth->au_count, 1);
kref_init(&gss_auth->kref);

- gss_auth->dentry = rpc_mkpipe(clnt->cl_dentry, gss_auth->mech->gm_name,
- clnt, &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
- if (IS_ERR(gss_auth->dentry)) {
- err = PTR_ERR(gss_auth->dentry);
+ gss_auth->dentry[0] = rpc_mkpipe(clnt->cl_dentry,
+ gss_auth->mech->gm_name,
+ clnt, &gss_upcall_ops,
+ RPC_PIPE_WAIT_FOR_OPEN);
+ if (IS_ERR(gss_auth->dentry[0])) {
+ err = PTR_ERR(gss_auth->dentry[0]);
goto err_put_mech;
}

+ gss_auth->dentry[1] = rpc_mkpipe(clnt->cl_dentry,
+ "gssd",
+ clnt, &gss_upcall_ops,
+ RPC_PIPE_WAIT_FOR_OPEN);
+ if (IS_ERR(gss_auth->dentry[1])) {
+ err = PTR_ERR(gss_auth->dentry[1]);
+ goto err_unlink_pipe_0;
+ }
err = rpcauth_init_credcache(auth);
if (err)
- goto err_unlink_pipe;
+ goto err_unlink_pipe_1;

return auth;
-err_unlink_pipe:
- rpc_unlink(gss_auth->dentry);
+err_unlink_pipe_1:
+ rpc_unlink(gss_auth->dentry[1]);
+err_unlink_pipe_0:
+ rpc_unlink(gss_auth->dentry[0]);
err_put_mech:
gss_mech_put(gss_auth->mech);
err_free:
@@ -728,7 +792,8 @@ out_dec:
static void
gss_free(struct gss_auth *gss_auth)
{
- rpc_unlink(gss_auth->dentry);
+ rpc_unlink(gss_auth->dentry[1]);
+ rpc_unlink(gss_auth->dentry[0]);
gss_mech_put(gss_auth->mech);

kfree(gss_auth);
--
1.5.5.rc1


2008-11-09 21:04:47

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 8/9] rpc: store pointer to pipe inode in gss upcall message

Keep a pointer to the inode that the message is queued on in the struct
gss_upcall_msg. This will be convenient, especially after we have a
choice of two pipes that an upcall could be queued on.

Signed-off-by: J. Bruce Fields <[email protected]>
---
net/sunrpc/auth_gss/auth_gss.c | 20 ++++++++++++--------
1 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 3046f2b..96c1bab 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -234,6 +234,7 @@ struct gss_upcall_msg {
struct rpc_pipe_msg msg;
struct list_head list;
struct gss_auth *auth;
+ struct rpc_inode *inode;
struct rpc_wait_queue rpc_waitqueue;
wait_queue_head_t waitqueue;
struct gss_cl_ctx *ctx;
@@ -296,8 +297,8 @@ __gss_find_upcall(struct rpc_inode *rpci, uid_t uid)
static inline struct gss_upcall_msg *
gss_add_msg(struct gss_auth *gss_auth, struct gss_upcall_msg *gss_msg)
{
- struct inode *inode = gss_auth->dentry->d_inode;
- struct rpc_inode *rpci = RPC_I(inode);
+ struct rpc_inode *rpci = gss_msg->inode;
+ struct inode *inode = &rpci->vfs_inode;
struct gss_upcall_msg *old;

spin_lock(&inode->i_lock);
@@ -323,8 +324,7 @@ __gss_unhash_msg(struct gss_upcall_msg *gss_msg)
static void
gss_unhash_msg(struct gss_upcall_msg *gss_msg)
{
- struct gss_auth *gss_auth = gss_msg->auth;
- struct inode *inode = gss_auth->dentry->d_inode;
+ struct inode *inode = &gss_msg->inode->vfs_inode;

if (list_empty(&gss_msg->list))
return;
@@ -340,7 +340,7 @@ gss_upcall_callback(struct rpc_task *task)
struct gss_cred *gss_cred = container_of(task->tk_msg.rpc_cred,
struct gss_cred, gc_base);
struct gss_upcall_msg *gss_msg = gss_cred->gc_upcall;
- struct inode *inode = gss_msg->auth->dentry->d_inode;
+ struct inode *inode = &gss_msg->inode->vfs_inode;

spin_lock(&inode->i_lock);
if (gss_msg->ctx)
@@ -367,6 +367,7 @@ gss_alloc_msg(struct gss_auth *gss_auth, uid_t uid)
kfree(gss_msg);
return ERR_PTR(vers);
}
+ gss_msg->inode = RPC_I(gss_auth->dentry->d_inode);
INIT_LIST_HEAD(&gss_msg->list);
rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq");
init_waitqueue_head(&gss_msg->waitqueue);
@@ -395,7 +396,8 @@ gss_setup_upcall(struct rpc_clnt *clnt, struct gss_auth *gss_auth, struct rpc_cr
return gss_new;
gss_msg = gss_add_msg(gss_auth, gss_new);
if (gss_msg == gss_new) {
- int res = rpc_queue_upcall(gss_auth->dentry->d_inode, &gss_new->msg);
+ struct inode *inode = &gss_new->inode->vfs_inode;
+ int res = rpc_queue_upcall(inode, &gss_new->msg);
if (res) {
gss_unhash_msg(gss_new);
gss_msg = ERR_PTR(res);
@@ -426,7 +428,7 @@ gss_refresh_upcall(struct rpc_task *task)
struct gss_cred *gss_cred = container_of(cred,
struct gss_cred, gc_base);
struct gss_upcall_msg *gss_msg;
- struct inode *inode = gss_auth->dentry->d_inode;
+ struct inode *inode;
int err = 0;

dprintk("RPC: %5u gss_refresh_upcall for uid %u\n", task->tk_pid,
@@ -444,6 +446,7 @@ gss_refresh_upcall(struct rpc_task *task)
err = PTR_ERR(gss_msg);
goto out;
}
+ inode = &gss_msg->inode->vfs_inode;
spin_lock(&inode->i_lock);
if (gss_cred->gc_upcall != NULL)
rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL);
@@ -470,7 +473,7 @@ out:
static inline int
gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
{
- struct inode *inode = gss_auth->dentry->d_inode;
+ struct inode *inode;
struct rpc_cred *cred = &gss_cred->gc_base;
struct gss_upcall_msg *gss_msg;
DEFINE_WAIT(wait);
@@ -492,6 +495,7 @@ retry:
err = PTR_ERR(gss_msg);
goto out;
}
+ inode = &gss_msg->inode->vfs_inode;
for (;;) {
prepare_to_wait(&gss_msg->waitqueue, &wait, TASK_INTERRUPTIBLE);
spin_lock(&inode->i_lock);
--
1.5.5.rc1


2008-11-09 21:04:46

by J.Bruce Fields

[permalink] [raw]
Subject: [PATCH 4/9] rpc: add an rpc_pipe_open method

We want to transition to a new gssd upcall which is text-based and more
easily extensible.

To simplify upgrades, as well as testing and debugging, it will help if
we can upgrade gssd (to a version which understands the new upcall)
without having to choose at boot (or module-load) time whether we want
the new or the old upcall.

We will do this by providing two different pipes: one named, as
currently, after the mechanism (normally "krb5"), and supporting the
old upcall. One named "gssd" and supporting the new upcall version.

We allow gssd to indicate which version it supports by its choice of
which pipe to open.

As we have no interest in supporting *simultaneous* use of both
versions, we'll forbid opening both pipes at the same time.

So, add a new pipe_open callback to the rpc_pipefs api, which the gss
code can use to track which pipes have been open, and to refuse opens of
incompatible pipes.

We only need this to be called on the first open of a given pipe.

Signed-off-by: J. Bruce Fields <[email protected]>
---
include/linux/sunrpc/rpc_pipe_fs.h | 1 +
net/sunrpc/rpc_pipe.c | 21 ++++++++++++++-------
2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/include/linux/sunrpc/rpc_pipe_fs.h b/include/linux/sunrpc/rpc_pipe_fs.h
index 51b977a..cea764c 100644
--- a/include/linux/sunrpc/rpc_pipe_fs.h
+++ b/include/linux/sunrpc/rpc_pipe_fs.h
@@ -15,6 +15,7 @@ struct rpc_pipe_ops {
ssize_t (*upcall)(struct file *, struct rpc_pipe_msg *, char __user *, size_t);
ssize_t (*downcall)(struct file *, const char __user *, size_t);
void (*release_pipe)(struct inode *);
+ int (*open_pipe)(struct inode *);
void (*destroy_msg)(struct rpc_pipe_msg *);
};

diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 23a2b8f..4171ab7 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -169,16 +169,23 @@ static int
rpc_pipe_open(struct inode *inode, struct file *filp)
{
struct rpc_inode *rpci = RPC_I(inode);
+ int first_open = rpci->nreaders == 0 && rpci->nwriters == 0;
int res = -ENXIO;

mutex_lock(&inode->i_mutex);
- if (rpci->ops != NULL) {
- if (filp->f_mode & FMODE_READ)
- rpci->nreaders ++;
- if (filp->f_mode & FMODE_WRITE)
- rpci->nwriters ++;
- res = 0;
+ if (rpci->ops == NULL)
+ goto out;
+ if (first_open && rpci->ops->open_pipe) {
+ res = rpci->ops->open_pipe(inode);
+ if (res)
+ goto out;
}
+ if (filp->f_mode & FMODE_READ)
+ rpci->nreaders++;
+ if (filp->f_mode & FMODE_WRITE)
+ rpci->nwriters++;
+ res = 0;
+out:
mutex_unlock(&inode->i_mutex);
return res;
}
@@ -748,7 +755,7 @@ rpc_rmdir(struct dentry *dentry)
* @name: name of pipe
* @private: private data to associate with the pipe, for the caller's use
* @ops: operations defining the behavior of the pipe: upcall, downcall,
- * release_pipe, and destroy_msg.
+ * release_pipe, open_pipe, and destroy_msg.
* @flags: rpc_inode flags
*
* Data is made available for userspace to read by calls to
--
1.5.5.rc1


2008-11-10 19:12:23

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Sun, 2008-11-09 at 16:04 -0500, J. Bruce Fields wrote:
> I can't see any reason we particularly need to call this until the last
> gssd closes the pipe.

There's a very good reason: if we call rpc_close_pipes() then it is
because the kernel listener is shutting down. At that point, we want to
return EPIPE for all future read() or write() attempts by gssd.

> Also, this allows to guarantee that open_pipe and release_pipe are
> called strictly in pairs; open_pipe on the first open, release_pipe on
> the last close. That'll make it very easy for the gss code to keep
> track of which pipes gssd is using.

...unless the gss code is no longer running.

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-11-10 19:49:04

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, Nov 10, 2008 at 02:11:37PM -0500, Trond Myklebust wrote:
> On Sun, 2008-11-09 at 16:04 -0500, J. Bruce Fields wrote:
> > I can't see any reason we particularly need to call this until the last
> > gssd closes the pipe.
>
> There's a very good reason: if we call rpc_close_pipes() then it is
> because the kernel listener is shutting down. At that point, we want to
> return EPIPE for all future read() or write() attempts by gssd.

Sure. This doesn't interfere with that--we're still calling
release_pipe() itself (which is the function that sets rpci->ops to
NULL), it's only the ->release_pipe() op that we're not calling.

> > Also, this allows to guarantee that open_pipe and release_pipe are
> > called strictly in pairs; open_pipe on the first open, release_pipe on
> > the last close. That'll make it very easy for the gss code to keep
> > track of which pipes gssd is using.
>
> ...unless the gss code is no longer running.

I'm not following you.

Note the patch calls ->release_pipe() at close_pipe() time if there are
still opens (and then subsequent closes become no-ops). So it does
guarantee that there's always a matching call.

So my patch description was wrong: "last close" should be something like
"last close, or destruction of an opened pipe." I'll fix that.

--b.

2008-11-10 20:01:43

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, 2008-11-10 at 14:49 -0500, J. Bruce Fields wrote:
> On Mon, Nov 10, 2008 at 02:11:37PM -0500, Trond Myklebust wrote:
> > On Sun, 2008-11-09 at 16:04 -0500, J. Bruce Fields wrote:
> > > I can't see any reason we particularly need to call this until the last
> > > gssd closes the pipe.
> >
> > There's a very good reason: if we call rpc_close_pipes() then it is
> > because the kernel listener is shutting down. At that point, we want to
> > return EPIPE for all future read() or write() attempts by gssd.
>
> Sure. This doesn't interfere with that--we're still calling
> release_pipe() itself (which is the function that sets rpci->ops to
> NULL), it's only the ->release_pipe() op that we're not calling.
>
> > > Also, this allows to guarantee that open_pipe and release_pipe are
> > > called strictly in pairs; open_pipe on the first open, release_pipe on
> > > the last close. That'll make it very easy for the gss code to keep
> > > track of which pipes gssd is using.
> >
> > ...unless the gss code is no longer running.
>
> I'm not following you.
>
> Note the patch calls ->release_pipe() at close_pipe() time if there are
> still opens (and then subsequent closes become no-ops). So it does
> guarantee that there's always a matching call.

No it doesn't. Now that rpci->ops has been set to NULL by
rpc_close_pipes(), exactly how are you going to call
rpci->ops->release_pipe() at a later time?

--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-11-10 20:07:30

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, Nov 10, 2008 at 03:01:19PM -0500, Trond Myklebust wrote:
> On Mon, 2008-11-10 at 14:49 -0500, J. Bruce Fields wrote:
> > On Mon, Nov 10, 2008 at 02:11:37PM -0500, Trond Myklebust wrote:
> > > On Sun, 2008-11-09 at 16:04 -0500, J. Bruce Fields wrote:
> > > > I can't see any reason we particularly need to call this until the last
> > > > gssd closes the pipe.
> > >
> > > There's a very good reason: if we call rpc_close_pipes() then it is
> > > because the kernel listener is shutting down. At that point, we want to
> > > return EPIPE for all future read() or write() attempts by gssd.
> >
> > Sure. This doesn't interfere with that--we're still calling
> > release_pipe() itself (which is the function that sets rpci->ops to
> > NULL), it's only the ->release_pipe() op that we're not calling.
> >
> > > > Also, this allows to guarantee that open_pipe and release_pipe are
> > > > called strictly in pairs; open_pipe on the first open, release_pipe on
> > > > the last close. That'll make it very easy for the gss code to keep
> > > > track of which pipes gssd is using.
> > >
> > > ...unless the gss code is no longer running.
> >
> > I'm not following you.
> >
> > Note the patch calls ->release_pipe() at close_pipe() time if there are
> > still opens (and then subsequent closes become no-ops). So it does
> > guarantee that there's always a matching call.
>
> No it doesn't. Now that rpci->ops has been set to NULL by
> rpc_close_pipes(), exactly how are you going to call
> rpci->ops->release_pipe() at a later time?

Again, I'm dealing with that case by calling release_pipe() from
rpc_close_pipes(), just as the current code does--the only change being
to do that only when there are still opens:

last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
rpci->nreaders = 0;
...
rpci->nwriters = 0;
if (last_close && ops->release_pipe)
ops->release_pipe(inode);

--b.

2008-11-10 20:12:00

by Myklebust, Trond

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, 2008-11-10 at 15:07 -0500, J. Bruce Fields wrote:

> Again, I'm dealing with that case by calling release_pipe() from
> rpc_close_pipes(), just as the current code does--the only change being
> to do that only when there are still opens:
>
> last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
> rpci->nreaders = 0;
> ...
> rpci->nwriters = 0;
> if (last_close && ops->release_pipe)
> ops->release_pipe(inode);

Which means that if the kernel calls rpc_close_pipes() before gssd has
managed to close, then you _NEVER_ call ops->release_pipe()...



--
Trond Myklebust
Linux NFS client maintainer

NetApp
[email protected]
http://www.netapp.com

2008-11-10 20:17:10

by J. Bruce Fields

[permalink] [raw]
Subject: Re: [PATCH 5/9] rpc: call release_pipe only on last close

On Mon, Nov 10, 2008 at 03:11:38PM -0500, Trond Myklebust wrote:
> On Mon, 2008-11-10 at 15:07 -0500, J. Bruce Fields wrote:
>
> > Again, I'm dealing with that case by calling release_pipe() from
> > rpc_close_pipes(), just as the current code does--the only change being
> > to do that only when there are still opens:
> >
> > last_close = rpci->nreaders != 0 || rpci->nwriters != 0;
> > rpci->nreaders = 0;
> > ...
> > rpci->nwriters = 0;
> > if (last_close && ops->release_pipe)
> > ops->release_pipe(inode);
>
> Which means that if the kernel calls rpc_close_pipes() before gssd has
> managed to close, then you _NEVER_ call ops->release_pipe()...

So, I take "before gssd has managed to close" to mean that gssd is still
holding the file open. Thus the statement

last_close = rpci->nreaders != 0 || rpci->nwriters != 0;

evaluates to true; either nreaders or nwriters must be nonzero.

(And note the open and close code that modifes nreaders and nwriters is
all serialized with this code by the i_mutex.)

--b.