2020-02-17 03:32:35

by Gao Xiang

Subject: [PATCH v2] erofs: convert workstn to XArray

XArray has a friendlier API and will replace the old radix
tree in the near future.

This conversion makes use of __xa_cmpxchg when an insertion
races with an item just inserted by another thread. In detail,
instead of doing a full lookup again as we did for the old
radix tree, it tries to legitimize the current in-tree item
in the XArray, which is more efficient.

In addition, naming is rather a challenge for a non-English
speaker like me. The basic idea of workstn is to provide a
runtime sparse array with items arranged in physical block
number order. Such items (previously called workgroups) can be
used to record compressed clusters or to support new features
later.

However, neither workgroup nor workstn seems a good name from
any point of view, so I'd like to rename them to pslot and
managed_pslots to stand for physical slots. This patch handles
the latter as part of the radix tree root conversion.

Cc: Chao Yu <[email protected]>
Cc: Matthew Wilcox <[email protected]>
Signed-off-by: Gao Xiang <[email protected]>
---
changes since v1:
- update the comment above "struct xarray managed_pslots";
- get rid of "comparison to NULL" style in v1;
- stress-tested with no strange behaviors observed;
https://lore.kernel.org/r/[email protected]

fs/erofs/internal.h | 8 ++---
fs/erofs/super.c | 2 +-
fs/erofs/utils.c | 85 ++++++++++++++++-----------------------------
fs/erofs/zdata.c | 65 ++++++++++++++++------------------
4 files changed, 64 insertions(+), 96 deletions(-)

diff --git a/fs/erofs/internal.h b/fs/erofs/internal.h
index c4c6dcdc89ad..b70f52c80852 100644
--- a/fs/erofs/internal.h
+++ b/fs/erofs/internal.h
@@ -52,8 +52,8 @@ struct erofs_sb_info {
struct list_head list;
struct mutex umount_mutex;

- /* the dedicated workstation for compression */
- struct radix_tree_root workstn_tree;
+ /* managed XArray arranged in physical block number */
+ struct xarray managed_pslots;

/* threshold for decompression synchronously */
unsigned int max_sync_decompress_pages;
@@ -402,8 +402,8 @@ static inline void *erofs_get_pcpubuf(unsigned int pagenr)
int erofs_workgroup_put(struct erofs_workgroup *grp);
struct erofs_workgroup *erofs_find_workgroup(struct super_block *sb,
pgoff_t index);
-int erofs_register_workgroup(struct super_block *sb,
- struct erofs_workgroup *grp);
+struct erofs_workgroup *erofs_insert_workgroup(struct super_block *sb,
+ struct erofs_workgroup *grp);
void erofs_workgroup_free_rcu(struct erofs_workgroup *grp);
void erofs_shrinker_register(struct super_block *sb);
void erofs_shrinker_unregister(struct super_block *sb);
diff --git a/fs/erofs/super.c b/fs/erofs/super.c
index 057e6d7b5b7f..b514c67e5fc2 100644
--- a/fs/erofs/super.c
+++ b/fs/erofs/super.c
@@ -425,7 +425,7 @@ static int erofs_fill_super(struct super_block *sb, void *data, int silent)
sb->s_flags &= ~SB_POSIXACL;

#ifdef CONFIG_EROFS_FS_ZIP
- INIT_RADIX_TREE(&sbi->workstn_tree, GFP_ATOMIC);
+ xa_init(&sbi->managed_pslots);
#endif

/* get the root inode */
diff --git a/fs/erofs/utils.c b/fs/erofs/utils.c
index fddc5059c930..56ab5b3f73b0 100644
--- a/fs/erofs/utils.c
+++ b/fs/erofs/utils.c
@@ -37,9 +37,6 @@ void *erofs_get_pcpubuf(unsigned int pagenr)
/* global shrink count (for all mounted EROFS instances) */
static atomic_long_t erofs_global_shrink_cnt;

-#define __erofs_workgroup_get(grp) atomic_inc(&(grp)->refcount)
-#define __erofs_workgroup_put(grp) atomic_dec(&(grp)->refcount)
-
static int erofs_workgroup_get(struct erofs_workgroup *grp)
{
int o;
@@ -66,7 +63,7 @@ struct erofs_workgroup *erofs_find_workgroup(struct super_block *sb,

repeat:
rcu_read_lock();
- grp = radix_tree_lookup(&sbi->workstn_tree, index);
+ grp = xa_load(&sbi->managed_pslots, index);
if (grp) {
if (erofs_workgroup_get(grp)) {
/* prefer to relax rcu read side */
@@ -80,43 +77,36 @@ struct erofs_workgroup *erofs_find_workgroup(struct super_block *sb,
return grp;
}

-int erofs_register_workgroup(struct super_block *sb,
- struct erofs_workgroup *grp)
+struct erofs_workgroup *erofs_insert_workgroup(struct super_block *sb,
+ struct erofs_workgroup *grp)
{
struct erofs_sb_info *sbi;
- int err;
-
- /* grp shouldn't be broken or used before */
- if (atomic_read(&grp->refcount) != 1) {
- DBG_BUGON(1);
- return -EINVAL;
- }
-
- err = radix_tree_preload(GFP_NOFS);
- if (err)
- return err;
-
- sbi = EROFS_SB(sb);
- xa_lock(&sbi->workstn_tree);
+ struct erofs_workgroup *pre;

/*
- * Bump up reference count before making this workgroup
- * visible to other users in order to avoid potential UAF
- * without serialized by workstn_lock.
+ * Bump up a reference count before making this visible
+ * to others for the XArray in order to avoid potential
+ * UAF without serialized by xa_lock.
*/
- __erofs_workgroup_get(grp);
+ atomic_inc(&grp->refcount);

- err = radix_tree_insert(&sbi->workstn_tree, grp->index, grp);
- if (err)
- /*
- * it's safe to decrease since the workgroup isn't visible
- * and refcount >= 2 (cannot be freezed).
- */
- __erofs_workgroup_put(grp);
-
- xa_unlock(&sbi->workstn_tree);
- radix_tree_preload_end();
- return err;
+ sbi = EROFS_SB(sb);
+repeat:
+ xa_lock(&sbi->managed_pslots);
+ pre = __xa_cmpxchg(&sbi->managed_pslots, grp->index,
+ NULL, grp, GFP_NOFS);
+ if (pre) {
+ /* try to legitimize the current in-tree one */
+ if (erofs_workgroup_get(pre)) {
+ xa_unlock(&sbi->managed_pslots);
+ cond_resched();
+ goto repeat;
+ }
+ atomic_dec(&grp->refcount);
+ grp = pre;
+ }
+ xa_unlock(&sbi->managed_pslots);
+ return grp;
}

static void __erofs_workgroup_free(struct erofs_workgroup *grp)
@@ -155,7 +145,7 @@ static bool erofs_try_to_release_workgroup(struct erofs_sb_info *sbi,

/*
* Note that all cached pages should be unattached
- * before deleted from the radix tree. Otherwise some
+ * before deleted from the XArray. Otherwise some
* cached pages could be still attached to the orphan
* old workgroup when the new one is available in the tree.
*/
@@ -169,7 +159,7 @@ static bool erofs_try_to_release_workgroup(struct erofs_sb_info *sbi,
* however in order to avoid some race conditions, add a
* DBG_BUGON to observe this in advance.
*/
- DBG_BUGON(radix_tree_delete(&sbi->workstn_tree, grp->index) != grp);
+ DBG_BUGON(xa_erase(&sbi->managed_pslots, grp->index) != grp);

/*
* If managed cache is on, last refcount should indicate
@@ -182,22 +172,11 @@ static bool erofs_try_to_release_workgroup(struct erofs_sb_info *sbi,
static unsigned long erofs_shrink_workstation(struct erofs_sb_info *sbi,
unsigned long nr_shrink)
{
- pgoff_t first_index = 0;
- void *batch[PAGEVEC_SIZE];
+ struct erofs_workgroup *grp;
unsigned int freed = 0;
+ unsigned long index;

- int i, found;
-repeat:
- xa_lock(&sbi->workstn_tree);
-
- found = radix_tree_gang_lookup(&sbi->workstn_tree,
- batch, first_index, PAGEVEC_SIZE);
-
- for (i = 0; i < found; ++i) {
- struct erofs_workgroup *grp = batch[i];
-
- first_index = grp->index + 1;
-
+ xa_for_each(&sbi->managed_pslots, index, grp) {
/* try to shrink each valid workgroup */
if (!erofs_try_to_release_workgroup(sbi, grp))
continue;
@@ -206,10 +185,6 @@ static unsigned long erofs_shrink_workstation(struct erofs_sb_info *sbi,
if (!--nr_shrink)
break;
}
- xa_unlock(&sbi->workstn_tree);
-
- if (i && nr_shrink)
- goto repeat;
return freed;
}

diff --git a/fs/erofs/zdata.c b/fs/erofs/zdata.c
index 80e47f07d946..0d77a166068f 100644
--- a/fs/erofs/zdata.c
+++ b/fs/erofs/zdata.c
@@ -67,16 +67,6 @@ static void z_erofs_pcluster_init_once(void *ptr)
pcl->compressed_pages[i] = NULL;
}

-static void z_erofs_pcluster_init_always(struct z_erofs_pcluster *pcl)
-{
- struct z_erofs_collection *cl = z_erofs_primarycollection(pcl);
-
- atomic_set(&pcl->obj.refcount, 1);
-
- DBG_BUGON(cl->nr_pages);
- DBG_BUGON(cl->vcnt);
-}
-
int __init z_erofs_init_zip_subsystem(void)
{
pcluster_cachep = kmem_cache_create("erofs_compress",
@@ -341,26 +331,19 @@ static int z_erofs_lookup_collection(struct z_erofs_collector *clt,
struct inode *inode,
struct erofs_map_blocks *map)
{
- struct erofs_workgroup *grp;
- struct z_erofs_pcluster *pcl;
+ struct z_erofs_pcluster *pcl = clt->pcl;
struct z_erofs_collection *cl;
unsigned int length;

- grp = erofs_find_workgroup(inode->i_sb, map->m_pa >> PAGE_SHIFT);
- if (!grp)
- return -ENOENT;
-
- pcl = container_of(grp, struct z_erofs_pcluster, obj);
+ /* to avoid unexpected loop formed by corrupted images */
if (clt->owned_head == &pcl->next || pcl == clt->tailpcl) {
DBG_BUGON(1);
- erofs_workgroup_put(grp);
return -EFSCORRUPTED;
}

cl = z_erofs_primarycollection(pcl);
if (cl->pageofs != (map->m_la & ~PAGE_MASK)) {
DBG_BUGON(1);
- erofs_workgroup_put(grp);
return -EFSCORRUPTED;
}

@@ -368,7 +351,6 @@ static int z_erofs_lookup_collection(struct z_erofs_collector *clt,
if (length & Z_EROFS_PCLUSTER_FULL_LENGTH) {
if ((map->m_llen << Z_EROFS_PCLUSTER_LENGTH_BIT) > length) {
DBG_BUGON(1);
- erofs_workgroup_put(grp);
return -EFSCORRUPTED;
}
} else {
@@ -391,7 +373,6 @@ static int z_erofs_lookup_collection(struct z_erofs_collector *clt,
/* clean tailpcl if the current owned_head is Z_EROFS_PCLUSTER_TAIL */
if (clt->owned_head == Z_EROFS_PCLUSTER_TAIL)
clt->tailpcl = NULL;
- clt->pcl = pcl;
clt->cl = cl;
return 0;
}
@@ -402,14 +383,14 @@ static int z_erofs_register_collection(struct z_erofs_collector *clt,
{
struct z_erofs_pcluster *pcl;
struct z_erofs_collection *cl;
- int err;
+ struct erofs_workgroup *grp;

/* no available workgroup, let's allocate one */
pcl = kmem_cache_alloc(pcluster_cachep, GFP_NOFS);
if (!pcl)
return -ENOMEM;

- z_erofs_pcluster_init_always(pcl);
+ atomic_set(&pcl->obj.refcount, 1);
pcl->obj.index = map->m_pa >> PAGE_SHIFT;

pcl->length = (map->m_llen << Z_EROFS_PCLUSTER_LENGTH_BIT) |
@@ -429,20 +410,27 @@ static int z_erofs_register_collection(struct z_erofs_collector *clt,
clt->mode = COLLECT_PRIMARY_FOLLOWED;

cl = z_erofs_primarycollection(pcl);
+
+ /* must be cleaned before freeing to slab */
+ DBG_BUGON(cl->nr_pages);
+ DBG_BUGON(cl->vcnt);
+
cl->pageofs = map->m_la & ~PAGE_MASK;

/*
* lock all primary followed works before visible to others
* and mutex_trylock *never* fails for a new pcluster.
*/
- mutex_trylock(&cl->lock);
+ DBG_BUGON(!mutex_trylock(&cl->lock));

- err = erofs_register_workgroup(inode->i_sb, &pcl->obj);
- if (err) {
+ grp = erofs_insert_workgroup(inode->i_sb, &pcl->obj);
+ if (grp != &pcl->obj) {
+ clt->pcl = container_of(grp, struct z_erofs_pcluster, obj);
mutex_unlock(&cl->lock);
kmem_cache_free(pcluster_cachep, pcl);
- return -EAGAIN;
+ return -EEXIST;
}
+
/* used to check tail merging loop due to corrupted images */
if (clt->owned_head == Z_EROFS_PCLUSTER_TAIL)
clt->tailpcl = pcl;
@@ -456,6 +444,7 @@ static int z_erofs_collector_begin(struct z_erofs_collector *clt,
struct inode *inode,
struct erofs_map_blocks *map)
{
+ struct erofs_workgroup *grp;
int ret;

DBG_BUGON(clt->cl);
@@ -469,21 +458,25 @@ static int z_erofs_collector_begin(struct z_erofs_collector *clt,
return -EINVAL;
}

-repeat:
- ret = z_erofs_lookup_collection(clt, inode, map);
- if (ret == -ENOENT) {
+ grp = erofs_find_workgroup(inode->i_sb, map->m_pa >> PAGE_SHIFT);
+ if (grp) {
+ clt->pcl = container_of(grp, struct z_erofs_pcluster, obj);
+ } else {
ret = z_erofs_register_collection(clt, inode, map);

- /* someone registered at the same time, give another try */
- if (ret == -EAGAIN) {
- cond_resched();
- goto repeat;
- }
+ if (!ret)
+ goto out;
+ if (ret != -EEXIST)
+ return ret;
}

- if (ret)
+ ret = z_erofs_lookup_collection(clt, inode, map);
+ if (ret) {
+ erofs_workgroup_put(&clt->pcl->obj);
return ret;
+ }

+out:
z_erofs_pagevec_ctor_init(&clt->vector, Z_EROFS_NR_INLINE_PAGEVECS,
clt->cl->pagevec, clt->cl->vcnt);

--
2.17.1


2020-02-19 07:46:52

by Chao Yu

Subject: Re: [PATCH v2] erofs: convert workstn to XArray

On 2020/2/17 11:30, Gao Xiang wrote:

[]

> + sbi = EROFS_SB(sb);
> +repeat:
> + xa_lock(&sbi->managed_pslots);
> + pre = __xa_cmpxchg(&sbi->managed_pslots, grp->index,
> + NULL, grp, GFP_NOFS);
> + if (pre) {

It looks like __xa_cmpxchg() could return an error value in case of failure, e.g.
in the no-memory case. We'd better handle that case and an old valid workgroup
separately?

Thanks,


2020-02-19 08:06:52

by Gao Xiang

Subject: Re: [PATCH v2] erofs: convert workstn to XArray

Hi Chao,

On Wed, Feb 19, 2020 at 03:46:16PM +0800, Chao Yu wrote:
> On 2020/2/17 11:30, Gao Xiang wrote:

[]

> > - return err;
> > + sbi = EROFS_SB(sb);
> > +repeat:
> > + xa_lock(&sbi->managed_pslots);
> > + pre = __xa_cmpxchg(&sbi->managed_pslots, grp->index,
> > + NULL, grp, GFP_NOFS);
> > + if (pre) {
>
> It looks like __xa_cmpxchg() could return an error value in case of failure, e.g.
> in the no-memory case. We'd better handle that case and an old valid workgroup
> separately?

Thanks, that is quite a good catch!

To be honest, I'm not quite sure whether __xa_cmpxchg
can fail due to lack of memory here (from a quick scan,
it runs the __xas_nomem loop internally.)

But anyway, all potential errors need to be bailed out
by using xa_is_err(). Will do some research and send
v3 then.
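
Roughly, I'm thinking of something along these lines on top of
this patch -- an untested sketch only, just to show the shape of
the xa_is_err() bailout (callers would then need an IS_ERR()
check as well, and the final v3 may differ):

struct erofs_workgroup *erofs_insert_workgroup(struct super_block *sb,
                                               struct erofs_workgroup *grp)
{
        struct erofs_sb_info *sbi = EROFS_SB(sb);
        struct erofs_workgroup *pre;

        /* bump up a reference count before making this visible to others */
        atomic_inc(&grp->refcount);

repeat:
        xa_lock(&sbi->managed_pslots);
        pre = __xa_cmpxchg(&sbi->managed_pslots, grp->index,
                           NULL, grp, GFP_NOFS);
        if (xa_is_err(pre)) {
                /* the insertion itself failed (e.g. -ENOMEM), bail out */
                xa_unlock(&sbi->managed_pslots);
                atomic_dec(&grp->refcount);
                return ERR_PTR(xa_err(pre));
        }
        if (pre) {
                /* try to legitimize the current in-tree one instead */
                if (erofs_workgroup_get(pre)) {
                        xa_unlock(&sbi->managed_pslots);
                        cond_resched();
                        goto repeat;
                }
                /* reuse the in-tree one and drop the speculative reference */
                atomic_dec(&grp->refcount);
                grp = pre;
        }
        xa_unlock(&sbi->managed_pslots);
        return grp;
}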

Thanks,
Gao Xiang