2019-05-23 08:08:28

by Yan, Zheng

Subject: [PATCH 1/8] ceph: fix error handling in ceph_get_caps()

The function returns 0 even when it is interrupted or when
try_get_cap_refs() returns an error.

Introduced by commit 1199d7da2d ("ceph: simplify arguments and return
semantics of try_get_cap_refs").
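
For illustration, a minimal userspace sketch of this bug class (not
the kernel code; all names are invented): when the wait-and-retry
path stores its result in a second variable, an error taken on that
path never reaches the final return.

  #include <errno.h>
  #include <stdio.h>

  /* simulated acquire: <0 error, 0 not ready (must wait/retry) */
  static int try_acquire(int outcome)
  {
          return outcome;
  }

  /* buggy shape: the retry result lands in 'err' and is never folded
   * back into 'ret', so the function falls through and returns 0 */
  static int acquire_buggy(int first, int retry)
  {
          int ret = try_acquire(first);
          if (!ret) {
                  int err = try_acquire(retry);
                  (void)err;              /* error lost here */
          }
          if (ret == -ESTALE)
                  return ret;             /* only one error checked */
          return 0;
  }

  /* fixed shape: a single 'ret' carries the outcome of every path,
   * and any negative value is propagated */
  static int acquire_fixed(int first, int retry)
  {
          int ret = try_acquire(first);
          if (!ret)
                  ret = try_acquire(retry);
          if (ret < 0)
                  return ret;
          return 0;
  }

  int main(void)
  {
          printf("buggy: %d\n", acquire_buggy(0, -EINTR)); /* 0: lost */
          printf("fixed: %d\n", acquire_fixed(0, -EINTR)); /* -EINTR */
          return 0;
  }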

Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/caps.c | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 72f8e1311392..079d0df9650c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2738,15 +2738,13 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
_got = 0;
ret = try_get_cap_refs(ci, need, want, endoff,
false, &_got);
- if (ret == -EAGAIN) {
+ if (ret == -EAGAIN)
continue;
- } else if (!ret) {
- int err;
-
+ if (!ret) {
DEFINE_WAIT_FUNC(wait, woken_wake_function);
add_wait_queue(&ci->i_cap_wq, &wait);

- while (!(err = try_get_cap_refs(ci, need, want, endoff,
+ while (!(ret = try_get_cap_refs(ci, need, want, endoff,
true, &_got))) {
if (signal_pending(current)) {
ret = -ERESTARTSYS;
@@ -2756,14 +2754,16 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
}

remove_wait_queue(&ci->i_cap_wq, &wait);
- if (err == -EAGAIN)
+ if (ret == -EAGAIN)
continue;
}
- if (ret == -ESTALE) {
- /* session was killed, try renew caps */
- ret = ceph_renew_caps(&ci->vfs_inode);
- if (ret == 0)
- continue;
+ if (ret < 0) {
+ if (ret == -ESTALE) {
+ /* session was killed, try renew caps */
+ ret = ceph_renew_caps(&ci->vfs_inode);
+ if (ret == 0)
+ continue;
+ }
return ret;
}

--
2.17.2


2019-05-23 08:08:34

by Yan, Zheng

Subject: [PATCH 4/8] ceph: close race between d_name_cmp() and update_dentry_lease()

d_name_cmp() and update_dentry_lease() each take and release
dentry->d_lock separately, so the dentry may get renamed between the
two calls. The fix is to move the dentry name comparison into
update_dentry_lease().

This patch introduces two versions of update_dentry_lease(). One is
for the case where the parent inode is locked; it does not need to
check the parent/target inode or the dentry name. The other is for
the case where the parent inode is not locked; it checks the
parent/target inode and the dentry name after taking dentry->d_lock.
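
As a hedged userspace sketch of the resulting shape (not the kernel
code; the names are invented): the name comparison and the update it
guards must happen inside one critical section, otherwise a rename
can slip in between them.

  #include <pthread.h>
  #include <string.h>

  struct entry {
          pthread_mutex_t lock;   /* protects name and lease_ttl */
          char name[64];
          long lease_ttl;
  };

  /* racy: the name is checked and the entry updated under two
   * separate lock holds; a rename may occur between them */
  int update_lease_racy(struct entry *e, const char *name, long ttl)
  {
          int match;

          pthread_mutex_lock(&e->lock);
          match = (strcmp(e->name, name) == 0);
          pthread_mutex_unlock(&e->lock);
          if (!match)
                  return -1;
          /* a concurrent rename can land right here */
          pthread_mutex_lock(&e->lock);
          e->lease_ttl = ttl;     /* may update the renamed entry */
          pthread_mutex_unlock(&e->lock);
          return 0;
  }

  /* fixed: compare and update within a single lock hold */
  int update_lease_safe(struct entry *e, const char *name, long ttl)
  {
          int ret = -1;

          pthread_mutex_lock(&e->lock);
          if (strcmp(e->name, name) == 0) {
                  e->lease_ttl = ttl;
                  ret = 0;
          }
          pthread_mutex_unlock(&e->lock);
          return ret;
  }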

Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/inode.c | 164 ++++++++++++++++++++++++++----------------------
1 file changed, 88 insertions(+), 76 deletions(-)

diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 8cfece240ffe..e47a25495be5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1031,59 +1031,38 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
}

/*
- * caller should hold session s_mutex.
+ * caller should hold session s_mutex and dentry->d_lock.
*/
-static void update_dentry_lease(struct dentry *dentry,
- struct ceph_mds_reply_lease *lease,
- struct ceph_mds_session *session,
- unsigned long from_time,
- struct ceph_vino *tgt_vino,
- struct ceph_vino *dir_vino)
+static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time,
+ struct ceph_mds_session **old_lease_session)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
long unsigned duration = le32_to_cpu(lease->duration_ms);
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
- struct inode *dir;
- struct ceph_mds_session *old_lease_session = NULL;
-
- /*
- * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
- * we expect a negative dentry.
- */
- if (!tgt_vino && d_really_is_positive(dentry))
- return;
-
- if (tgt_vino && (d_really_is_negative(dentry) ||
- !ceph_ino_compare(d_inode(dentry), tgt_vino)))
- return;

- spin_lock(&dentry->d_lock);
dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
dentry, duration, ttl);

- dir = d_inode(dentry->d_parent);
-
- /* make sure parent matches dir_vino */
- if (!ceph_ino_compare(dir, dir_vino))
- goto out_unlock;
-
/* only track leases on regular dentries */
if (ceph_snap(dir) != CEPH_NOSNAP)
- goto out_unlock;
+ return;

di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
if (duration == 0) {
__ceph_dentry_dir_lease_touch(di);
- goto out_unlock;
+ return;
}

if (di->lease_gen == session->s_cap_gen &&
time_before(ttl, di->time))
- goto out_unlock; /* we already have a newer lease. */
+ return; /* we already have a newer lease. */

if (di->lease_session && di->lease_session != session) {
- old_lease_session = di->lease_session;
+ *old_lease_session = di->lease_session;
di->lease_session = NULL;
}

@@ -1096,6 +1075,62 @@ static void update_dentry_lease(struct dentry *dentry,
di->time = ttl;

__ceph_dentry_lease_touch(di);
+}
+
+static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time)
+{
+ struct ceph_mds_session *old_lease_session = NULL;
+ spin_lock(&dentry->d_lock);
+ __update_dentry_lease(dir, dentry, lease, session, from_time,
+ &old_lease_session);
+ spin_unlock(&dentry->d_lock);
+ if (old_lease_session)
+ ceph_put_mds_session(old_lease_session);
+}
+
+/*
+ * update dentry lease without having parent inode locked
+ */
+static void update_dentry_lease_careful(struct dentry *dentry,
+ struct ceph_mds_reply_lease *lease,
+ struct ceph_mds_session *session,
+ unsigned long from_time,
+ char *dname, u32 dname_len,
+ struct ceph_vino *pdvino,
+ struct ceph_vino *ptvino)
+
+{
+ struct inode *dir;
+ struct ceph_mds_session *old_lease_session = NULL;
+
+ spin_lock(&dentry->d_lock);
+ /* make sure dentry's name matches target */
+ if (dentry->d_name.len != dname_len ||
+ memcmp(dentry->d_name.name, dname, dname_len))
+ goto out_unlock;
+
+ dir = d_inode(dentry->d_parent);
+ /* make sure parent matches dvino */
+ if (!ceph_ino_compare(dir, pdvino))
+ goto out_unlock;
+
+ /* make sure dentry's inode matches target. NULL ptvino means that
+ * we expect a negative dentry */
+ if (ptvino) {
+ if (d_really_is_negative(dentry))
+ goto out_unlock;
+ if (!ceph_ino_compare(d_inode(dentry), ptvino))
+ goto out_unlock;
+ } else {
+ if (d_really_is_positive(dentry))
+ goto out_unlock;
+ }
+
+ __update_dentry_lease(dir, dentry, lease, session,
+ from_time, &old_lease_session);
out_unlock:
spin_unlock(&dentry->d_lock);
if (old_lease_session)
@@ -1160,19 +1195,6 @@ static int splice_dentry(struct dentry **pdn, struct inode *in)
return 0;
}

-static int d_name_cmp(struct dentry *dentry, const char *name, size_t len)
-{
- int ret;
-
- /* take d_lock to ensure dentry->d_name stability */
- spin_lock(&dentry->d_lock);
- ret = dentry->d_name.len - len;
- if (!ret)
- ret = memcmp(dentry->d_name.name, name, len);
- spin_unlock(&dentry->d_lock);
- return ret;
-}
-
/*
* Incorporate results into the local cache. This is either just
* one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
@@ -1375,10 +1397,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
} else if (have_lease) {
if (d_unhashed(dn))
d_add(dn, NULL);
- update_dentry_lease(dn, rinfo->dlease,
- session,
- req->r_request_started,
- NULL, &dvino);
+ update_dentry_lease(dir, dn,
+ rinfo->dlease, session,
+ req->r_request_started);
}
goto done;
}
@@ -1400,11 +1421,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
}

if (have_lease) {
- tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
- tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
- update_dentry_lease(dn, rinfo->dlease, session,
- req->r_request_started,
- &tvino, &dvino);
+ update_dentry_lease(dir, dn,
+ rinfo->dlease, session,
+ req->r_request_started);
}
dout(" final dn %p\n", dn);
} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
@@ -1422,27 +1441,20 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
err = splice_dentry(&req->r_dentry, in);
if (err < 0)
goto done;
- } else if (rinfo->head->is_dentry &&
- !d_name_cmp(req->r_dentry, rinfo->dname, rinfo->dname_len)) {
+ } else if (rinfo->head->is_dentry && req->r_dentry) {
+ /* parent inode is not locked, be careful */
struct ceph_vino *ptvino = NULL;
-
- if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) ||
- le32_to_cpu(rinfo->dlease->duration_ms)) {
- dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
- dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
-
- if (rinfo->head->is_target) {
- tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
- tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
- ptvino = &tvino;
- }
-
- update_dentry_lease(req->r_dentry, rinfo->dlease,
- session, req->r_request_started, ptvino,
- &dvino);
- } else {
- dout("%s: no dentry lease or dir cap\n", __func__);
+ dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
+ dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
+ if (rinfo->head->is_target) {
+ tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
+ tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
+ ptvino = &tvino;
}
+ update_dentry_lease_careful(req->r_dentry, rinfo->dlease,
+ session, req->r_request_started,
+ rinfo->dname, rinfo->dname_len,
+ &dvino, ptvino);
}
done:
dout("fill_trace done err=%d\n", err);
@@ -1604,7 +1616,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
/* FIXME: release caps/leases if error occurs */
for (i = 0; i < rinfo->dir_nr; i++) {
struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
- struct ceph_vino tvino, dvino;
+ struct ceph_vino tvino;

dname.name = rde->name;
dname.len = rde->name_len;
@@ -1705,9 +1717,9 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,

ceph_dentry(dn)->offset = rde->offset;

- dvino = ceph_vino(d_inode(parent));
- update_dentry_lease(dn, rde->lease, req->r_session,
- req->r_request_started, &tvino, &dvino);
+ update_dentry_lease(d_inode(parent), dn,
+ rde->lease, req->r_session,
+ req->r_request_started);

if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
ret = fill_readdir_cache(d_inode(parent), dn,
--
2.17.2

2019-05-23 08:08:35

by Yan, Zheng

Subject: [PATCH 2/8] ceph: single workqueue for inode related works

We have three workqueues for inode works. A later patch will
introduce one more work for inodes. It's not good to introduce more
workqueues and add more 'struct work_struct' fields to
'struct ceph_inode_info'.
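
As a rough userspace sketch of the pattern this series moves to
(invented names, not the kernel code): each kind of pending work is
one bit in a shared mask, and a single worker drains whatever bits
have accumulated, so concurrent requests coalesce instead of each
needing its own work_struct and workqueue.

  #include <stdatomic.h>
  #include <stdio.h>

  #define WORK_WRITEBACK          (1u << 0)
  #define WORK_INVALIDATE_PAGES   (1u << 1)
  #define WORK_VMTRUNCATE         (1u << 2)

  static atomic_uint work_mask;

  /* request a job: set its bit; only queue the single work item when
   * no bits were pending (queue_work() similarly returns false when
   * the item is already queued) */
  void queue_inode_work(unsigned int bit)
  {
          if (atomic_fetch_or(&work_mask, bit) == 0)
                  printf("queued work item\n");
          else
                  printf("already queued, bit added to mask\n");
  }

  /* the single worker: handle every kind of work that accumulated */
  void inode_work(void)
  {
          unsigned int mask = atomic_exchange(&work_mask, 0);

          if (mask & WORK_WRITEBACK)
                  printf("writeback\n");
          if (mask & WORK_INVALIDATE_PAGES)
                  printf("invalidate pages\n");
          if (mask & WORK_VMTRUNCATE)
                  printf("vmtruncate\n");
  }

  int main(void)
  {
          queue_inode_work(WORK_WRITEBACK);
          queue_inode_work(WORK_VMTRUNCATE);      /* coalesced */
          inode_work();
          return 0;
  }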

Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/file.c | 2 +-
fs/ceph/inode.c | 124 ++++++++++++++++++++++--------------------------
fs/ceph/super.c | 28 +++--------
fs/ceph/super.h | 17 ++++---
4 files changed, 74 insertions(+), 97 deletions(-)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ccc054794542..b7be02dfb897 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -790,7 +790,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
if (aio_work) {
INIT_WORK(&aio_work->work, ceph_aio_retry_work);
aio_work->req = req;
- queue_work(ceph_inode_to_client(inode)->wb_wq,
+ queue_work(ceph_inode_to_client(inode)->inode_wq,
&aio_work->work);
return;
}
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6eabcdb321cb..d9ff349821f0 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -33,9 +33,7 @@

static const struct inode_operations ceph_symlink_iops;

-static void ceph_invalidate_work(struct work_struct *work);
-static void ceph_writeback_work(struct work_struct *work);
-static void ceph_vmtruncate_work(struct work_struct *work);
+static void ceph_inode_work(struct work_struct *work);

/*
* find or create an inode, given the ceph ino number
@@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_snap_realm_item);
INIT_LIST_HEAD(&ci->i_snap_flush_item);

- INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
- INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
-
- INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
+ INIT_WORK(&ci->i_work, ceph_inode_work);
+ ci->i_work_mask = 0;

ceph_fscache_inode_init(ci);

@@ -1750,51 +1746,62 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
*/
void ceph_queue_writeback(struct inode *inode)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
+
ihold(inode);
- if (queue_work(ceph_inode_to_client(inode)->wb_wq,
- &ceph_inode(inode)->i_wb_work)) {
+ if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+ &ci->i_work)) {
dout("ceph_queue_writeback %p\n", inode);
} else {
- dout("ceph_queue_writeback %p failed\n", inode);
+ dout("ceph_queue_writeback %p already queued, mask=%lx\n",
+ inode, ci->i_work_mask);
iput(inode);
}
}

-static void ceph_writeback_work(struct work_struct *work)
-{
- struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
- i_wb_work);
- struct inode *inode = &ci->vfs_inode;
-
- dout("writeback %p\n", inode);
- filemap_fdatawrite(&inode->i_data);
- iput(inode);
-}
-
/*
* queue an async invalidation
*/
void ceph_queue_invalidate(struct inode *inode)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
+
ihold(inode);
- if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
- &ceph_inode(inode)->i_pg_inv_work)) {
+ if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+ &ceph_inode(inode)->i_work)) {
dout("ceph_queue_invalidate %p\n", inode);
} else {
- dout("ceph_queue_invalidate %p failed\n", inode);
+ dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
+ inode, ci->i_work_mask);
iput(inode);
}
}

/*
- * Invalidate inode pages in a worker thread. (This can't be done
- * in the message handler context.)
+ * Queue an async vmtruncate. If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
*/
-static void ceph_invalidate_work(struct work_struct *work)
+void ceph_queue_vmtruncate(struct inode *inode)
{
- struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
- i_pg_inv_work);
- struct inode *inode = &ci->vfs_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
+
+ ihold(inode);
+ if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+ &ci->i_work)) {
+ dout("ceph_queue_vmtruncate %p\n", inode);
+ } else {
+ dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
+ inode, ci->i_work_mask);
+ iput(inode);
+ }
+}
+
+static void ceph_do_invalidate_pages(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
u32 orig_gen;
int check = 0;
@@ -1846,44 +1853,6 @@ static void ceph_invalidate_work(struct work_struct *work)
out:
if (check)
ceph_check_caps(ci, 0, NULL);
- iput(inode);
-}
-
-
-/*
- * called by trunc_wq;
- *
- * We also truncate in a separate thread as well.
- */
-static void ceph_vmtruncate_work(struct work_struct *work)
-{
- struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
- i_vmtruncate_work);
- struct inode *inode = &ci->vfs_inode;
-
- dout("vmtruncate_work %p\n", inode);
- __ceph_do_pending_vmtruncate(inode);
- iput(inode);
-}
-
-/*
- * Queue an async vmtruncate. If we fail to queue work, we will handle
- * the truncation the next time we call __ceph_do_pending_vmtruncate.
- */
-void ceph_queue_vmtruncate(struct inode *inode)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
-
- ihold(inode);
-
- if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
- &ci->i_vmtruncate_work)) {
- dout("ceph_queue_vmtruncate %p\n", inode);
- } else {
- dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
- inode, ci->i_truncate_pending);
- iput(inode);
- }
}

/*
@@ -1947,6 +1916,25 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
wake_up_all(&ci->i_cap_wq);
}

+static void ceph_inode_work(struct work_struct *work)
+{
+ struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+ i_work);
+ struct inode *inode = &ci->vfs_inode;
+
+ if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
+ dout("writeback %p\n", inode);
+ filemap_fdatawrite(&inode->i_data);
+ }
+ if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
+ ceph_do_invalidate_pages(inode);
+
+ if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
+ __ceph_do_pending_vmtruncate(inode);
+
+ iput(inode);
+}
+
/*
* symlinks
*/
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index afc4c5d008d4..b1ee41372e85 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -671,18 +671,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
* The number of concurrent works can be high but they don't need
* to be processed in parallel, limit concurrency.
*/
- fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
- if (!fsc->wb_wq)
+ fsc->inode_wq = alloc_workqueue("ceph-inode", WQ_UNBOUND, 0);
+ if (!fsc->inode_wq)
goto fail_client;
- fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
- if (!fsc->pg_inv_wq)
- goto fail_wb_wq;
- fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
- if (!fsc->trunc_wq)
- goto fail_pg_inv_wq;
fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
if (!fsc->cap_wq)
- goto fail_trunc_wq;
+ goto fail_inode_wq;

/* set up mempools */
err = -ENOMEM;
@@ -696,12 +690,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,

fail_cap_wq:
destroy_workqueue(fsc->cap_wq);
-fail_trunc_wq:
- destroy_workqueue(fsc->trunc_wq);
-fail_pg_inv_wq:
- destroy_workqueue(fsc->pg_inv_wq);
-fail_wb_wq:
- destroy_workqueue(fsc->wb_wq);
+fail_inode_wq:
+ destroy_workqueue(fsc->inode_wq);
fail_client:
ceph_destroy_client(fsc->client);
fail:
@@ -714,9 +704,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,

static void flush_fs_workqueues(struct ceph_fs_client *fsc)
{
- flush_workqueue(fsc->wb_wq);
- flush_workqueue(fsc->pg_inv_wq);
- flush_workqueue(fsc->trunc_wq);
+ flush_workqueue(fsc->inode_wq);
flush_workqueue(fsc->cap_wq);
}

@@ -724,9 +712,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);

- destroy_workqueue(fsc->wb_wq);
- destroy_workqueue(fsc->pg_inv_wq);
- destroy_workqueue(fsc->trunc_wq);
+ destroy_workqueue(fsc->inode_wq);
destroy_workqueue(fsc->cap_wq);

mempool_destroy(fsc->wb_pagevec_pool);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c84135bb72c6..234610ce4155 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -109,9 +109,7 @@ struct ceph_fs_client {
mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count;

- struct workqueue_struct *wb_wq;
- struct workqueue_struct *pg_inv_wq;
- struct workqueue_struct *trunc_wq;
+ struct workqueue_struct *inode_wq;
struct workqueue_struct *cap_wq;

#ifdef CONFIG_DEBUG_FS
@@ -388,10 +386,8 @@ struct ceph_inode_info {
struct list_head i_snap_flush_item;
struct timespec64 i_snap_btime;

- struct work_struct i_wb_work; /* writeback work */
- struct work_struct i_pg_inv_work; /* page invalidation work */
-
- struct work_struct i_vmtruncate_work;
+ struct work_struct i_work;
+ unsigned long i_work_mask;

#ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache;
@@ -513,6 +509,13 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_ERROR_FILELOCK (1 << 12) /* have seen file lock errors */


+/*
+ * Masks of ceph inode work.
+ */
+#define CEPH_I_WORK_WRITEBACK 0 /* writeback */
+#define CEPH_I_WORK_INVALIDATE_PAGES 1 /* invalidate pages */
+#define CEPH_I_WORK_VMTRUNCATE 2 /* vmtruncate */
+
/*
* We set the ERROR_WRITE bit when we start seeing write errors on an inode
* and then clear it when they start succeeding. Note that we do a lockless
--
2.17.2

2019-05-23 08:08:37

by Yan, Zheng

Subject: [PATCH 3/8] ceph: avoid iput_final() while holding mutex or in dispatch thread

iput_final() may wait for readahead pages. The wait can cause deadlock.
For example:

Workqueue: ceph-msgr ceph_con_workfn [libceph]
Call Trace:
schedule+0x36/0x80
io_schedule+0x16/0x40
__lock_page+0x101/0x140
truncate_inode_pages_range+0x556/0x9f0
truncate_inode_pages_final+0x4d/0x60
evict+0x182/0x1a0
iput+0x1d2/0x220
iterate_session_caps+0x82/0x230 [ceph]
dispatch+0x678/0xa80 [ceph]
ceph_con_workfn+0x95b/0x1560 [libceph]
process_one_work+0x14d/0x410
worker_thread+0x4b/0x460
kthread+0x105/0x140
ret_from_fork+0x22/0x40

Workqueue: ceph-msgr ceph_con_workfn [libceph]
Call Trace:
__schedule+0x3d6/0x8b0
schedule+0x36/0x80
schedule_preempt_disabled+0xe/0x10
mutex_lock+0x2f/0x40
ceph_check_caps+0x505/0xa80 [ceph]
ceph_put_wrbuffer_cap_refs+0x1e5/0x2c0 [ceph]
writepages_finish+0x2d3/0x410 [ceph]
__complete_request+0x26/0x60 [libceph]
handle_reply+0x6c8/0xa10 [libceph]
dispatch+0x29a/0xbb0 [libceph]
ceph_con_workfn+0x95b/0x1560 [libceph]
process_one_work+0x14d/0x410
worker_thread+0x4b/0x460
kthread+0x105/0x140
ret_from_fork+0x22/0x40

In the above example, truncate_inode_pages_range() waits for readahead
pages while holding s_mutex. ceph_check_caps() waits for s_mutex and
blocks the OSD dispatch thread, so later OSD replies (for the
readahead) can't be handled.

ceph_check_caps() may also take snap_rwsem for read, so a similar
deadlock can happen if iput_final() is called while holding snap_rwsem.

In general, it's not good to call iput_final() inside MDS/OSD threads or
while holding any mutex.

The fix is to introduce ceph_async_iput(), which calls iput_final()
in a workqueue.
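
The heart of that helper is a "drop the reference unless it is the
last one" loop. Roughly, as a userspace sketch with invented names
(the kernel side uses atomic_add_unless() on inode->i_count and
queues the inode's existing work item):

  #include <stdatomic.h>
  #include <stdbool.h>
  #include <stddef.h>

  struct object {
          atomic_int refcount;
  };

  /* stand-in for queue_work(); assume it succeeds here */
  static bool queue_final_put(struct object *obj)
  {
          (void)obj;
          return true;    /* worker will perform the final put */
  }

  /* decrement refcount unless that would drop it to zero */
  static bool put_unless_last(struct object *obj)
  {
          int cur = atomic_load(&obj->refcount);

          while (cur > 1) {
                  if (atomic_compare_exchange_weak(&obj->refcount,
                                                   &cur, cur - 1))
                          return true;    /* non-final ref dropped */
          }
          return false;   /* ours is the last reference */
  }

  void async_put(struct object *obj)
  {
          if (!obj)
                  return;
          for (;;) {
                  if (put_unless_last(obj))
                          break;  /* no teardown needed here */
                  if (queue_final_put(obj))
                          break;  /* worker does the final put */
                  /* queueing failed: the already-queued worker holds
                   * a reference, so ours is not the last; retry */
          }
  }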

Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/caps.c | 12 ++++++++----
fs/ceph/inode.c | 31 +++++++++++++++++++++++++++----
fs/ceph/mds_client.c | 28 ++++++++++++++++++----------
fs/ceph/quota.c | 9 ++++++---
fs/ceph/snap.c | 16 +++++++++++-----
fs/ceph/super.h | 2 +-
6 files changed, 71 insertions(+), 27 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 079d0df9650c..0176241eaea7 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2992,8 +2992,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
}
if (complete_capsnap)
wake_up_all(&ci->i_cap_wq);
- while (put-- > 0)
- iput(inode);
+ while (put-- > 0) {
+ /* avoid calling iput_final() in osd dispatch threads */
+ ceph_async_iput(inode);
+ }
}

/*
@@ -3964,8 +3966,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
- iput(inode);
ceph_put_string(extra_info.pool_ns);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(inode);
return;

flush_cap_releases:
@@ -4011,7 +4014,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
if (inode) {
dout("check_delayed_caps on %p\n", inode);
ceph_check_caps(ci, flags, NULL);
- iput(inode);
+ /* avoid calling iput_final() in tick thread */
+ ceph_async_iput(inode);
}
}
spin_unlock(&mdsc->cap_delay_lock);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index d9ff349821f0..8cfece240ffe 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1480,7 +1480,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
pr_err("fill_inode badness on %p got %d\n", in, rc);
err = rc;
}
- iput(in);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(in);
}

return err;
@@ -1678,8 +1679,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
&req->r_caps_reservation);
if (ret < 0) {
pr_err("fill_inode badness on %p\n", in);
- if (d_really_is_negative(dn))
- iput(in);
+ if (d_really_is_negative(dn)) {
+ /* avoid calling iput_final() in mds
+ * dispatch threads */
+ ceph_async_iput(in);
+ }
d_drop(dn);
err = ret;
goto next_item;
@@ -1689,7 +1693,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
if (ceph_security_xattr_deadlock(in)) {
dout(" skip splicing dn %p to inode %p"
" (security xattr deadlock)\n", dn, in);
- iput(in);
+ ceph_async_iput(in);
skipped++;
goto next_item;
}
@@ -1740,6 +1744,25 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
return ret;
}

+/*
+ * Put reference to inode, but avoid calling iput_final() in current thread.
+ * iput_final() may wait for readahead pages. The wait can cause deadlock in
+ * some contexts.
+ */
+void ceph_async_iput(struct inode *inode)
+{
+ if (!inode)
+ return;
+ for (;;) {
+ if (atomic_add_unless(&inode->i_count, -1, 1))
+ break;
+ if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+ &ceph_inode(inode)->i_work))
+ break;
+ /* queue work failed, i_count must be at least 2 */
+ }
+}
+
/*
* Write back inode data in a worker thread. (This can't be done
* in the message handler context.)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index e979d1d543e4..60e8ddbdfdc5 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -700,11 +700,12 @@ void ceph_mdsc_release_request(struct kref *kref)
ceph_msg_put(req->r_reply);
if (req->r_inode) {
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- iput(req->r_inode);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_inode);
}
if (req->r_parent)
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
- iput(req->r_target_inode);
+ ceph_async_iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
@@ -718,7 +719,7 @@ void ceph_mdsc_release_request(struct kref *kref)
*/
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- iput(req->r_old_dentry_dir);
+ ceph_async_iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
kfree(req->r_path2);
@@ -828,7 +829,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
}

if (req->r_unsafe_dir) {
- iput(req->r_unsafe_dir);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(req->r_unsafe_dir);
req->r_unsafe_dir = NULL;
}

@@ -993,7 +995,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ ceph_async_iput(inode);
goto random;
}
mds = cap->session->s_mds;
@@ -1002,7 +1004,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
out:
- iput(inode);
+ /* avoid calling iput_final() while holding mdsc->mutex or
+ * in mds dispatch threads */
+ ceph_async_iput(inode);
return mds;

random:
@@ -1312,7 +1316,9 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
spin_unlock(&session->s_cap_lock);

if (last_inode) {
- iput(last_inode);
+ /* avoid calling iput_final() while holding
+ * s_mutex or in mds dispatch threads */
+ ceph_async_iput(last_inode);
last_inode = NULL;
}
if (old_cap) {
@@ -1345,7 +1351,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);

- iput(last_inode);
+ ceph_async_iput(last_inode);
if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);

@@ -1481,7 +1487,8 @@ static void remove_session_caps(struct ceph_mds_session *session)
spin_unlock(&session->s_cap_lock);

inode = ceph_find_inode(sb, vino);
- iput(inode);
+ /* avoid calling iput_final() while holding s_mutex */
+ ceph_async_iput(inode);

spin_lock(&session->s_cap_lock);
}
@@ -3923,8 +3930,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
ceph_con_send(&session->s_con, msg);

out:
- iput(inode);
mutex_unlock(&session->s_mutex);
+ /* avoid calling iput_final() in mds dispatch threads */
+ ceph_async_iput(inode);
return;

bad:
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index c4522212872c..d629fc857450 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -74,7 +74,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
le64_to_cpu(h->max_files));
spin_unlock(&ci->i_ceph_lock);

- iput(inode);
+ /* avoid calling iput_final() in dispatch thread */
+ ceph_async_iput(inode);
}

static struct ceph_quotarealm_inode *
@@ -235,7 +236,8 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,

ci = ceph_inode(in);
has_quota = __ceph_has_any_quota(ci);
- iput(in);
+ /* avoid calling iput_final() while holding mdsc->snap_rwsem */
+ ceph_async_iput(in);

next = realm->parent;
if (has_quota || !next)
@@ -372,7 +374,8 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
pr_warn("Invalid quota check op (%d)\n", op);
exceeded = true; /* Just break the loop */
}
- iput(in);
+ /* avoid calling iput_final() while holding mdsc->snap_rwsem */
+ ceph_async_iput(in);

next = realm->parent;
if (exceeded || !next)
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index b26e12cd8ec3..72c6c022f02b 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -648,13 +648,15 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
if (!inode)
continue;
spin_unlock(&realm->inodes_with_caps_lock);
- iput(lastinode);
+ /* avoid calling iput_final() while holding
+ * mdsc->snap_rwsem or in mds dispatch threads */
+ ceph_async_iput(lastinode);
lastinode = inode;
ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock);
}
spin_unlock(&realm->inodes_with_caps_lock);
- iput(lastinode);
+ ceph_async_iput(lastinode);

dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
}
@@ -806,7 +808,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
ceph_flush_snaps(ci, &session);
- iput(inode);
+ /* avoid calling iput_final() while holding
+ * session->s_mutex or in mds dispatch threads */
+ ceph_async_iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
spin_unlock(&mdsc->snap_flush_lock);
@@ -950,12 +954,14 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm);

- iput(inode);
+ /* avoid calling iput_final() while holding
+ * mdsc->snap_rwsem or in mds dispatch threads */
+ ceph_async_iput(inode);
continue;

skip_inode:
spin_unlock(&ci->i_ceph_lock);
- iput(inode);
+ ceph_async_iput(inode);
}

/* we may have taken some of the old realm's children. */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 234610ce4155..11aeb540b0cf 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -904,9 +904,9 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern void ceph_queue_vmtruncate(struct inode *inode);
-
extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode);
+extern void ceph_async_iput(struct inode *inode);

extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force);
--
2.17.2

2019-05-23 08:08:52

by Yan, Zheng

Subject: [PATCH 6/8] ceph: use READ_ONCE to access d_parent in RCU critical section

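d_parent can be changed at any time by a concurrent rename, and in an
RCU critical section no lock prevents that. READ_ONCE() forces a
single, untorn load, so the compiler cannot re-read the pointer and
let two uses of the "same" parent observe different dentries. A
minimal userspace analogue using C11 atomics (invented names, not the
kernel code):

  #include <stdatomic.h>

  struct node {
          _Atomic(struct node *) parent;  /* updated by "rename" */
          int ino;
  };

  /* snapshot the parent exactly once (analogous to READ_ONCE());
   * every later use goes through the local copy, so a concurrent
   * update cannot make two uses disagree */
  int parent_ino(struct node *n)
  {
          struct node *parent =
                  atomic_load_explicit(&n->parent, memory_order_relaxed);

          return parent->ino;
  }
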
Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/mds_client.c | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 60e8ddbdfdc5..870754e9d572 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -913,7 +913,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
struct inode *dir;

rcu_read_lock();
- parent = req->r_dentry->d_parent;
+ parent = READ_ONCE(req->r_dentry->d_parent);
dir = req->r_parent ? : d_inode_rcu(parent);

if (!dir || dir->i_sb != mdsc->fsc->sb) {
@@ -2131,8 +2131,8 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
dout("build_path path+%d: %p SNAPDIR\n",
pos, temp);
- } else if (stop_on_nosnap && inode && dentry != temp &&
- ceph_snap(inode) == CEPH_NOSNAP) {
+ } else if (stop_on_nosnap && dentry != temp &&
+ inode && ceph_snap(inode) == CEPH_NOSNAP) {
spin_unlock(&temp->d_lock);
pos++; /* get rid of any prepended '/' */
break;
@@ -2145,7 +2145,7 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
memcpy(path + pos, temp->d_name.name, temp->d_name.len);
}
spin_unlock(&temp->d_lock);
- temp = temp->d_parent;
+ temp = READ_ONCE(temp->d_parent);

/* Are we at the root? */
if (IS_ROOT(temp))
--
2.17.2

2019-05-23 08:09:07

by Yan, Zheng

Subject: [PATCH 7/8] ceph: ensure d_name/d_parent stability in ceph_mdsc_lease_send_msg()

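dentry->d_name and d_parent are only stable while dentry->d_lock is
held, so the message is sized for NAME_MAX up front and the parent
inode number plus the name bytes are all copied inside a single
d_lock hold. A minimal userspace sketch of that snapshot pattern
(invented names, not the kernel code):

  #include <pthread.h>
  #include <string.h>

  #define SKETCH_NAME_MAX 255

  struct dent {
          pthread_mutex_t lock;   /* protects name, len, parent_ino */
          char name[SKETCH_NAME_MAX];
          unsigned int len;
          unsigned long parent_ino;
  };

  struct lease_msg {
          unsigned long ino;
          unsigned int len;
          char name[SKETCH_NAME_MAX];     /* worst-case size, chosen
                                           * before the lock is held */
  };

  /* one lock hold: name, length and parent all come from the same
   * instant, so a concurrent rename cannot tear the snapshot */
  void fill_lease_msg(struct dent *d, struct lease_msg *m)
  {
          pthread_mutex_lock(&d->lock);
          m->ino = d->parent_ino;
          m->len = d->len;
          memcpy(m->name, d->name, d->len);
          pthread_mutex_unlock(&d->lock);
  }
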
Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/dir.c | 7 +++----
fs/ceph/mds_client.c | 24 +++++++++++++-----------
fs/ceph/mds_client.h | 1 -
3 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 1271024a3797..72efad28857c 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1433,8 +1433,7 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
return false;
}

-static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
- struct inode *dir)
+static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *session = NULL;
@@ -1466,7 +1465,7 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
spin_unlock(&dentry->d_lock);

if (session) {
- ceph_mdsc_lease_send_msg(session, dir, dentry,
+ ceph_mdsc_lease_send_msg(session, dentry,
CEPH_MDS_LEASE_RENEW, seq);
ceph_put_mds_session(session);
}
@@ -1566,7 +1565,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
valid = 1;
} else {
- valid = dentry_lease_is_valid(dentry, flags, dir);
+ valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
if (valid || dir_lease_is_valid(dir, dentry)) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 870754e9d572..98c500dbec3f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3941,31 +3941,33 @@ static void handle_lease(struct ceph_mds_client *mdsc,
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
- struct inode *inode,
struct dentry *dentry, char action,
u32 seq)
{
struct ceph_msg *msg;
struct ceph_mds_lease *lease;
- int len = sizeof(*lease) + sizeof(u32);
- int dnamelen = 0;
+ struct inode *dir;
+ int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

- dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
- inode, dentry, ceph_lease_op_name(action), session->s_mds);
- dnamelen = dentry->d_name.len;
- len += dnamelen;
+ dout("lease_send_msg identry %p %s to mds%d\n",
+ dentry, ceph_lease_op_name(action), session->s_mds);

msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
- lease->ino = cpu_to_le64(ceph_vino(inode).ino);
- lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq);
- put_unaligned_le32(dnamelen, lease + 1);
- memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);

+ spin_lock(&dentry->d_lock);
+ dir = d_inode(dentry->d_parent);
+ lease->ino = cpu_to_le64(ceph_inode(dir)->i_vino.ino);
+ lease->first = lease->last = cpu_to_le64(ceph_inode(dir)->i_vino.snap);
+
+ put_unaligned_le32(dentry->d_name.len, lease + 1);
+ memcpy((void *)(lease + 1) + 4,
+ dentry->d_name.name, dentry->d_name.len);
+ spin_unlock(&dentry->d_lock);
/*
* if this is a preemptive lease RELEASE, no need to
* flush request stream, since the actual request will
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 9c28b86abcf4..330769ecb601 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -505,7 +505,6 @@ extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,

extern void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry);
extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
- struct inode *inode,
struct dentry *dentry, char action,
u32 seq);

--
2.17.2

2019-05-23 08:10:30

by Yan, Zheng

Subject: [PATCH 5/8] ceph: fix dir_lease_is_valid()

dir_lease_is_valid() should call __ceph_dentry_dir_lease_touch()
under dentry->d_lock. Besides, ceph_dentry(dentry) can be NULL when
it is called by LOOKUP_RCU d_revalidate().
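
A hedged userspace sketch of the corrected shape (invented names, not
the kernel code): read the directory's state under its own lock, then
take the dentry's lock and re-validate everything -- including that
the private info exists at all -- before touching it.

  #include <pthread.h>
  #include <stddef.h>

  struct dinfo {
          int lease_gen;
  };

  struct dent {
          pthread_mutex_t lock;   /* protects info and parent */
          struct dinfo *info;     /* may be NULL on the RCU path */
          struct dent *parent;
  };

  /* valid only if the private info exists, the parent still matches
   * and the cached generation is current, all checked under d->lock */
  int lease_is_valid(struct dent *d, struct dent *dir, int shared_gen)
  {
          int valid = 0;

          pthread_mutex_lock(&d->lock);
          if (d->info && d->parent == dir &&
              d->info->lease_gen == shared_gen)
                  valid = 1;      /* only now is d->info safe to use */
          pthread_mutex_unlock(&d->lock);
          return valid;
  }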

Cc: [email protected] # v5.1+
Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/dir.c | 26 +++++++++++++++++---------
1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 0637149fb9f9..1271024a3797 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1512,18 +1512,26 @@ static int __dir_lease_try_check(const struct dentry *dentry)
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
{
struct ceph_inode_info *ci = ceph_inode(dir);
- struct ceph_dentry_info *di = ceph_dentry(dentry);
- int valid = 0;
+ int valid;
+ int shared_gen;

spin_lock(&ci->i_ceph_lock);
- if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
- valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
+ valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
+ shared_gen = atomic_read(&ci->i_shared_gen);
spin_unlock(&ci->i_ceph_lock);
- if (valid)
- __ceph_dentry_dir_lease_touch(di);
- dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
- dir, (unsigned)atomic_read(&ci->i_shared_gen),
- dentry, (unsigned)di->lease_shared_gen, valid);
+ if (valid) {
+ struct ceph_dentry_info *di;
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ if (dir == d_inode(dentry->d_parent) &&
+ di && di->lease_shared_gen == shared_gen)
+ __ceph_dentry_dir_lease_touch(di);
+ else
+ valid = 0;
+ spin_unlock(&dentry->d_lock);
+ }
+ dout("dir_lease_is_valid dir %p v%u dentry %p = %d\n",
+ dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, valid);
return valid;
}

--
2.17.2

2019-05-23 08:10:56

by Yan, Zheng

Subject: [PATCH 8/8] ceph: hold i_ceph_lock when removing caps for freeing inode

ceph_d_revalidate(..., LOOKUP_RCU) may call __ceph_caps_issued_mask()
on a freeing inode.
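
As a rough userspace sketch of why the lock is needed even on the
teardown path (invented names, not the kernel code): a lockless
lookup may still reach the inode right up until it is freed, so the
destructor must empty the cap tree under the same lock the reader
takes.

  #include <pthread.h>
  #include <stddef.h>

  struct cap {
          struct cap *next;
          int issued;
  };

  struct cinode {
          pthread_mutex_t lock;   /* i_ceph_lock analogue */
          struct cap *caps;
  };

  /* reader (the d_revalidate analogue): may run until the very end
   * of the inode's life */
  int caps_issued(struct cinode *ci, int mask)
  {
          struct cap *c;
          int have = 0;

          pthread_mutex_lock(&ci->lock);
          for (c = ci->caps; c; c = c->next)
                  have |= c->issued;
          pthread_mutex_unlock(&ci->lock);
          return (have & mask) == mask;
  }

  /* teardown: detach under the same lock, otherwise the reader above
   * can walk a list whose nodes are being freed */
  void remove_caps(struct cinode *ci)
  {
          pthread_mutex_lock(&ci->lock);
          while (ci->caps) {
                  struct cap *c = ci->caps;

                  ci->caps = c->next;
                  /* free(c) in a complete version */
          }
          pthread_mutex_unlock(&ci->lock);
  }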

Cc: [email protected]
Signed-off-by: "Yan, Zheng" <[email protected]>
---
fs/ceph/caps.c | 10 ++++++----
fs/ceph/inode.c | 2 +-
fs/ceph/super.h | 2 +-
3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 0176241eaea7..7754d7679122 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1263,20 +1263,22 @@ static int send_cap_msg(struct cap_msg_args *arg)
}

/*
- * Queue cap releases when an inode is dropped from our cache. Since
- * inode is about to be destroyed, there is no need for i_ceph_lock.
+ * Queue cap releases when an inode is dropped from our cache.
*/
-void __ceph_remove_caps(struct inode *inode)
+void __ceph_remove_caps(struct ceph_inode_info *ci)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;

+ /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
+ * may call __ceph_caps_issued_mask() on a freeing inode. */
+ spin_lock(&ci->i_ceph_lock);
p = rb_first(&ci->i_caps);
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
p = rb_next(p);
__ceph_remove_cap(cap, true);
}
+ spin_unlock(&ci->i_ceph_lock);
}

/*
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e47a25495be5..30d0cdc21035 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -534,7 +534,7 @@ void ceph_destroy_inode(struct inode *inode)

ceph_fscache_unregister_inode_cookie(ci);

- __ceph_remove_caps(inode);
+ __ceph_remove_caps(ci);

if (__ceph_has_any_quota(ci))
ceph_adjust_quota_realms_count(inode, false);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 11aeb540b0cf..e74867743e07 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1003,7 +1003,7 @@ extern void ceph_add_cap(struct inode *inode,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
-extern void __ceph_remove_caps(struct inode* inode);
+extern void __ceph_remove_caps(struct ceph_inode_info *ci);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
extern int ceph_is_any_caps(struct inode *inode);
--
2.17.2

2019-05-29 13:16:41

by Sasha Levin

Subject: Re: [PATCH 8/8] ceph: hold i_ceph_lock when removing caps for freeing inode

Hi,

[This is an automated email]

This commit has been processed because it contains a -stable tag.
The stable tag indicates that it's relevant for the following trees: all

The bot has tested the following trees: v5.1.4, v5.0.18, v4.19.45, v4.14.121, v4.9.178, v4.4.180, v3.18.140.

v5.1.4: Build OK!
v5.0.18: Failed to apply! Possible dependencies:
e3ec8d6898f71 ("ceph: send cap releases more aggressively")

v4.19.45: Failed to apply! Possible dependencies:
e3ec8d6898f71 ("ceph: send cap releases more aggressively")

v4.14.121: Failed to apply! Possible dependencies:
a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
e3ec8d6898f71 ("ceph: send cap releases more aggressively")

v4.9.178: Failed to apply! Possible dependencies:
a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
e3ec8d6898f71 ("ceph: send cap releases more aggressively")

v4.4.180: Failed to apply! Possible dependencies:
13d1ad16d05ee ("libceph: move message allocation out of ceph_osdc_alloc_request()")
34b759b4a22b0 ("ceph: kill ceph_empty_snapc")
3f1af42ad0fad ("libceph: enable large, variable-sized OSD requests")
5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
7627151ea30bc ("libceph: define new ceph_file_layout structure")
779fe0fb8e188 ("ceph: rados pool namespace support")
922dab6134178 ("libceph, rbd: ceph_osd_linger_request, watch/notify v2")
a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
ae458f5a171ba ("libceph: make r_request msg_size calculation clearer")
c41d13a31fefe ("rbd: use header_oid instead of header_name")
c8fe9b17d055f ("ceph: Asynchronous IO support")
d30291b985d18 ("libceph: variable-sized ceph_object_id")
e3ec8d6898f71 ("ceph: send cap releases more aggressively")

v3.18.140: Failed to apply! Possible dependencies:
10183a69551f7 ("ceph: check OSD caps before read/write")
28127bdd2f843 ("ceph: convert inline data to normal data before data write")
31c542a199d79 ("ceph: add inline data to pagecache")
5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
70db4f3629b34 ("ceph: introduce a new inode flag indicating if cached dentries are ordered")
745a8e3bccbc6 ("ceph: don't pre-allocate space for cap release messages")
7627151ea30bc ("libceph: define new ceph_file_layout structure")
779fe0fb8e188 ("ceph: rados pool namespace support")
83701246aee8f ("ceph: sync read inline data")
a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
affbc19a68f99 ("ceph: make sure syncfs flushes all cap snaps")
c8fe9b17d055f ("ceph: Asynchronous IO support")
d30291b985d18 ("libceph: variable-sized ceph_object_id")
d3383a8e37f80 ("ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)")
e3ec8d6898f71 ("ceph: send cap releases more aggressively")
e96a650a8174e ("ceph, rbd: delete unnecessary checks before two function calls")


How should we proceed with this patch?

--
Thanks,
Sasha

2019-05-30 01:49:21

by Yan, Zheng

Subject: Re: [PATCH 8/8] ceph: hold i_ceph_lock when removing caps for freeing inode

On 5/29/19 9:14 PM, Sasha Levin wrote:
> Hi,
>
> [This is an automated email]
>
> This commit has been processed because it contains a -stable tag.
> The stable tag indicates that it's relevant for the following trees: all
>
> The bot has tested the following trees: v5.1.4, v5.0.18, v4.19.45, v4.14.121, v4.9.178, v4.4.180, v3.18.140.
>
> v5.1.4: Build OK!
> v5.0.18: Failed to apply! Possible dependencies:
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
>
> v4.19.45: Failed to apply! Possible dependencies:
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
>
> v4.14.121: Failed to apply! Possible dependencies:
> a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
>
> v4.9.178: Failed to apply! Possible dependencies:
> a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
>
> v4.4.180: Failed to apply! Possible dependencies:
> 13d1ad16d05ee ("libceph: move message allocation out of ceph_osdc_alloc_request()")
> 34b759b4a22b0 ("ceph: kill ceph_empty_snapc")
> 3f1af42ad0fad ("libceph: enable large, variable-sized OSD requests")
> 5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
> 7627151ea30bc ("libceph: define new ceph_file_layout structure")
> 779fe0fb8e188 ("ceph: rados pool namespace support")
> 922dab6134178 ("libceph, rbd: ceph_osd_linger_request, watch/notify v2")
> a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> ae458f5a171ba ("libceph: make r_request msg_size calculation clearer")
> c41d13a31fefe ("rbd: use header_oid instead of header_name")
> c8fe9b17d055f ("ceph: Asynchronous IO support")
> d30291b985d18 ("libceph: variable-sized ceph_object_id")
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
>
> v3.18.140: Failed to apply! Possible dependencies:
> 10183a69551f7 ("ceph: check OSD caps before read/write")
> 28127bdd2f843 ("ceph: convert inline data to normal data before data write")
> 31c542a199d79 ("ceph: add inline data to pagecache")
> 5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
> 70db4f3629b34 ("ceph: introduce a new inode flag indicating if cached dentries are ordered")
> 745a8e3bccbc6 ("ceph: don't pre-allocate space for cap release messages")
> 7627151ea30bc ("libceph: define new ceph_file_layout structure")
> 779fe0fb8e188 ("ceph: rados pool namespace support")
> 83701246aee8f ("ceph: sync read inline data")
> a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> affbc19a68f99 ("ceph: make sure syncfs flushes all cap snaps")
> c8fe9b17d055f ("ceph: Asynchronous IO support")
> d30291b985d18 ("libceph: variable-sized ceph_object_id")
> d3383a8e37f80 ("ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)")
> e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> e96a650a8174e ("ceph, rbd: delete unnecessary checks before two function calls")
>
>
> How should we proceed with this patch?
>

Please use the following patch for old kernels.

Regards
Yan, Zheng

---
From 55937416f12e096621b06ada7554cacb89d06e97 Mon Sep 17 00:00:00 2001
From: "Yan, Zheng" <[email protected]>
Date: Thu, 23 May 2019 11:01:37 +0800
Subject: [PATCH] ceph: hold i_ceph_lock when removing caps for freeing inode

ceph_d_revalidate(..., LOOKUP_RCU) may call __ceph_caps_issued_mask()
on a freeing inode.

Cc: [email protected]
Signed-off-by: "Yan, Zheng" <[email protected]>
Reviewed-by: Jeff Layton <[email protected]>
---
fs/ceph/caps.c | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ff5d32cf9578..0fb4e919cdce 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1119,20 +1119,23 @@ static int send_cap_msg(struct cap_msg_args *arg)
}

/*
- * Queue cap releases when an inode is dropped from our cache. Since
- * inode is about to be destroyed, there is no need for i_ceph_lock.
+ * Queue cap releases when an inode is dropped from our cache.
*/
void ceph_queue_caps_release(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;

+ /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
+ * may call __ceph_caps_issued_mask() on a freeing inode. */
+ spin_lock(&ci->i_ceph_lock);
p = rb_first(&ci->i_caps);
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
p = rb_next(p);
__ceph_remove_cap(cap, true);
}
+ spin_unlock(&ci->i_ceph_lock);
}

/*
--
2.17.2

> --
> Thanks,
> Sasha
>


2019-08-02 13:13:37

by Greg KH

Subject: Re: [PATCH 8/8] ceph: hold i_ceph_lock when removing caps for freeing inode

On Thu, May 30, 2019 at 09:46:35AM +0800, Yan, Zheng wrote:
> On 5/29/19 9:14 PM, Sasha Levin wrote:
> > Hi,
> >
> > [This is an automated email]
> >
> > This commit has been processed because it contains a -stable tag.
> > The stable tag indicates that it's relevant for the following trees: all
> >
> > The bot has tested the following trees: v5.1.4, v5.0.18, v4.19.45, v4.14.121, v4.9.178, v4.4.180, v3.18.140.
> >
> > v5.1.4: Build OK!
> > v5.0.18: Failed to apply! Possible dependencies:
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> >
> > v4.19.45: Failed to apply! Possible dependencies:
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> >
> > v4.14.121: Failed to apply! Possible dependencies:
> > a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> > a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> >
> > v4.9.178: Failed to apply! Possible dependencies:
> > a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> > a57d9064e4ee4 ("ceph: flush pending works before shutdown super")
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> >
> > v4.4.180: Failed to apply! Possible dependencies:
> > 13d1ad16d05ee ("libceph: move message allocation out of ceph_osdc_alloc_request()")
> > 34b759b4a22b0 ("ceph: kill ceph_empty_snapc")
> > 3f1af42ad0fad ("libceph: enable large, variable-sized OSD requests")
> > 5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
> > 7627151ea30bc ("libceph: define new ceph_file_layout structure")
> > 779fe0fb8e188 ("ceph: rados pool namespace support")
> > 922dab6134178 ("libceph, rbd: ceph_osd_linger_request, watch/notify v2")
> > a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> > ae458f5a171ba ("libceph: make r_request msg_size calculation clearer")
> > c41d13a31fefe ("rbd: use header_oid instead of header_name")
> > c8fe9b17d055f ("ceph: Asynchronous IO support")
> > d30291b985d18 ("libceph: variable-sized ceph_object_id")
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> >
> > v3.18.140: Failed to apply! Possible dependencies:
> > 10183a69551f7 ("ceph: check OSD caps before read/write")
> > 28127bdd2f843 ("ceph: convert inline data to normal data before data write")
> > 31c542a199d79 ("ceph: add inline data to pagecache")
> > 5be0389dac662 ("ceph: re-send AIO write request when getting -EOLDSNAP error")
> > 70db4f3629b34 ("ceph: introduce a new inode flag indicating if cached dentries are ordered")
> > 745a8e3bccbc6 ("ceph: don't pre-allocate space for cap release messages")
> > 7627151ea30bc ("libceph: define new ceph_file_layout structure")
> > 779fe0fb8e188 ("ceph: rados pool namespace support")
> > 83701246aee8f ("ceph: sync read inline data")
> > a1c6b8358171c ("ceph: define argument structure for handle_cap_grant")
> > affbc19a68f99 ("ceph: make sure syncfs flushes all cap snaps")
> > c8fe9b17d055f ("ceph: Asynchronous IO support")
> > d30291b985d18 ("libceph: variable-sized ceph_object_id")
> > d3383a8e37f80 ("ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)")
> > e3ec8d6898f71 ("ceph: send cap releases more aggressively")
> > e96a650a8174e ("ceph, rbd: delete unnecessary checks before two function calls")
> >
> >
> > How should we proceed with this patch?
> >
>
> please use following patch for old kernels
>
> Regards
> Yan, Zheng
>
> ---
> From 55937416f12e096621b06ada7554cacb89d06e97 Mon Sep 17 00:00:00 2001
> From: "Yan, Zheng" <[email protected]>
> Date: Thu, 23 May 2019 11:01:37 +0800
> Subject: [PATCH] ceph: hold i_ceph_lock when removing caps for freeing inode
>
> ceph_d_revalidate(..., LOOKUP_RCU) may call __ceph_caps_issued_mask()
> on a freeing inode.
>
> Cc: [email protected]
> Signed-off-by: "Yan, Zheng" <[email protected]>
> Reviewed-by: Jeff Layton <[email protected]>
> ---
> fs/ceph/caps.c | 7 +++++--
> 1 file changed, 5 insertions(+), 2 deletions(-)
>
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index ff5d32cf9578..0fb4e919cdce 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -1119,20 +1119,23 @@ static int send_cap_msg(struct cap_msg_args *arg)
> }
>
> /*
> - * Queue cap releases when an inode is dropped from our cache. Since
> - * inode is about to be destroyed, there is no need for i_ceph_lock.
> + * Queue cap releases when an inode is dropped from our cache.
> */
> void ceph_queue_caps_release(struct inode *inode)
> {
> struct ceph_inode_info *ci = ceph_inode(inode);
> struct rb_node *p;
>
> + /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
> + * may call __ceph_caps_issued_mask() on a freeing inode. */
> + spin_lock(&ci->i_ceph_lock);
> p = rb_first(&ci->i_caps);
> while (p) {
> struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
> p = rb_next(p);
> __ceph_remove_cap(cap, true);
> }
> + spin_unlock(&ci->i_ceph_lock);
> }
>
> /*
> --
> 2.17.2

Thanks for the backport, now queued up.

greg k-h