A bunch of performance improvements and cleanups that Zach Brown and I
have been working on. The code should be pretty solid at this point,
though it could of course use more review and testing.
The results in my testing are pretty impressive, particularly when an
ioctx is being shared between multiple threads. In my crappy synthetic
benchmark, with 4 threads submitting and one thread reaping completions,
I saw overhead in the aio code go from ~50% (mostly ioctx lock
contention) to low single digits. Performance with an ioctx per thread
improved too, but I'd have to rerun those benchmarks.
The reason I've been focused on performance when the ioctx is shared is
that for a fair number of real world use cases, userspace needs the
completions aggregated somehow - in practice, people just end up
implementing this aggregation in userspace today, but if it's done right
we can do it much more efficiently in the kernel.
Performance-wise, the end result of this patch series is that submitting
a kiocb writes to _no_ shared cachelines - the penalty for sharing an
ioctx is gone there. There's still going to be some cacheline contention
when we deliver the completions to the aio ringbuffer (at least if you
have interrupts being delivered on multiple cores, which for high end
stuff you do), but I have a couple more patches not in this series that
implement coalescing for that (by taking advantage of interrupt
coalescing). With that, there are basically no bottlenecks or
performance issues to speak of in the aio code.
Real world benchmarks are still lacking; I've just been focused on
profiles. I'll try to post some actual benchmarks/profiles later.
The patch series is on top of v3.7-rc7, git repo is at
http://evilpiepirate.org/git/linux-bcache.git aio-upstream
Kent Overstreet (20):
aio: Kill return value of aio_complete()
aio: kiocb_cancel()
aio: Move private stuff out of aio.h
aio: dprintk() -> pr_debug()
aio: do fget() after aio_get_req()
aio: Make aio_put_req() lockless
aio: Refcounting cleanup
aio: Convert read_events() to hrtimers
aio: Make aio_read_evt() more efficient
aio: Use cancellation list lazily
aio: Change reqs_active to include unreaped completions
aio: Kill batch allocation
aio: Kill struct aio_ring_info
aio: Give shared kioctx fields their own cachelines
aio: reqs_active -> reqs_available
aio: percpu reqs_available
Generic dynamic per cpu refcounting
aio: Percpu ioctx refcount
aio: use xchg() instead of completion_lock
aio: Don't include aio.h in sched.h
Zach Brown (5):
mm: remove old aio use_mm() comment
aio: remove dead code from aio.h
gadget: remove only user of aio retry
aio: remove retry-based AIO
char: add aio_{read,write} to /dev/{null,zero}
arch/s390/hypfs/inode.c | 1 +
block/scsi_ioctl.c | 1 +
drivers/char/mem.c | 36 +
drivers/infiniband/hw/ipath/ipath_file_ops.c | 1 +
drivers/infiniband/hw/qib/qib_file_ops.c | 2 +-
drivers/staging/android/logger.c | 1 +
drivers/usb/gadget/inode.c | 42 +-
fs/9p/vfs_addr.c | 1 +
fs/afs/write.c | 1 +
fs/aio.c | 1362 +++++++++-----------------
fs/block_dev.c | 1 +
fs/btrfs/file.c | 1 +
fs/btrfs/inode.c | 1 +
fs/ceph/file.c | 1 +
fs/compat.c | 1 +
fs/direct-io.c | 1 +
fs/ecryptfs/file.c | 1 +
fs/ext2/inode.c | 1 +
fs/ext3/inode.c | 1 +
fs/ext4/file.c | 1 +
fs/ext4/indirect.c | 1 +
fs/ext4/inode.c | 1 +
fs/ext4/page-io.c | 1 +
fs/fat/inode.c | 1 +
fs/fuse/dev.c | 1 +
fs/fuse/file.c | 1 +
fs/gfs2/aops.c | 1 +
fs/gfs2/file.c | 1 +
fs/hfs/inode.c | 1 +
fs/hfsplus/inode.c | 1 +
fs/jfs/inode.c | 1 +
fs/nilfs2/inode.c | 2 +-
fs/ntfs/file.c | 1 +
fs/ntfs/inode.c | 1 +
fs/ocfs2/aops.h | 2 +
fs/ocfs2/dlmglue.c | 2 +-
fs/ocfs2/inode.h | 2 +
fs/pipe.c | 1 +
fs/read_write.c | 35 +-
fs/reiserfs/inode.c | 1 +
fs/ubifs/file.c | 1 +
fs/udf/inode.c | 1 +
fs/xfs/xfs_aops.c | 1 +
fs/xfs/xfs_file.c | 1 +
include/linux/aio.h | 129 +--
include/linux/cgroup.h | 1 +
include/linux/errno.h | 1 -
include/linux/percpu-refcount.h | 29 +
include/linux/sched.h | 2 -
kernel/fork.c | 1 +
kernel/printk.c | 1 +
kernel/ptrace.c | 1 +
lib/Makefile | 2 +-
lib/percpu-refcount.c | 164 ++++
mm/mmu_context.c | 3 -
mm/page_io.c | 1 +
mm/shmem.c | 1 +
mm/swap.c | 1 +
security/keys/internal.h | 2 +
security/keys/keyctl.c | 1 +
sound/core/pcm_native.c | 2 +-
61 files changed, 820 insertions(+), 1042 deletions(-)
create mode 100644 include/linux/percpu-refcount.h
create mode 100644 lib/percpu-refcount.c
--
1.7.12
From: Zach Brown <[email protected]>
use_mm() is used in more places than just aio. There's no need to
mention callers when describing the function.
Signed-off-by: Zach Brown <[email protected]>
Signed-off-by: Kent Overstreet <[email protected]>
---
mm/mmu_context.c | 3 ---
1 file changed, 3 deletions(-)
diff --git a/mm/mmu_context.c b/mm/mmu_context.c
index 3dcfaf4..8a8cd02 100644
--- a/mm/mmu_context.c
+++ b/mm/mmu_context.c
@@ -14,9 +14,6 @@
* use_mm
* Makes the calling kernel thread take on the specified
* mm context.
- * Called by the retry thread execute retries within the
- * iocb issuer's mm context, so that copy_from/to_user
- * operations work seamlessly for aio.
* (Note: this routine is intended to be called only
* from a kernel thread context)
*/
--
1.7.12
Signed-off-by: Kent Overstreet <[email protected]>
---
drivers/usb/gadget/inode.c | 1 +
fs/aio.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++
include/linux/aio.h | 61 ----------------------------------------------
3 files changed, 62 insertions(+), 61 deletions(-)
diff --git a/drivers/usb/gadget/inode.c b/drivers/usb/gadget/inode.c
index 2a3f001..7640e01 100644
--- a/drivers/usb/gadget/inode.c
+++ b/drivers/usb/gadget/inode.c
@@ -25,6 +25,7 @@
#include <linux/slab.h>
#include <linux/poll.h>
#include <linux/mmu_context.h>
+#include <linux/aio.h>
#include <linux/device.h>
#include <linux/moduleparam.h>
diff --git a/fs/aio.c b/fs/aio.c
index a993234..4dcc02f 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -45,6 +45,67 @@
#define dprintk(x...) do { ; } while (0)
#endif
+#define AIO_RING_MAGIC 0xa10a10a1
+#define AIO_RING_COMPAT_FEATURES 1
+#define AIO_RING_INCOMPAT_FEATURES 0
+struct aio_ring {
+ unsigned id; /* kernel internal index number */
+ unsigned nr; /* number of io_events */
+ unsigned head;
+ unsigned tail;
+
+ unsigned magic;
+ unsigned compat_features;
+ unsigned incompat_features;
+ unsigned header_length; /* size of aio_ring */
+
+
+ struct io_event io_events[0];
+}; /* 128 bytes + ring size */
+
+#define AIO_RING_PAGES 8
+struct aio_ring_info {
+ unsigned long mmap_base;
+ unsigned long mmap_size;
+
+ struct page **ring_pages;
+ spinlock_t ring_lock;
+ long nr_pages;
+
+ unsigned nr, tail;
+
+ struct page *internal_pages[AIO_RING_PAGES];
+};
+
+static inline unsigned aio_ring_avail(struct aio_ring_info *info,
+ struct aio_ring *ring)
+{
+ return (ring->head + info->nr - 1 - ring->tail) % info->nr;
+}
+
+struct kioctx {
+ atomic_t users;
+ int dead;
+
+ /* This needs improving */
+ unsigned long user_id;
+ struct hlist_node list;
+
+ wait_queue_head_t wait;
+
+ spinlock_t ctx_lock;
+
+ int reqs_active;
+ struct list_head active_reqs; /* used for cancellation */
+
+ /* sys_io_setup currently limits this to an unsigned int */
+ unsigned max_reqs;
+
+ struct aio_ring_info ring_info;
+
+ struct rcu_head rcu_head;
+};
+
/*------ sysctl variables----*/
static DEFINE_SPINLOCK(aio_nr_lock);
unsigned long aio_nr; /* current system wide number of aio requests */
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 0ce99e8..f29de1f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -104,67 +104,6 @@ static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
};
}
-#define AIO_RING_MAGIC 0xa10a10a1
-#define AIO_RING_COMPAT_FEATURES 1
-#define AIO_RING_INCOMPAT_FEATURES 0
-struct aio_ring {
- unsigned id; /* kernel internal index number */
- unsigned nr; /* number of io_events */
- unsigned head;
- unsigned tail;
-
- unsigned magic;
- unsigned compat_features;
- unsigned incompat_features;
- unsigned header_length; /* size of aio_ring */
-
-
- struct io_event io_events[0];
-}; /* 128 bytes + ring size */
-
-#define AIO_RING_PAGES 8
-struct aio_ring_info {
- unsigned long mmap_base;
- unsigned long mmap_size;
-
- struct page **ring_pages;
- spinlock_t ring_lock;
- long nr_pages;
-
- unsigned nr, tail;
-
- struct page *internal_pages[AIO_RING_PAGES];
-};
-
-static inline unsigned aio_ring_avail(struct aio_ring_info *info,
- struct aio_ring *ring)
-{
- return (ring->head + info->nr - 1 - ring->tail) % info->nr;
-}
-
-struct kioctx {
- atomic_t users;
- int dead;
-
- /* This needs improving */
- unsigned long user_id;
- struct hlist_node list;
-
- wait_queue_head_t wait;
-
- spinlock_t ctx_lock;
-
- int reqs_active;
- struct list_head active_reqs; /* used for cancellation */
-
- /* sys_io_setup currently limits this to an unsigned int */
- unsigned max_reqs;
-
- struct aio_ring_info ring_info;
-
- struct rcu_head rcu_head;
-};
-
/* prototypes */
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
--
1.7.12
The number of outstanding kiocbs is one of the few shared things left
that has to be touched for every kiocb - it'd be nice to make it percpu.
We can make it per cpu by treating it like an allocation problem: we
have a maximum number of kiocbs that can be outstanding (i.e. slots) -
then we just allocate and free slots, and we know how to write per cpu
allocators.
So as prep work for that, we convert reqs_active to reqs_available.
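To make the allocation-style semantics concrete, here's a minimal
userspace sketch of the reqs_available idea (names and types are
illustrative stand-ins, not the kernel code): getting a request
decrements a counter of free slots if it's positive, and putting one
back increments it - which is what the atomic_dec_if_positive()/
atomic_inc() pair does in the patch below.

/*
 * Minimal userspace sketch of reqs_available (illustrative names and
 * types, not the kernel code): a single counter of free slots,
 * decremented to allocate a request and incremented to free one.
 */
#include <stdatomic.h>
#include <stdbool.h>

struct fake_ioctx {
	atomic_int reqs_available;	/* free request slots */
};

/* Take a slot if one is free (atomic_dec_if_positive() in the patch). */
static bool req_get(struct fake_ioctx *ctx)
{
	int avail = atomic_load(&ctx->reqs_available);

	while (avail > 0) {
		if (atomic_compare_exchange_weak(&ctx->reqs_available,
						 &avail, avail - 1))
			return true;
	}

	return false;	/* ring full: io_submit() sees -EAGAIN */
}

/* Give the slot back once the completion has been reaped. */
static void req_put(struct fake_ioctx *ctx)
{
	atomic_fetch_add(&ctx->reqs_available, 1);
}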
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 29 ++++++++++++++---------------
1 file changed, 14 insertions(+), 15 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 7dee3aa..e6f29dc 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,7 +79,7 @@ struct kioctx {
long nr_pages;
struct {
- atomic_t reqs_active;
+ atomic_t reqs_available;
} ____cacheline_aligned;
struct {
@@ -301,17 +301,17 @@ static void free_ioctx(struct kioctx *ctx)
head = ring->head;
kunmap_atomic(ring);
- while (atomic_read(&ctx->reqs_active) > 0) {
+ while (atomic_read(&ctx->reqs_available) < ctx->nr) {
wait_event(ctx->wait, head != ctx->tail);
avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
- atomic_sub(avail, &ctx->reqs_active);
+ atomic_add(avail, &ctx->reqs_available);
head += avail;
head %= ctx->nr;
}
- WARN_ON(atomic_read(&ctx->reqs_active) < 0);
+ WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr);
aio_free_ring(ctx);
@@ -368,6 +368,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
+ atomic_set(&ctx->reqs_available, ctx->nr);
+
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr ||
@@ -448,7 +450,7 @@ void exit_aio(struct mm_struct *mm)
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users),
atomic_read(&ctx->dead),
- atomic_read(&ctx->reqs_active));
+ atomic_read(&ctx->reqs_available));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -476,12 +478,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_read(&ctx->reqs_active) >= ctx->nr)
+ if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
return NULL;
- if (atomic_inc_return(&ctx->reqs_active) > ctx->nr)
- goto out_put;
-
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
goto out_put;
@@ -491,7 +490,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
return req;
out_put:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
return NULL;
}
@@ -565,7 +564,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
/*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
- * need to issue a wakeup after decrementing reqs_active.
+ * need to issue a wakeup after incrementing reqs_available.
*/
rcu_read_lock();
@@ -582,7 +581,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* when the event got cancelled.
*/
if (test_and_set_bit(KIF_CANCELLED, &iocb->ki_flags)) {
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
@@ -707,7 +706,7 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
ring->head = head;
kunmap_atomic(ring);
- atomic_sub(ret, &ctx->reqs_active);
+ atomic_add(ret, &ctx->reqs_available);
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
out:
@@ -770,7 +769,7 @@ static int read_events(struct kioctx *ctx,
break;
/* Try to only show up in io wait if there are ops in flight */
- if (atomic_read(&ctx->reqs_active))
+ if (atomic_read(&ctx->reqs_available) != ctx->nr)
io_schedule();
else
schedule();
@@ -1161,7 +1160,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_dec(&ctx->reqs_active);
+ atomic_inc(&ctx->reqs_available);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
This just converts the ioctx refcount to the new generic dynamic percpu
refcount code.
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 30 +++++++++++++-----------------
1 file changed, 13 insertions(+), 17 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 94218b7..0975675 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -36,6 +36,7 @@
#include <linux/eventfd.h>
#include <linux/blkdev.h>
#include <linux/compat.h>
+#include <linux/percpu-refcount.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
@@ -65,7 +66,7 @@ struct kioctx_cpu {
};
struct kioctx {
- atomic_t users;
+ struct percpu_ref users;
atomic_t dead;
/* This needs improving */
@@ -297,6 +298,8 @@ static void free_ioctx(struct kioctx *ctx)
struct io_event res;
unsigned cpu, head, avail;
+ pr_debug("freeing %p\n", ctx);
+
spin_lock_irq(&ctx->ctx_lock);
while (!list_empty(&ctx->active_reqs)) {
@@ -341,14 +344,14 @@ static void free_ioctx(struct kioctx *ctx)
synchronize_rcu();
- pr_debug("freeing %p\n", ctx);
+ pr_debug("freed %p\n", ctx);
free_percpu(ctx->cpu);
kmem_cache_free(kioctx_cachep, ctx);
}
static void put_ioctx(struct kioctx *ctx)
{
- if (unlikely(atomic_dec_and_test(&ctx->users)))
+ if (percpu_ref_put(&ctx->users))
free_ioctx(ctx);
}
@@ -377,7 +380,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->max_reqs = nr_events;
- atomic_set(&ctx->users, 2);
+ percpu_ref_init(&ctx->users);
+ rcu_read_lock();
+ percpu_ref_get(&ctx->users);
+ rcu_read_unlock();
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
@@ -433,12 +439,9 @@ out_freectx:
*/
static void kill_ioctx(struct kioctx *ctx)
{
- if (!atomic_xchg(&ctx->dead, 1)) {
+ if (percpu_ref_kill(&ctx->users)) {
hlist_del_rcu(&ctx->list);
synchronize_rcu();
-
- wake_up_all(&ctx->wait);
-
put_ioctx(ctx);
}
}
@@ -473,12 +476,6 @@ void exit_aio(struct mm_struct *mm)
struct hlist_node *p, *n;
hlist_for_each_entry_safe(ctx, p, n, &mm->ioctx_list, list) {
- if (1 != atomic_read(&ctx->users))
- printk(KERN_DEBUG
- "exit_aio:ioctx still alive: %d %d %d\n",
- atomic_read(&ctx->users),
- atomic_read(&ctx->dead),
- atomic_read(&ctx->reqs_available));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -597,8 +594,7 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
if (ctx->user_id == ctx_id){
- BUG_ON(atomic_read(&ctx->dead));
- atomic_inc(&ctx->users);
+ percpu_ref_get(&ctx->users);
ret = ctx;
break;
}
@@ -838,7 +834,7 @@ static int read_events(struct kioctx *ctx,
i += ret;
if (i >= min_nr)
break;
- if (unlikely(atomic_read(&ctx->dead))) {
+ if (unlikely(percpu_ref_dead(&ctx->users))) {
ret = -EINVAL;
break;
}
--
1.7.12
So, for sticking kiocb completions on the kioctx ringbuffer, we need a
lock - it unfortunately can't be lockless.
When the kioctx is shared between threads on different cpus and the rate
of completions is high, this lock sees quite a bit of contention - in
terms of cacheline contention it's the hottest thing in the aio
subsystem.
That means, with a regular spinlock, we're going to take a cache miss
to grab the lock, then another cache miss when we touch the data the
lock protects - if it's on the same cacheline as the lock, other cpus
spinning on the lock are going to be pulling it out from under us as
we're using it.
So, we use an old trick to get rid of this second forced cache miss -
make the data the lock protects be the lock itself, so we grab them both
at once.
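Roughly, the trick looks like this - a minimal userspace sketch using
C11 atomics, with illustrative names rather than the kernel code: the
tail field doubles as the lock, with UINT_MAX as the "locked" sentinel,
so the xchg() that takes ownership also fetches the value we're about to
use.

/*
 * Userspace sketch of the xchg() trick (illustrative, not the kernel
 * code): the ring tail itself is the lock, so the cacheline the
 * completer pulls in already holds the value it needs.
 */
#include <limits.h>
#include <stdatomic.h>

struct fake_ring {
	atomic_uint tail;	/* doubles as the completion-side lock */
	unsigned nr;		/* number of slots in the ring */
};

static unsigned tail_lock(struct fake_ring *ring)
{
	unsigned tail;

	/* Spin until we swap in the sentinel and get the real tail back. */
	while ((tail = atomic_exchange(&ring->tail, UINT_MAX)) == UINT_MAX)
		;	/* cpu_relax() in the kernel */

	return tail;
}

static void tail_unlock(struct fake_ring *ring, unsigned new_tail)
{
	/* Publishing the new tail is what releases the "lock". */
	atomic_store(&ring->tail, new_tail % ring->nr);
}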
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 24 +++++++++++++-----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0975675..03b36a0 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -99,11 +99,11 @@ struct kioctx {
struct {
struct mutex ring_lock;
+ unsigned shadow_tail;
} ____cacheline_aligned;
struct {
unsigned tail;
- spinlock_t completion_lock;
} ____cacheline_aligned;
struct {
@@ -324,9 +324,9 @@ static void free_ioctx(struct kioctx *ctx)
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_available) < ctx->nr) {
- wait_event(ctx->wait, head != ctx->tail);
+ wait_event(ctx->wait, head != ctx->shadow_tail);
- avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ avail = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
atomic_add(avail, &ctx->reqs_available);
head += avail;
@@ -385,7 +385,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
percpu_ref_get(&ctx->users);
rcu_read_unlock();
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -664,11 +663,12 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* ctx->ctx_lock to prevent other code from messing with the tail
* pointer since we might be called from irq context.
*/
- spin_lock_irqsave(&ctx->completion_lock, flags);
+ local_irq_save(flags);
+ while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+ cpu_relax();
ring = kmap_atomic(ctx->ring_pages[0]);
- tail = ctx->tail;
event = aio_ring_event(ctx, tail);
if (++tail >= ctx->nr)
tail = 0;
@@ -687,14 +687,16 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */
- ctx->tail = tail;
+ ctx->shadow_tail = tail;
ring->tail = tail;
+ smp_wmb();
+ ctx->tail = tail;
+ local_irq_restore(flags);
+
put_aio_ring_event(event);
kunmap_atomic(ring);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
-
pr_debug("added to ring %p at [%lu]\n", iocb, tail);
/*
@@ -744,11 +746,11 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr);
while (ret < nr) {
- unsigned i = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
+ unsigned i = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
struct io_event *ev;
struct page *page;
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
break;
i = min_t(int, i, nr - ret);
--
1.7.12
Yay faster kernel compiles
Signed-off-by: Kent Overstreet <[email protected]>
---
arch/s390/hypfs/inode.c | 1 +
block/scsi_ioctl.c | 1 +
drivers/char/mem.c | 1 +
drivers/infiniband/hw/ipath/ipath_file_ops.c | 1 +
drivers/infiniband/hw/qib/qib_file_ops.c | 2 +-
drivers/staging/android/logger.c | 1 +
fs/9p/vfs_addr.c | 1 +
fs/afs/write.c | 1 +
fs/block_dev.c | 1 +
fs/btrfs/file.c | 1 +
fs/btrfs/inode.c | 1 +
fs/ceph/file.c | 1 +
fs/compat.c | 1 +
fs/direct-io.c | 1 +
fs/ecryptfs/file.c | 1 +
fs/ext2/inode.c | 1 +
fs/ext3/inode.c | 1 +
fs/ext4/file.c | 1 +
fs/ext4/indirect.c | 1 +
fs/ext4/inode.c | 1 +
fs/ext4/page-io.c | 1 +
fs/fat/inode.c | 1 +
fs/fuse/dev.c | 1 +
fs/fuse/file.c | 1 +
fs/gfs2/aops.c | 1 +
fs/gfs2/file.c | 1 +
fs/hfs/inode.c | 1 +
fs/hfsplus/inode.c | 1 +
fs/jfs/inode.c | 1 +
fs/nilfs2/inode.c | 2 +-
fs/ntfs/file.c | 1 +
fs/ntfs/inode.c | 1 +
fs/ocfs2/aops.h | 2 ++
fs/ocfs2/inode.h | 2 ++
fs/pipe.c | 1 +
fs/read_write.c | 1 +
fs/reiserfs/inode.c | 1 +
fs/ubifs/file.c | 1 +
fs/udf/inode.c | 1 +
fs/xfs/xfs_aops.c | 1 +
fs/xfs/xfs_file.c | 1 +
include/linux/cgroup.h | 1 +
include/linux/sched.h | 2 --
kernel/fork.c | 1 +
kernel/printk.c | 1 +
kernel/ptrace.c | 1 +
mm/page_io.c | 1 +
mm/shmem.c | 1 +
mm/swap.c | 1 +
security/keys/internal.h | 2 ++
security/keys/keyctl.c | 1 +
sound/core/pcm_native.c | 2 +-
52 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/arch/s390/hypfs/inode.c b/arch/s390/hypfs/inode.c
index 06ea69b..c6c6f43 100644
--- a/arch/s390/hypfs/inode.c
+++ b/arch/s390/hypfs/inode.c
@@ -21,6 +21,7 @@
#include <linux/module.h>
#include <linux/seq_file.h>
#include <linux/mount.h>
+#include <linux/aio.h>
#include <asm/ebcdic.h>
#include "hypfs.h"
diff --git a/block/scsi_ioctl.c b/block/scsi_ioctl.c
index 9a87daa..a5ffcc9 100644
--- a/block/scsi_ioctl.c
+++ b/block/scsi_ioctl.c
@@ -27,6 +27,7 @@
#include <linux/ratelimit.h>
#include <linux/slab.h>
#include <linux/times.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
#include <scsi/scsi.h>
diff --git a/drivers/char/mem.c b/drivers/char/mem.c
index 968ae6e..6447854 100644
--- a/drivers/char/mem.c
+++ b/drivers/char/mem.c
@@ -28,6 +28,7 @@
#include <linux/pfn.h>
#include <linux/export.h>
#include <linux/io.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
diff --git a/drivers/infiniband/hw/ipath/ipath_file_ops.c b/drivers/infiniband/hw/ipath/ipath_file_ops.c
index 3eb7e45..62edc41 100644
--- a/drivers/infiniband/hw/ipath/ipath_file_ops.c
+++ b/drivers/infiniband/hw/ipath/ipath_file_ops.c
@@ -40,6 +40,7 @@
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/io.h>
+#include <linux/aio.h>
#include <linux/jiffies.h>
#include <linux/cpu.h>
#include <asm/pgtable.h>
diff --git a/drivers/infiniband/hw/qib/qib_file_ops.c b/drivers/infiniband/hw/qib/qib_file_ops.c
index 959a5c4..488300c 100644
--- a/drivers/infiniband/hw/qib/qib_file_ops.c
+++ b/drivers/infiniband/hw/qib/qib_file_ops.c
@@ -39,7 +39,7 @@
#include <linux/vmalloc.h>
#include <linux/highmem.h>
#include <linux/io.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include <linux/jiffies.h>
#include <asm/pgtable.h>
#include <linux/delay.h>
diff --git a/drivers/staging/android/logger.c b/drivers/staging/android/logger.c
index 1d5ed47..c79c101 100644
--- a/drivers/staging/android/logger.c
+++ b/drivers/staging/android/logger.c
@@ -28,6 +28,7 @@
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/vmalloc.h>
+#include <linux/aio.h>
#include "logger.h"
#include <asm/ioctls.h>
diff --git a/fs/9p/vfs_addr.c b/fs/9p/vfs_addr.c
index 0ad61c6..055562c 100644
--- a/fs/9p/vfs_addr.c
+++ b/fs/9p/vfs_addr.c
@@ -33,6 +33,7 @@
#include <linux/pagemap.h>
#include <linux/idr.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include <net/9p/9p.h>
#include <net/9p/client.h>
diff --git a/fs/afs/write.c b/fs/afs/write.c
index 9aa52d9..5151ea3 100644
--- a/fs/afs/write.c
+++ b/fs/afs/write.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/writeback.h>
#include <linux/pagevec.h>
+#include <linux/aio.h>
#include "internal.h"
static int afs_write_back_from_locked_page(struct afs_writeback *wb,
diff --git a/fs/block_dev.c b/fs/block_dev.c
index 1a1e5e3..e758dc4 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -27,6 +27,7 @@
#include <linux/namei.h>
#include <linux/log2.h>
#include <linux/cleancache.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include "internal.h"
diff --git a/fs/btrfs/file.c b/fs/btrfs/file.c
index 9ab1bed..166f386 100644
--- a/fs/btrfs/file.c
+++ b/fs/btrfs/file.c
@@ -30,6 +30,7 @@
#include <linux/statfs.h>
#include <linux/compat.h>
#include <linux/slab.h>
+#include <linux/aio.h>
#include "ctree.h"
#include "disk-io.h"
#include "transaction.h"
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 95542a1..0a7c785 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -39,6 +39,7 @@
#include <linux/slab.h>
#include <linux/ratelimit.h>
#include <linux/mount.h>
+#include <linux/aio.h>
#include "compat.h"
#include "ctree.h"
#include "disk-io.h"
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5840d2a..74cee4e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -7,6 +7,7 @@
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include "super.h"
#include "mds_client.h"
diff --git a/fs/compat.c b/fs/compat.c
index 015e1e1..6538684 100644
--- a/fs/compat.c
+++ b/fs/compat.c
@@ -48,6 +48,7 @@
#include <linux/fs_struct.h>
#include <linux/slab.h>
#include <linux/pagemap.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include <asm/mmu_context.h>
diff --git a/fs/direct-io.c b/fs/direct-io.c
index f86c720..1f45a5a 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -37,6 +37,7 @@
#include <linux/uio.h>
#include <linux/atomic.h>
#include <linux/prefetch.h>
+#include <linux/aio.h>
/*
* How many user pages to map in one call to get_user_pages(). This determines
diff --git a/fs/ecryptfs/file.c b/fs/ecryptfs/file.c
index d45ba45..fd04283 100644
--- a/fs/ecryptfs/file.c
+++ b/fs/ecryptfs/file.c
@@ -31,6 +31,7 @@
#include <linux/security.h>
#include <linux/compat.h>
#include <linux/fs_stack.h>
+#include <linux/aio.h>
#include "ecryptfs_kernel.h"
/**
diff --git a/fs/ext2/inode.c b/fs/ext2/inode.c
index 6363ac6..388e77d 100644
--- a/fs/ext2/inode.c
+++ b/fs/ext2/inode.c
@@ -31,6 +31,7 @@
#include <linux/mpage.h>
#include <linux/fiemap.h>
#include <linux/namei.h>
+#include <linux/aio.h>
#include "ext2.h"
#include "acl.h"
#include "xip.h"
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index 7e87e37..f99db41 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -27,6 +27,7 @@
#include <linux/writeback.h>
#include <linux/mpage.h>
#include <linux/namei.h>
+#include <linux/aio.h>
#include "ext3.h"
#include "xattr.h"
#include "acl.h"
diff --git a/fs/ext4/file.c b/fs/ext4/file.c
index bf3966b..246d693 100644
--- a/fs/ext4/file.c
+++ b/fs/ext4/file.c
@@ -24,6 +24,7 @@
#include <linux/mount.h>
#include <linux/path.h>
#include <linux/quotaops.h>
+#include <linux/aio.h>
#include "ext4.h"
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/ext4/indirect.c b/fs/ext4/indirect.c
index 792e388..dcfcb0d 100644
--- a/fs/ext4/indirect.c
+++ b/fs/ext4/indirect.c
@@ -20,6 +20,7 @@
* ([email protected]), 1993, 1998
*/
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "truncate.h"
diff --git a/fs/ext4/inode.c b/fs/ext4/inode.c
index b3c243b..6e42b49 100644
--- a/fs/ext4/inode.c
+++ b/fs/ext4/inode.c
@@ -37,6 +37,7 @@
#include <linux/printk.h>
#include <linux/slab.h>
#include <linux/ratelimit.h>
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/ext4/page-io.c b/fs/ext4/page-io.c
index 68e896e..c3b15e2 100644
--- a/fs/ext4/page-io.c
+++ b/fs/ext4/page-io.c
@@ -23,6 +23,7 @@
#include <linux/workqueue.h>
#include <linux/kernel.h>
#include <linux/slab.h>
+#include <linux/aio.h>
#include "ext4_jbd2.h"
#include "xattr.h"
diff --git a/fs/fat/inode.c b/fs/fat/inode.c
index 5bafaad..4dd0d8d 100644
--- a/fs/fat/inode.c
+++ b/fs/fat/inode.c
@@ -26,6 +26,7 @@
#include <linux/writeback.h>
#include <linux/log2.h>
#include <linux/hash.h>
+#include <linux/aio.h>
#include <asm/unaligned.h>
#include "fat.h"
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 8c23fa7..b4c3f56 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -19,6 +19,7 @@
#include <linux/pipe_fs_i.h>
#include <linux/swap.h>
#include <linux/splice.h>
+#include <linux/aio.h>
MODULE_ALIAS_MISCDEV(FUSE_MINOR);
MODULE_ALIAS("devname:fuse");
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 78d2837..c65e75d 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -15,6 +15,7 @@
#include <linux/module.h>
#include <linux/compat.h>
#include <linux/swap.h>
+#include <linux/aio.h>
static const struct file_operations fuse_direct_io_file_operations;
diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c
index 01c4975..5b8d824 100644
--- a/fs/gfs2/aops.c
+++ b/fs/gfs2/aops.c
@@ -20,6 +20,7 @@
#include <linux/swap.h>
#include <linux/gfs2_ondisk.h>
#include <linux/backing-dev.h>
+#include <linux/aio.h>
#include "gfs2.h"
#include "incore.h"
diff --git a/fs/gfs2/file.c b/fs/gfs2/file.c
index e056b4c..ac49082 100644
--- a/fs/gfs2/file.c
+++ b/fs/gfs2/file.c
@@ -25,6 +25,7 @@
#include <asm/uaccess.h>
#include <linux/dlm.h>
#include <linux/dlm_plock.h>
+#include <linux/aio.h>
#include "gfs2.h"
#include "incore.h"
diff --git a/fs/hfs/inode.c b/fs/hfs/inode.c
index 0b35903..8e4c003 100644
--- a/fs/hfs/inode.c
+++ b/fs/hfs/inode.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/mpage.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include "hfs_fs.h"
#include "btree.h"
diff --git a/fs/hfsplus/inode.c b/fs/hfsplus/inode.c
index 2172aa5..50ae88c 100644
--- a/fs/hfsplus/inode.c
+++ b/fs/hfsplus/inode.c
@@ -14,6 +14,7 @@
#include <linux/pagemap.h>
#include <linux/mpage.h>
#include <linux/sched.h>
+#include <linux/aio.h>
#include "hfsplus_fs.h"
#include "hfsplus_raw.h"
diff --git a/fs/jfs/inode.c b/fs/jfs/inode.c
index 4692bf3..a462b57 100644
--- a/fs/jfs/inode.c
+++ b/fs/jfs/inode.c
@@ -23,6 +23,7 @@
#include <linux/pagemap.h>
#include <linux/quotaops.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include "jfs_incore.h"
#include "jfs_inode.h"
#include "jfs_filsys.h"
diff --git a/fs/nilfs2/inode.c b/fs/nilfs2/inode.c
index 4d31d2c..1e246df 100644
--- a/fs/nilfs2/inode.c
+++ b/fs/nilfs2/inode.c
@@ -25,7 +25,7 @@
#include <linux/gfp.h>
#include <linux/mpage.h>
#include <linux/writeback.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include "nilfs.h"
#include "btnode.h"
#include "segment.h"
diff --git a/fs/ntfs/file.c b/fs/ntfs/file.c
index 1ecf464..1373f97 100644
--- a/fs/ntfs/file.c
+++ b/fs/ntfs/file.c
@@ -27,6 +27,7 @@
#include <linux/swap.h>
#include <linux/uio.h>
#include <linux/writeback.h>
+#include <linux/aio.h>
#include <asm/page.h>
#include <asm/uaccess.h>
diff --git a/fs/ntfs/inode.c b/fs/ntfs/inode.c
index 1d27331..e3afc6e 100644
--- a/fs/ntfs/inode.c
+++ b/fs/ntfs/inode.c
@@ -28,6 +28,7 @@
#include <linux/quotaops.h>
#include <linux/slab.h>
#include <linux/log2.h>
+#include <linux/aio.h>
#include "aops.h"
#include "attrib.h"
diff --git a/fs/ocfs2/aops.h b/fs/ocfs2/aops.h
index ffb2da3..f671e49 100644
--- a/fs/ocfs2/aops.h
+++ b/fs/ocfs2/aops.h
@@ -22,6 +22,8 @@
#ifndef OCFS2_AOPS_H
#define OCFS2_AOPS_H
+#include <linux/aio.h>
+
handle_t *ocfs2_start_walk_page_trans(struct inode *inode,
struct page *page,
unsigned from,
diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h
index 88924a3..c765bdf 100644
--- a/fs/ocfs2/inode.h
+++ b/fs/ocfs2/inode.h
@@ -28,6 +28,8 @@
#include "extent_map.h"
+struct iocb;
+
/* OCFS2 Inode Private Data */
struct ocfs2_inode_info
{
diff --git a/fs/pipe.c b/fs/pipe.c
index bd3479d..dc86533 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -21,6 +21,7 @@
#include <linux/audit.h>
#include <linux/syscalls.h>
#include <linux/fcntl.h>
+#include <linux/aio.h>
#include <asm/uaccess.h>
#include <asm/ioctls.h>
diff --git a/fs/read_write.c b/fs/read_write.c
index 7347732..a088b00 100644
--- a/fs/read_write.c
+++ b/fs/read_write.c
@@ -15,6 +15,7 @@
#include <linux/syscalls.h>
#include <linux/pagemap.h>
#include <linux/splice.h>
+#include <linux/aio.h>
#include "read_write.h"
#include <asm/uaccess.h>
diff --git a/fs/reiserfs/inode.c b/fs/reiserfs/inode.c
index d83736f..c760aed 100644
--- a/fs/reiserfs/inode.c
+++ b/fs/reiserfs/inode.c
@@ -18,6 +18,7 @@
#include <linux/writeback.h>
#include <linux/quotaops.h>
#include <linux/swap.h>
+#include <linux/aio.h>
int reiserfs_commit_write(struct file *f, struct page *page,
unsigned from, unsigned to);
diff --git a/fs/ubifs/file.c b/fs/ubifs/file.c
index 5bc7781..1b337c7 100644
--- a/fs/ubifs/file.c
+++ b/fs/ubifs/file.c
@@ -50,6 +50,7 @@
*/
#include "ubifs.h"
+#include <linux/aio.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/slab.h>
diff --git a/fs/udf/inode.c b/fs/udf/inode.c
index df88b95..7cd8448 100644
--- a/fs/udf/inode.c
+++ b/fs/udf/inode.c
@@ -38,6 +38,7 @@
#include <linux/slab.h>
#include <linux/crc-itu-t.h>
#include <linux/mpage.h>
+#include <linux/aio.h>
#include "udf_i.h"
#include "udf_sb.h"
diff --git a/fs/xfs/xfs_aops.c b/fs/xfs/xfs_aops.c
index e57e2da..d519030 100644
--- a/fs/xfs/xfs_aops.c
+++ b/fs/xfs/xfs_aops.c
@@ -31,6 +31,7 @@
#include "xfs_vnodeops.h"
#include "xfs_trace.h"
#include "xfs_bmap.h"
+#include <linux/aio.h>
#include <linux/gfp.h>
#include <linux/mpage.h>
#include <linux/pagevec.h>
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index aa473fa..46176a5 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -34,6 +34,7 @@
#include "xfs_ioctl.h"
#include "xfs_trace.h"
+#include <linux/aio.h>
#include <linux/dcache.h>
#include <linux/falloc.h>
#include <linux/pagevec.h>
diff --git a/include/linux/cgroup.h b/include/linux/cgroup.h
index f8a030c..33e158f 100644
--- a/include/linux/cgroup.h
+++ b/include/linux/cgroup.h
@@ -26,6 +26,7 @@ struct cgroup_subsys;
struct inode;
struct cgroup;
struct css_id;
+struct eventfd_ctx;
extern int cgroup_init_early(void);
extern int cgroup_init(void);
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 0dd42a0..a04716d 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -345,8 +345,6 @@ struct user_namespace;
extern int sysctl_max_map_count;
-#include <linux/aio.h>
-
#ifdef CONFIG_MMU
extern void arch_pick_mmap_layout(struct mm_struct *mm);
extern unsigned long
diff --git a/kernel/fork.c b/kernel/fork.c
index 8b20ab7..1104945 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -70,6 +70,7 @@
#include <linux/khugepaged.h>
#include <linux/signalfd.h>
#include <linux/uprobes.h>
+#include <linux/aio.h>
#include <asm/pgtable.h>
#include <asm/pgalloc.h>
diff --git a/kernel/printk.c b/kernel/printk.c
index 2d607f4..3fce6e3 100644
--- a/kernel/printk.c
+++ b/kernel/printk.c
@@ -42,6 +42,7 @@
#include <linux/notifier.h>
#include <linux/rculist.h>
#include <linux/poll.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
diff --git a/kernel/ptrace.c b/kernel/ptrace.c
index 1f5e55d..3f08b89 100644
--- a/kernel/ptrace.c
+++ b/kernel/ptrace.c
@@ -24,6 +24,7 @@
#include <linux/regset.h>
#include <linux/hw_breakpoint.h>
#include <linux/cn_proc.h>
+#include <linux/uio.h>
static int ptrace_trapping_sleep_fn(void *flags)
diff --git a/mm/page_io.c b/mm/page_io.c
index 78eee32..c535d39 100644
--- a/mm/page_io.c
+++ b/mm/page_io.c
@@ -20,6 +20,7 @@
#include <linux/buffer_head.h>
#include <linux/writeback.h>
#include <linux/frontswap.h>
+#include <linux/aio.h>
#include <asm/pgtable.h>
static struct bio *get_swap_bio(gfp_t gfp_flags,
diff --git a/mm/shmem.c b/mm/shmem.c
index 89341b6..9647fb9 100644
--- a/mm/shmem.c
+++ b/mm/shmem.c
@@ -30,6 +30,7 @@
#include <linux/mm.h>
#include <linux/export.h>
#include <linux/swap.h>
+#include <linux/aio.h>
static struct vfsmount *shm_mnt;
diff --git a/mm/swap.c b/mm/swap.c
index 6310dc2..d0cb95e 100644
--- a/mm/swap.c
+++ b/mm/swap.c
@@ -30,6 +30,7 @@
#include <linux/backing-dev.h>
#include <linux/memcontrol.h>
#include <linux/gfp.h>
+#include <linux/uio.h>
#include "internal.h"
diff --git a/security/keys/internal.h b/security/keys/internal.h
index 8bbefc3..d4f1468 100644
--- a/security/keys/internal.h
+++ b/security/keys/internal.h
@@ -16,6 +16,8 @@
#include <linux/key-type.h>
#include <linux/task_work.h>
+struct iovec;
+
#ifdef __KDEBUG
#define kenter(FMT, ...) \
printk(KERN_DEBUG "==> %s("FMT")\n", __func__, ##__VA_ARGS__)
diff --git a/security/keys/keyctl.c b/security/keys/keyctl.c
index 5d34b4e..c664201 100644
--- a/security/keys/keyctl.c
+++ b/security/keys/keyctl.c
@@ -22,6 +22,7 @@
#include <linux/err.h>
#include <linux/vmalloc.h>
#include <linux/security.h>
+#include <linux/uio.h>
#include <asm/uaccess.h>
#include "internal.h"
diff --git a/sound/core/pcm_native.c b/sound/core/pcm_native.c
index f9ddecf..2cdd17a 100644
--- a/sound/core/pcm_native.c
+++ b/sound/core/pcm_native.c
@@ -25,7 +25,7 @@
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/pm_qos.h>
-#include <linux/uio.h>
+#include <linux/aio.h>
#include <linux/dma-mapping.h>
#include <sound/core.h>
#include <sound/control.h>
--
1.7.12
This implements a refcount with similar semantics to
atomic_get()/atomic_dec_and_test(): it starts out as just an atomic_t
but dynamically switches to per cpu refcounting when the rate of
gets/puts becomes too high.
It also implements two stage shutdown, as we need it to tear down the
percpu counts. Before dropping the initial refcount, you must call
percpu_ref_kill(); this puts the refcount in "shutting down mode" and
switches back to a single atomic refcount with the appropriate barriers
(synchronize_rcu()).
It's also legal to call percpu_ref_kill() multiple times - it only
returns true once, so callers don't have to reimplement shutdown
synchronization.
For the sake of simplicity/efficiency, the heuristic is pretty simple -
it just switches to percpu refcounting if there are more than x gets
in one second (completely arbitrarily, 4096).
It'd be more correct to count the number of cache misses or something
else more profile driven, but doing so would require accessing the
shared ref twice per get - by just counting the number of gets, we can
stick that counter in the high bits of the refcount and increment both
with a single atomic64_add(). But I expect this'll be good enough in
practice.
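As a rough illustration of that counter packing (a simplified userspace
sketch with made-up names, not the actual library code): the low bits of
a single 64-bit atomic hold the refcount while the high bits count gets,
so one add updates both.

/*
 * Simplified userspace sketch of the packed counter (illustrative, not
 * the library code): low bits are the refcount, high bits count gets.
 */
#include <stdatomic.h>
#include <stdint.h>

#define COUNT_BITS	50
#define COUNT_MASK	((UINT64_C(1) << COUNT_BITS) - 1)

static atomic_uint_fast64_t ref = 1;	/* refcount starts at 1 */

static void ref_get(void)
{
	/* A single add bumps the refcount (+1) and the get counter
	 * (+1 << COUNT_BITS) at the same time. */
	uint64_t v = atomic_fetch_add(&ref, 1 + (UINT64_C(1) << COUNT_BITS))
			+ 1 + (UINT64_C(1) << COUNT_BITS);

	if ((v >> COUNT_BITS) > 4096) {
		/* This is where the real code would switch to percpu mode;
		 * it also rate-limits the check by time, omitted here. */
	}
}

static int ref_put(void)
{
	uint64_t v = atomic_fetch_sub(&ref, 1) - 1;

	return (v & COUNT_MASK) == 0;	/* true when the last ref drops */
}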
Signed-off-by: Kent Overstreet <[email protected]>
---
include/linux/percpu-refcount.h | 29 +++++++
lib/Makefile | 2 +-
lib/percpu-refcount.c | 164 ++++++++++++++++++++++++++++++++++++++++
3 files changed, 194 insertions(+), 1 deletion(-)
create mode 100644 include/linux/percpu-refcount.h
create mode 100644 lib/percpu-refcount.c
diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h
new file mode 100644
index 0000000..1268010
--- /dev/null
+++ b/include/linux/percpu-refcount.h
@@ -0,0 +1,29 @@
+#ifndef _LINUX_PERCPU_REFCOUNT_H
+#define _LINUX_PERCPU_REFCOUNT_H
+
+#include <linux/atomic.h>
+#include <linux/percpu.h>
+
+struct percpu_ref {
+ atomic64_t count;
+ unsigned __percpu *pcpu_count;
+};
+
+void percpu_ref_init(struct percpu_ref *ref);
+void __percpu_ref_get(struct percpu_ref *ref, bool alloc);
+int percpu_ref_put(struct percpu_ref *ref);
+
+int percpu_ref_kill(struct percpu_ref *ref);
+int percpu_ref_dead(struct percpu_ref *ref);
+
+static inline void percpu_ref_get(struct percpu_ref *ref)
+{
+ __percpu_ref_get(ref, true);
+}
+
+static inline void percpu_ref_get_noalloc(struct percpu_ref *ref)
+{
+ __percpu_ref_get(ref, false);
+}
+
+#endif
diff --git a/lib/Makefile b/lib/Makefile
index 821a162..7cb276e 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -12,7 +12,7 @@ lib-y := ctype.o string.o vsprintf.o cmdline.o \
idr.o int_sqrt.o extable.o \
sha1.o md5.o irq_regs.o reciprocal_div.o argv_split.o \
proportions.o flex_proportions.o prio_heap.o ratelimit.o show_mem.o \
- is_single_threaded.o plist.o decompress.o
+ is_single_threaded.o plist.o decompress.o percpu-refcount.o
lib-$(CONFIG_MMU) += ioremap.o
lib-$(CONFIG_SMP) += cpumask.o
diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c
new file mode 100644
index 0000000..522b2df
--- /dev/null
+++ b/lib/percpu-refcount.c
@@ -0,0 +1,164 @@
+#define pr_fmt(fmt) "%s: " fmt "\n", __func__
+
+#include <linux/kernel.h>
+#include <linux/percpu-refcount.h>
+#include <linux/rcupdate.h>
+
+#define PCPU_COUNT_BITS 50
+#define PCPU_COUNT_MASK ((1LL << PCPU_COUNT_BITS) - 1)
+
+#define PCPU_STATUS_BITS 2
+#define PCPU_STATUS_MASK ((1 << PCPU_STATUS_BITS) - 1)
+
+#define PCPU_REF_PTR 0
+#define PCPU_REF_NONE 1
+#define PCPU_REF_DYING 2
+#define PCPU_REF_DEAD 3
+
+#define REF_STATUS(count) ((unsigned long) count & PCPU_STATUS_MASK)
+
+void percpu_ref_init(struct percpu_ref *ref)
+{
+ unsigned long now = jiffies;
+
+ atomic64_set(&ref->count, 1);
+
+ now <<= PCPU_STATUS_BITS;
+ now |= PCPU_REF_NONE;
+
+ ref->pcpu_count = (void *) now;
+}
+
+static void percpu_ref_alloc(struct percpu_ref *ref, unsigned __user *pcpu_count)
+{
+ unsigned __percpu *new;
+ unsigned long last = (unsigned long) pcpu_count;
+ unsigned long now = jiffies;
+
+ now <<= PCPU_STATUS_BITS;
+ now |= PCPU_REF_NONE;
+
+ if (now - last <= HZ << PCPU_STATUS_BITS) {
+ rcu_read_unlock();
+ new = alloc_percpu(unsigned);
+ rcu_read_lock();
+
+ if (!new)
+ goto update_time;
+
+ BUG_ON(((unsigned long) new) & PCPU_STATUS_MASK);
+
+ if (cmpxchg(&ref->pcpu_count, pcpu_count, new) != pcpu_count)
+ free_percpu(new);
+ else
+ pr_debug("created");
+ } else {
+update_time: new = (void *) now;
+ cmpxchg(&ref->pcpu_count, pcpu_count, new);
+ }
+}
+
+void __percpu_ref_get(struct percpu_ref *ref, bool alloc)
+{
+ unsigned __percpu *pcpu_count;
+ uint64_t v;
+
+ pcpu_count = rcu_dereference(ref->pcpu_count);
+
+ if (REF_STATUS(pcpu_count) == PCPU_REF_PTR) {
+ __this_cpu_inc(*pcpu_count);
+ } else {
+ v = atomic64_add_return(1 + (1ULL << PCPU_COUNT_BITS),
+ &ref->count);
+
+ if (!(v >> PCPU_COUNT_BITS) &&
+ REF_STATUS(pcpu_count) == PCPU_REF_NONE && alloc)
+ percpu_ref_alloc(ref, pcpu_count);
+ }
+}
+
+int percpu_ref_put(struct percpu_ref *ref)
+{
+ unsigned __percpu *pcpu_count;
+ uint64_t v;
+ int ret = 0;
+
+ rcu_read_lock();
+
+ pcpu_count = rcu_dereference(ref->pcpu_count);
+
+ switch (REF_STATUS(pcpu_count)) {
+ case PCPU_REF_PTR:
+ __this_cpu_dec(*pcpu_count);
+ break;
+ case PCPU_REF_NONE:
+ case PCPU_REF_DYING:
+ atomic64_dec(&ref->count);
+ break;
+ case PCPU_REF_DEAD:
+ v = atomic64_dec_return(&ref->count);
+ v &= PCPU_COUNT_MASK;
+
+ ret = v == 0;
+ break;
+ }
+
+ rcu_read_unlock();
+
+ return ret;
+}
+
+int percpu_ref_kill(struct percpu_ref *ref)
+{
+ unsigned __percpu *old, *new, *pcpu_count = ref->pcpu_count;
+ unsigned long status;
+
+ do {
+ status = REF_STATUS(pcpu_count);
+
+ switch (status) {
+ case PCPU_REF_PTR:
+ new = (void *) PCPU_REF_DYING;
+ break;
+ case PCPU_REF_NONE:
+ new = (void *) PCPU_REF_DEAD;
+ break;
+ case PCPU_REF_DYING:
+ case PCPU_REF_DEAD:
+ return 0;
+ }
+
+ old = pcpu_count;
+ pcpu_count = cmpxchg(&ref->pcpu_count, old, new);
+ } while (pcpu_count != old);
+
+ if (status == PCPU_REF_PTR) {
+ unsigned count = 0, cpu;
+
+ synchronize_rcu();
+
+ for_each_possible_cpu(cpu)
+ count += *per_cpu_ptr(pcpu_count, cpu);
+
+ pr_debug("global %lli pcpu %i",
+ atomic64_read(&ref->count) & PCPU_COUNT_MASK,
+ (int) count);
+
+ atomic64_add((int) count, &ref->count);
+ smp_wmb();
+ /* Between setting global count and setting PCPU_REF_DEAD */
+ ref->pcpu_count = (void *) PCPU_REF_DEAD;
+
+ free_percpu(pcpu_count);
+ }
+
+ return 1;
+}
+
+int percpu_ref_dead(struct percpu_ref *ref)
+{
+ unsigned status = REF_STATUS(ref->pcpu_count);
+
+ return status == PCPU_REF_DYING ||
+ status == PCPU_REF_DEAD;
+}
--
1.7.12
Previously, aio_read_evt() pulled a single completion off the
ringbuffer at a time, locking and unlocking each time.
Changed it to pull off as many events as it can at a time, and copy them
directly to userspace. The easiest way to do this was by making it
lockless, since we can't copy_to_user() with a spinlock held.
We can't use cmpxchg() on the ring buffer's head pointer directly, since
it's modded to nr_events and would be susceptible to ABA. So instead we
maintain a shadow head that uses the full 32 bits, and cmpxchg() that
and then update the real head pointer.
This also fixes a bug where if copying the event to userspace failed,
we'd lose the event.
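For illustration, here's a rough userspace sketch of the shadow-head
idea (simplified, with made-up names; the free-running shadow tail is an
assumption of this sketch, not the kernel code): because the shadow head
is never reduced mod nr, a cmpxchg() on it can't be fooled by the ring
wrapping around, and only the reduced value is used to index the ring.

/*
 * Simplified sketch of the shadow-head idea (illustrative only).
 */
#include <stdatomic.h>

struct fake_ring {
	atomic_uint shadow_head;	/* free running consumer index */
	atomic_uint shadow_tail;	/* free running producer index */
	unsigned nr;			/* number of slots in the ring */
};

/* Claim up to 'want' events; returns how many were claimed and stores
 * the ring index of the first one in *idx. */
static unsigned claim_events(struct fake_ring *ring, unsigned want,
			     unsigned *idx)
{
	unsigned head = atomic_load(&ring->shadow_head);
	unsigned avail, take;

	do {
		avail = atomic_load(&ring->shadow_tail) - head;
		take = avail < want ? avail : want;
		if (!take)
			return 0;
	} while (!atomic_compare_exchange_weak(&ring->shadow_head,
					       &head, head + take));

	*idx = head % ring->nr;		/* where the copy out starts */
	return take;
}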
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 173 +++++++++++++++++++++++++++++----------------------------------
1 file changed, 80 insertions(+), 93 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 7961f60..de255d8 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -63,7 +63,7 @@ struct aio_ring_info {
unsigned long mmap_size;
struct page **ring_pages;
- spinlock_t ring_lock;
+ struct mutex ring_lock;
long nr_pages;
unsigned nr, tail;
@@ -324,7 +324,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2);
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->ring_info.ring_lock);
+ mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
@@ -704,86 +704,86 @@ put_rq:
}
EXPORT_SYMBOL(aio_complete);
-/* aio_read_evt
- * Pull an event off of the ioctx's event ring. Returns the number of
- * events fetched (0 or 1 ;-)
- * FIXME: make this use cmpxchg.
- * TODO: make the ringbuffer user mmap()able (requires FIXME).
+/* aio_read_events
+ * Pull an event off of the ioctx's event ring. Returns the number of
+ * events fetched
*/
-static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
+static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
+ long nr)
{
- struct aio_ring_info *info = &ioctx->ring_info;
+ struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
- unsigned long head;
- int ret = 0;
+ unsigned head, pos;
+ int ret = 0, copy_ret;
+
+ mutex_lock(&info->ring_lock);
ring = kmap_atomic(info->ring_pages[0]);
- pr_debug("h%u t%u m%u\n", ring->head, ring->tail, ring->nr);
+ head = ring->head;
+ kunmap_atomic(ring);
- if (ring->head == ring->tail)
- goto out;
+ pr_debug("h%u t%u m%u\n", head, info->tail, info->nr);
+
+ while (ret < nr) {
+ unsigned i = (head < info->tail ? info->tail : info->nr) - head;
+ struct io_event *ev;
+ struct page *page;
+
+ if (head == info->tail)
+ break;
+
+ i = min_t(int, i, nr - ret);
+ i = min_t(int, i, AIO_EVENTS_PER_PAGE -
+ ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
+
+ pos = head + AIO_EVENTS_OFFSET;
+ page = info->ring_pages[pos / AIO_EVENTS_PER_PAGE];
+ pos %= AIO_EVENTS_PER_PAGE;
+
+ ev = kmap(page);
+ copy_ret = copy_to_user(event + ret, ev + pos, sizeof(*ev) * i);
+ kunmap(page);
+
+ if (unlikely(copy_ret)) {
+ ret = -EFAULT;
+ goto out;
+ }
- spin_lock(&info->ring_lock);
-
- head = ring->head % info->nr;
- if (head != ring->tail) {
- struct io_event *evp = aio_ring_event(info, head);
- *ent = *evp;
- head = (head + 1) % info->nr;
- smp_mb(); /* finish reading the event before updatng the head */
- ring->head = head;
- ret = 1;
- put_aio_ring_event(evp);
+ ret += i;
+ head += i;
+ head %= info->nr;
}
- spin_unlock(&info->ring_lock);
-out:
+ smp_mb(); /* finish reading the event before updatng the head */
+
+ ring = kmap_atomic(info->ring_pages[0]);
+ ring->head = head;
kunmap_atomic(ring);
- pr_debug("%d h%u t%u\n", ret, ring->head, ring->tail);
+
+ pr_debug("%d h%u t%u\n", ret, head, info->tail);
+out:
+ mutex_unlock(&info->ring_lock);
+
return ret;
}
static int read_events(struct kioctx *ctx,
- long min_nr, long nr,
- struct io_event __user *event,
- struct timespec __user *timeout)
+ long min_nr, long nr,
+ struct io_event __user *event,
+ struct timespec __user *timeout)
{
DEFINE_WAIT(wait);
struct hrtimer_sleeper t;
size_t i = 0;
int ret;
- struct io_event ent;
-
- /* needed to zero any padding within an entry (there shouldn't be
- * any, but C is fun!
- */
- memset(&ent, 0, sizeof(ent));
- ret = 0;
- while (likely(i < nr)) {
- ret = aio_read_evt(ctx, &ent);
- if (unlikely(ret <= 0))
- break;
- pr_debug("%Lx %Lx %Lx %Lx\n",
- ent.data, ent.obj, ent.res, ent.res2);
-
- /* Could we split the check in two? */
- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- pr_debug("lost an event due to EFAULT.\n");
- break;
- }
- ret = 0;
-
- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
- }
+ ret = aio_read_events(ctx, event + i, nr - i);
+ if (ret < 0)
+ return ret;
- if (min_nr <= i)
+ i += ret;
+ if (i >= min_nr)
return i;
- if (ret)
- return ret;
/* End fast path */
@@ -802,47 +802,34 @@ static int read_events(struct kioctx *ctx,
current->timer_slack_ns, HRTIMER_MODE_REL);
}
- while (likely(i < nr)) {
+ while (i < nr) {
prepare_to_wait_exclusive(&ctx->wait, &wait,
TASK_INTERRUPTIBLE);
- do {
- ret = aio_read_evt(ctx, &ent);
- if (ret)
- break;
- if (min_nr <= i)
- break;
- if (unlikely(atomic_read(&ctx->dead))) {
- ret = -EINVAL;
- break;
- }
- if (!t.task) /* Only check after read evt */
- break;
- /* Try to only show up in io wait if there are ops
- * in flight */
- if (atomic_read(&ctx->reqs_active))
- io_schedule();
- else
- schedule();
- if (signal_pending(current)) {
- ret = -EINTR;
- break;
- }
- /*ret = aio_read_evt(ctx, &ent);*/
- } while (1) ;
-
- if (unlikely(ret <= 0))
+ ret = aio_read_events(ctx, event + i, nr - i);
+ if (ret < 0)
break;
- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- pr_debug("lost an event due to EFAULT.\n");
+ i += ret;
+ if (i >= min_nr)
+ break;
+ if (unlikely(atomic_read(&ctx->dead))) {
+ ret = -EINVAL;
break;
}
+ if (!t.task) /* Only check after read evt */
+ break;
+
+ /* Try to only show up in io wait if there are ops in flight */
+ if (atomic_read(&ctx->reqs_active))
+ io_schedule();
+ else
+ schedule();
- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
+ if (signal_pending(current)) {
+ ret = -EINTR;
+ break;
+ }
}
out:
hrtimer_cancel(&t.timer);
--
1.7.12
See the previous patch for why we want to do this - this basically
implements a per cpu allocator for reqs_available that doesn't actually
allocate anything.
Note that we need to increase the size of the ringbuffer we allocate,
since a single thread won't necessarily be able to use all the
reqs_available slots - some (up to about half) might be on other per cpu
lists, unavailable for the current thread.
We size the ringbuffer based on the nr_events userspace passed to
io_setup(), so this is a slight behaviour change - but nr_events wasn't
being used as a hard limit before (it was already being rounded up to
the next page), so this doesn't change the actual semantics.
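The shape of that per cpu allocator, as a simplified userspace sketch
(thread-local storage standing in for per cpu data, names illustrative):
each cpu keeps a small local stash of slots and only touches the shared
counter when the stash runs dry or overflows, moving req_batch slots at
a time.

/*
 * Simplified userspace sketch (illustrative only): slots migrate
 * between the shared pool and the local stash a whole batch at a time.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int global_avail;			/* shared pool of free slots */
static _Thread_local unsigned local_avail;	/* this "cpu"'s stash */
static unsigned req_batch = 16;			/* slots moved per refill */

static bool batched_get(void)
{
	if (!local_avail) {
		int avail = atomic_load(&global_avail);

		/* Refill the local stash with one whole batch, or fail. */
		do {
			if (avail < (int)req_batch)
				return false;	/* -EAGAIN in the kernel */
		} while (!atomic_compare_exchange_weak(&global_avail, &avail,
						       avail - (int)req_batch));

		local_avail = req_batch;
	}

	local_avail--;
	return true;
}

static void batched_put(void)
{
	/* Hand slots back to the shared pool once we hold too many. */
	if (++local_avail >= req_batch * 2) {
		local_avail -= req_batch;
		atomic_fetch_add(&global_avail, (int)req_batch);
	}
}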
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 85 insertions(+), 12 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index e6f29dc..94218b7 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -26,6 +26,7 @@
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
+#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/timer.h>
#include <linux/aio.h>
@@ -59,6 +60,10 @@ struct aio_ring {
#define AIO_RING_PAGES 8
+struct kioctx_cpu {
+ unsigned reqs_available;
+};
+
struct kioctx {
atomic_t users;
atomic_t dead;
@@ -67,6 +72,10 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
+ struct __percpu kioctx_cpu *cpu;
+
+ unsigned req_batch;
+
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -150,6 +159,9 @@ static int aio_setup_ring(struct kioctx *ctx)
unsigned long size;
int nr_pages;
+ nr_events = max(nr_events, num_possible_cpus() * 4);
+ nr_events *= 2;
+
/* Compensate for the ring buffer's head/tail overlap entry */
nr_events += 2; /* 1 is required, 2 for good luck */
@@ -283,7 +295,7 @@ static void free_ioctx(struct kioctx *ctx)
{
struct aio_ring *ring;
struct io_event res;
- unsigned head, avail;
+ unsigned cpu, head, avail;
spin_lock_irq(&ctx->ctx_lock);
@@ -297,6 +309,13 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
+ for_each_possible_cpu(cpu) {
+ struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
+
+ atomic_add(kcpu->reqs_available, &ctx->reqs_available);
+ kcpu->reqs_available = 0;
+ }
+
ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
@@ -323,6 +342,7 @@ static void free_ioctx(struct kioctx *ctx)
synchronize_rcu();
pr_debug("freeing %p\n", ctx);
+ free_percpu(ctx->cpu);
kmem_cache_free(kioctx_cachep, ctx);
}
@@ -365,10 +385,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
INIT_LIST_HEAD(&ctx->active_reqs);
- if (aio_setup_ring(ctx) < 0)
+ ctx->cpu = alloc_percpu(struct kioctx_cpu);
+ if (!ctx->cpu)
goto out_freectx;
+ if (aio_setup_ring(ctx) < 0)
+ goto out_freepcpu;
+
atomic_set(&ctx->reqs_available, ctx->nr);
+ ctx->req_batch = ctx->nr / (num_possible_cpus() * 4);
+ BUG_ON(!ctx->req_batch);
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
@@ -392,6 +418,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
out_cleanup:
err = -EAGAIN;
aio_free_ring(ctx);
+out_freepcpu:
+ free_percpu(ctx->cpu);
out_freectx:
kmem_cache_free(kioctx_cachep, ctx);
pr_debug("error allocating ioctx %d\n", err);
@@ -464,6 +492,52 @@ void exit_aio(struct mm_struct *mm)
}
}
+static void put_reqs_available(struct kioctx *ctx, unsigned nr)
+{
+ struct kioctx_cpu *kcpu;
+
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+
+ kcpu->reqs_available += nr;
+ while (kcpu->reqs_available >= ctx->req_batch * 2) {
+ kcpu->reqs_available -= ctx->req_batch;
+ atomic_add(ctx->req_batch, &ctx->reqs_available);
+ }
+
+ preempt_enable();
+}
+
+static bool get_reqs_available(struct kioctx *ctx)
+{
+ struct kioctx_cpu *kcpu;
+ bool ret = false;
+
+ preempt_disable();
+ kcpu = this_cpu_ptr(ctx->cpu);
+
+ if (!kcpu->reqs_available) {
+ int old, avail = atomic_read(&ctx->reqs_available);
+
+ do {
+ if (avail < ctx->req_batch)
+ goto out;
+
+ old = avail;
+ avail = atomic_cmpxchg(&ctx->reqs_available,
+ avail, avail - ctx->req_batch);
+ } while (avail != old);
+
+ kcpu->reqs_available += ctx->req_batch;
+ }
+
+ ret = true;
+ kcpu->reqs_available--;
+out:
+ preempt_enable();
+ return ret;
+}
+
/* aio_get_req
* Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
@@ -478,7 +552,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
+ if (!get_reqs_available(ctx))
return NULL;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -487,10 +561,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
atomic_set(&req->ki_users, 2);
req->ki_ctx = ctx;
-
return req;
out_put:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
return NULL;
}
@@ -581,6 +654,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* when the event got cancelled.
*/
if (test_and_set_bit(KIF_CANCELLED, &iocb->ki_flags)) {
+ /*
+ * Can't use the percpu reqs_available here - could race with
+ * free_ioctx()
+ */
atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
@@ -706,7 +783,7 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
ring->head = head;
kunmap_atomic(ring);
- atomic_add(ret, &ctx->reqs_available);
+ put_reqs_available(ctx, ret);
pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
out:
@@ -768,11 +845,7 @@ static int read_events(struct kioctx *ctx,
if (!t.task) /* Only check after read evt */
break;
- /* Try to only show up in io wait if there are ops in flight */
- if (atomic_read(&ctx->reqs_available) != ctx->nr)
- io_schedule();
- else
- schedule();
+ io_schedule();
if (signal_pending(current)) {
ret = -EINTR;
@@ -1160,7 +1233,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- atomic_inc(&ctx->reqs_available);
+ put_reqs_available(ctx, 1);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
struct aio_ring_info was kind of odd - the only place it's used is where
it's embedded in struct kioctx, so there's no real need for it.
The next patch rearranges struct kioctx and puts various things on their
own cachelines - getting rid of struct aio_ring_info now makes that
reordering a bit clearer.
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 142 ++++++++++++++++++++++++++++++---------------------------------
1 file changed, 68 insertions(+), 74 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index f2d616e..1ff4d3b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -58,18 +58,6 @@ struct aio_ring {
}; /* 128 bytes + ring size */
#define AIO_RING_PAGES 8
-struct aio_ring_info {
- unsigned long mmap_base;
- unsigned long mmap_size;
-
- struct page **ring_pages;
- struct mutex ring_lock;
- long nr_pages;
-
- unsigned nr, tail;
-
- struct page *internal_pages[AIO_RING_PAGES];
-};
struct kioctx {
atomic_t users;
@@ -86,12 +74,28 @@ struct kioctx {
atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
+ unsigned nr;
+
/* sys_io_setup currently limits this to an unsigned int */
unsigned max_reqs;
- struct aio_ring_info ring_info;
+ unsigned long mmap_base;
+ unsigned long mmap_size;
+
+ struct page **ring_pages;
+ long nr_pages;
+
+ struct {
+ struct mutex ring_lock;
+ } ____cacheline_aligned;
+
+ struct {
+ unsigned tail;
+ spinlock_t completion_lock;
+ } ____cacheline_aligned;
+
+ struct page *internal_pages[AIO_RING_PAGES];
- spinlock_t completion_lock;
};
/*------ sysctl variables----*/
@@ -120,26 +124,21 @@ __initcall(aio_setup);
static void aio_free_ring(struct kioctx *ctx)
{
- struct aio_ring_info *info = &ctx->ring_info;
long i;
- for (i=0; i<info->nr_pages; i++)
- put_page(info->ring_pages[i]);
+ for (i = 0; i < ctx->nr_pages; i++)
+ put_page(ctx->ring_pages[i]);
- if (info->mmap_size) {
- vm_munmap(info->mmap_base, info->mmap_size);
- }
+ if (ctx->mmap_size)
+ vm_munmap(ctx->mmap_base, ctx->mmap_size);
- if (info->ring_pages && info->ring_pages != info->internal_pages)
- kfree(info->ring_pages);
- info->ring_pages = NULL;
- info->nr = 0;
+ if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
+ kfree(ctx->ring_pages);
}
static int aio_setup_ring(struct kioctx *ctx)
{
struct aio_ring *ring;
- struct aio_ring_info *info = &ctx->ring_info;
unsigned nr_events = ctx->max_reqs;
struct mm_struct *mm = current->mm;
unsigned long size;
@@ -157,42 +156,42 @@ static int aio_setup_ring(struct kioctx *ctx)
nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
- info->nr = 0;
- info->ring_pages = info->internal_pages;
+ ctx->nr = 0;
+ ctx->ring_pages = ctx->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
- if (!info->ring_pages)
+ ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
+ if (!ctx->ring_pages)
return -ENOMEM;
}
- info->mmap_size = nr_pages * PAGE_SIZE;
- pr_debug("attempting mmap of %lu bytes\n", info->mmap_size);
+ ctx->mmap_size = nr_pages * PAGE_SIZE;
+ pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
down_write(&mm->mmap_sem);
- info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
- PROT_READ|PROT_WRITE,
- MAP_ANONYMOUS|MAP_PRIVATE, 0);
- if (IS_ERR((void *)info->mmap_base)) {
+ ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
+ PROT_READ|PROT_WRITE,
+ MAP_ANONYMOUS|MAP_PRIVATE, 0);
+ if (IS_ERR((void *)ctx->mmap_base)) {
up_write(&mm->mmap_sem);
- info->mmap_size = 0;
+ ctx->mmap_size = 0;
aio_free_ring(ctx);
return -EAGAIN;
}
- pr_debug("mmap address: 0x%08lx\n", info->mmap_base);
- info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
- 1, 0, info->ring_pages, NULL);
+ pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
+ ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
+ 1, 0, ctx->ring_pages, NULL);
up_write(&mm->mmap_sem);
- if (unlikely(info->nr_pages != nr_pages)) {
+ if (unlikely(ctx->nr_pages != nr_pages)) {
aio_free_ring(ctx);
return -EAGAIN;
}
- ctx->user_id = info->mmap_base;
+ ctx->user_id = ctx->mmap_base;
- info->nr = nr_events; /* trusted copy */
+ ctx->nr = nr_events; /* trusted copy */
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
ring->nr = nr_events; /* user copy */
ring->id = ctx->user_id;
ring->head = ring->tail = 0;
@@ -213,11 +212,11 @@ static int aio_setup_ring(struct kioctx *ctx)
#define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
#define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
-#define aio_ring_event(info, nr) ({ \
+#define aio_ring_event(ctx, nr) ({ \
unsigned pos = (nr) + AIO_EVENTS_OFFSET; \
struct io_event *__event; \
__event = kmap_atomic( \
- (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE]); \
+ (ctx)->ring_pages[pos / AIO_EVENTS_PER_PAGE]); \
__event += pos % AIO_EVENTS_PER_PAGE; \
__event; \
})
@@ -276,7 +275,6 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
*/
static void free_ioctx(struct kioctx *ctx)
{
- struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
struct io_event res;
unsigned head, avail;
@@ -293,18 +291,18 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_active) > 0) {
- wait_event(ctx->wait, head != info->tail);
+ wait_event(ctx->wait, head != ctx->tail);
- avail = (head < info->tail ? info->tail : info->nr) - head;
+ avail = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
atomic_sub(avail, &ctx->reqs_active);
head += avail;
- head %= info->nr;
+ head %= ctx->nr;
}
WARN_ON(atomic_read(&ctx->reqs_active) < 0);
@@ -356,7 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2);
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
- mutex_init(&ctx->ring_info.ring_lock);
+ mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
@@ -380,7 +378,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
spin_unlock(&mm->ioctx_lock);
pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
- ctx, ctx->user_id, mm, ctx->ring_info.nr);
+ ctx, ctx->user_id, mm, ctx->nr);
return ctx;
out_cleanup:
@@ -453,7 +451,7 @@ void exit_aio(struct mm_struct *mm)
* just set it to 0; aio_free_ring() is the only
* place that uses ->mmap_size, so it's safe.
*/
- ctx->ring_info.mmap_size = 0;
+ ctx->mmap_size = 0;
kill_ioctx(ctx);
}
}
@@ -472,10 +470,10 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
- if (atomic_read(&ctx->reqs_active) >= ctx->ring_info.nr)
+ if (atomic_read(&ctx->reqs_active) >= ctx->nr)
return NULL;
- if (atomic_inc_return(&ctx->reqs_active) > ctx->ring_info.nr)
+ if (atomic_inc_return(&ctx->reqs_active) > ctx->nr)
goto out_put;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
@@ -539,7 +537,6 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
void aio_complete(struct kiocb *iocb, long res, long res2)
{
struct kioctx *ctx = iocb->ki_ctx;
- struct aio_ring_info *info;
struct aio_ring *ring;
struct io_event *event;
unsigned long flags;
@@ -560,8 +557,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
return;
}
- info = &ctx->ring_info;
-
/*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active.
@@ -593,11 +588,11 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
spin_lock_irqsave(&ctx->completion_lock, flags);
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
- tail = info->tail;
- event = aio_ring_event(info, tail);
- if (++tail >= info->nr)
+ tail = ctx->tail;
+ event = aio_ring_event(ctx, tail);
+ if (++tail >= ctx->nr)
tail = 0;
event->obj = (u64)(unsigned long)iocb->ki_obj.user;
@@ -614,7 +609,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */
- info->tail = tail;
+ ctx->tail = tail;
ring->tail = tail;
put_aio_ring_event(event);
@@ -658,25 +653,24 @@ EXPORT_SYMBOL(aio_complete);
static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
long nr)
{
- struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
unsigned head, pos;
int ret = 0, copy_ret;
- mutex_lock(&info->ring_lock);
+ mutex_lock(&ctx->ring_lock);
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, info->tail, info->nr);
+ pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr);
while (ret < nr) {
- unsigned i = (head < info->tail ? info->tail : info->nr) - head;
+ unsigned i = (head < ctx->tail ? ctx->tail : ctx->nr) - head;
struct io_event *ev;
struct page *page;
- if (head == info->tail)
+ if (head == ctx->tail)
break;
i = min_t(int, i, nr - ret);
@@ -684,7 +678,7 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
pos = head + AIO_EVENTS_OFFSET;
- page = info->ring_pages[pos / AIO_EVENTS_PER_PAGE];
+ page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
pos %= AIO_EVENTS_PER_PAGE;
ev = kmap(page);
@@ -698,20 +692,20 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
ret += i;
head += i;
- head %= info->nr;
+ head %= ctx->nr;
}
smp_mb(); /* finish reading the event before updatng the head */
- ring = kmap_atomic(info->ring_pages[0]);
+ ring = kmap_atomic(ctx->ring_pages[0]);
ring->head = head;
kunmap_atomic(ring);
atomic_sub(ret, &ctx->reqs_active);
- pr_debug("%d h%u t%u\n", ret, head, info->tail);
+ pr_debug("%d h%u t%u\n", ret, head, ctx->tail);
out:
- mutex_unlock(&info->ring_lock);
+ mutex_unlock(&ctx->ring_lock);
return ret;
}
--
1.7.12
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 1ff4d3b..7dee3aa 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -67,13 +67,6 @@ struct kioctx {
unsigned long user_id;
struct hlist_node list;
- wait_queue_head_t wait;
-
- spinlock_t ctx_lock;
-
- atomic_t reqs_active;
- struct list_head active_reqs; /* used for cancellation */
-
unsigned nr;
/* sys_io_setup currently limits this to an unsigned int */
@@ -86,6 +79,15 @@ struct kioctx {
long nr_pages;
struct {
+ atomic_t reqs_active;
+ } ____cacheline_aligned;
+
+ struct {
+ spinlock_t ctx_lock;
+ struct list_head active_reqs; /* used for cancellation */
+ } ____cacheline_aligned;
+
+ struct {
struct mutex ring_lock;
} ____cacheline_aligned;
@@ -94,6 +96,10 @@ struct kioctx {
spinlock_t completion_lock;
} ____cacheline_aligned;
+ struct {
+ wait_queue_head_t wait;
+ } ____cacheline_aligned;
+
struct page *internal_pages[AIO_RING_PAGES];
};
--
1.7.12
The usage of ctx->dead was fubar - it makes no sense to explicitly
check it all over the place, especially when we're already using RCU.
Now, ctx->dead only indicates whether we've dropped the initial
refcount. The new teardown sequence is:
set ctx->dead
hlist_del_rcu();
synchronize_rcu();
Now we know no system calls can take a new ref, and it's safe to drop
the initial ref:
put_ioctx();
We also need to ensure there are no more outstanding kiocbs. This was
done incorrectly - it was being done in kill_ctx(), before dropping
the initial refcount. At that point, other syscalls may still be
submitting kiocbs!
Now, we cancel and wait for outstanding kiocbs in free_ioctx(), after
kioctx->users has dropped to 0 and we know no more iocbs could be
submitted.
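In miniature, and with made-up names, that lifecycle looks roughly like
this (C11 atomics standing in for the kernel's atomic_t; the "wait for
lookups to drain" comment is what synchronize_rcu() provides in the
real code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

struct ctx {
        atomic_bool dead;
        atomic_int users;       /* starts at 2: lookup-table ref + caller ref */
};

static void ctx_put(struct ctx *c)
{
        if (atomic_fetch_sub(&c->users, 1) == 1)
                free(c);                        /* last reference frees */
}

static void ctx_kill(struct ctx *c)
{
        /* only the first caller to set ->dead does the teardown */
        if (!atomic_exchange(&c->dead, true)) {
                /* unlink from the lookup structure here ... */
                /* ... wait for concurrent lookups to drain ... */
                ctx_put(c);             /* drop the lookup structure's ref */
        }
}

The important property is the ordering: dead is set and the context is
unlinked before the initial ref is dropped, so the ref that frees the
context can only be the last one.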
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 236 ++++++++++++++++++++++-----------------------------------------
1 file changed, 80 insertions(+), 156 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 9a60f13..9d27508 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,7 +79,7 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
struct kioctx {
atomic_t users;
- int dead;
+ atomic_t dead;
/* This needs improving */
unsigned long user_id;
@@ -96,8 +96,6 @@ struct kioctx {
unsigned max_reqs;
struct aio_ring_info ring_info;
-
- struct rcu_head rcu_head;
};
/*------ sysctl variables----*/
@@ -234,44 +232,6 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
} while(0)
-static void ctx_rcu_free(struct rcu_head *head)
-{
- struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- * Called when the last user of an aio context has gone away,
- * and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
-{
- unsigned nr_events = ctx->max_reqs;
- BUG_ON(atomic_read(&ctx->reqs_active));
-
- aio_free_ring(ctx);
- if (nr_events) {
- spin_lock(&aio_nr_lock);
- BUG_ON(aio_nr - nr_events > aio_nr);
- aio_nr -= nr_events;
- spin_unlock(&aio_nr_lock);
- }
- pr_debug("freeing %p\n", ctx);
- call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
- return atomic_inc_not_zero(&kioctx->users);
-}
-
-static inline void put_ioctx(struct kioctx *kioctx)
-{
- BUG_ON(atomic_read(&kioctx->users) <= 0);
- if (unlikely(atomic_dec_and_test(&kioctx->users)))
- __put_ioctx(kioctx);
-}
-
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res)
{
@@ -295,6 +255,48 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
return ret;
}
+/*
+ * When this function runs, the kioctx has been removed from the "hash table"
+ * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
+ * now it's safe to cancel any that need to be.
+ */
+static void free_ioctx(struct kioctx *ctx)
+{
+ struct io_event res;
+
+ spin_lock_irq(&ctx->ctx_lock);
+
+ while (!list_empty(&ctx->active_reqs)) {
+ struct list_head *pos = ctx->active_reqs.next;
+ struct kiocb *iocb = list_kiocb(pos);
+ list_del_init(&iocb->ki_list);
+
+ kiocb_cancel(ctx, iocb, &res);
+ }
+
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+
+ aio_free_ring(ctx);
+
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+ aio_nr -= ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+
+ synchronize_rcu();
+
+ pr_debug("freeing %p\n", ctx);
+ kmem_cache_free(kioctx_cachep, ctx);
+}
+
+static void put_ioctx(struct kioctx *ctx)
+{
+ if (unlikely(atomic_dec_and_test(&ctx->users)))
+ free_ioctx(ctx);
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -358,43 +360,21 @@ out_freectx:
return ERR_PTR(err);
}
-/* kill_ctx
- * Cancels all outstanding aio requests on an aio context. Used
- * when the processes owning a context have all exited to encourage
+/* kill_ioctx
+ * Cancels all outstanding aio requests on an aio context. Used
+ * when the processes owning a context have all exited to encourage
* the rapid destruction of the kioctx.
*/
-static void kill_ctx(struct kioctx *ctx)
+static void kill_ioctx(struct kioctx *ctx)
{
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- struct io_event res;
-
- spin_lock_irq(&ctx->ctx_lock);
- ctx->dead = 1;
- while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);
-
- kiocb_cancel(ctx, iocb, &res);
- }
+ if (!atomic_xchg(&ctx->dead, 1)) {
+ hlist_del_rcu(&ctx->list);
+ synchronize_rcu();
- if (!atomic_read(&ctx->reqs_active))
- goto out;
+ wake_up_all(&ctx->wait);
- add_wait_queue(&ctx->wait, &wait);
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (atomic_read(&ctx->reqs_active)) {
- spin_unlock_irq(&ctx->ctx_lock);
- io_schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- spin_lock_irq(&ctx->ctx_lock);
+ put_ioctx(ctx);
}
- __set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
-
-out:
- spin_unlock_irq(&ctx->ctx_lock);
}
/* wait_on_sync_kiocb:
@@ -413,27 +393,25 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
}
EXPORT_SYMBOL(wait_on_sync_kiocb);
-/* exit_aio: called when the last user of mm goes away. At this point,
- * there is no way for any new requests to be submited or any of the
- * io_* syscalls to be called on the context. However, there may be
- * outstanding requests which hold references to the context; as they
- * go away, they will call put_ioctx and release any pinned memory
- * associated with the request (held via struct page * references).
+/*
+ * exit_aio: called when the last user of mm goes away. At this point, there is
+ * no way for any new requests to be submited or any of the io_* syscalls to be
+ * called on the context.
+ *
+ * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
+ * them.
*/
void exit_aio(struct mm_struct *mm)
{
struct kioctx *ctx;
+ struct hlist_node *p, *n;
- while (!hlist_empty(&mm->ioctx_list)) {
- ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
- hlist_del_rcu(&ctx->list);
-
- kill_ctx(ctx);
-
+ hlist_for_each_entry_safe(ctx, p, n, &mm->ioctx_list, list) {
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
- atomic_read(&ctx->users), ctx->dead,
+ atomic_read(&ctx->users),
+ atomic_read(&ctx->dead),
atomic_read(&ctx->reqs_active));
/*
* We don't need to bother with munmap() here -
@@ -444,7 +422,7 @@ void exit_aio(struct mm_struct *mm)
* place that uses ->mmap_size, so it's safe.
*/
ctx->ring_info.mmap_size = 0;
- put_ioctx(ctx);
+ kill_ioctx(ctx);
}
}
@@ -510,8 +488,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
kmem_cache_free(kiocb_cachep, req);
atomic_dec(&ctx->reqs_active);
}
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}
@@ -607,18 +583,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
rcu_read_lock();
- hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
- /*
- * RCU protects us against accessing freed memory but
- * we have to be careful not to get a reference when the
- * reference count already dropped to 0 (ctx->dead test
- * is unreliable because of races).
- */
- if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+ hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+ if (ctx->user_id == ctx_id){
+ BUG_ON(atomic_read(&ctx->dead));
+ atomic_inc(&ctx->users);
ret = ctx;
break;
}
- }
rcu_read_unlock();
return ret;
@@ -655,12 +626,15 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info;
- /* add a completion event to the ring buffer.
- * must be done holding ctx->ctx_lock to prevent
- * other code from messing with the tail
- * pointer since we might be called from irq
- * context.
+ /*
+ * Add a completion event to the ring buffer. Must be done holding
+ * ctx->ctx_lock to prevent other code from messing with the tail
+ * pointer since we might be called from irq context.
+ *
+ * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+ * need to issue a wakeup after decrementing reqs_active.
*/
+ rcu_read_lock();
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_del(&iocb->ki_list); /* remove from active_reqs */
@@ -726,6 +700,7 @@ put_rq:
wake_up(&ctx->wait);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL(aio_complete);
@@ -869,7 +844,7 @@ static int read_events(struct kioctx *ctx,
break;
if (min_nr <= i)
break;
- if (unlikely(ctx->dead)) {
+ if (unlikely(atomic_read(&ctx->dead))) {
ret = -EINVAL;
break;
}
@@ -912,35 +887,6 @@ out:
return i ? i : ret;
}
-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
- struct mm_struct *mm = current->mm;
- int was_dead;
-
- /* delete the entry from the list is someone else hasn't already */
- spin_lock(&mm->ioctx_lock);
- was_dead = ioctx->dead;
- ioctx->dead = 1;
- hlist_del_rcu(&ioctx->list);
- spin_unlock(&mm->ioctx_lock);
-
- pr_debug("(%p)\n", ioctx);
- if (likely(!was_dead))
- put_ioctx(ioctx); /* twice for the list */
-
- kill_ctx(ioctx);
-
- /*
- * Wake up any waiters. The setting of ctx->dead must be seen
- * by other CPUs at this point. Right now, we rely on the
- * locking done by the above calls to ensure this consistency.
- */
- wake_up_all(&ioctx->wait);
-}
-
/* sys_io_setup:
* Create an aio_context capable of receiving at least nr_events.
* ctxp must not point to an aio_context that already exists, and
@@ -976,7 +922,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
if (!IS_ERR(ioctx)) {
ret = put_user(ioctx->user_id, ctxp);
if (ret)
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
}
@@ -994,7 +940,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
struct kioctx *ioctx = lookup_ioctx(ctx);
if (likely(NULL != ioctx)) {
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
return 0;
}
@@ -1297,25 +1243,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
if (ret)
goto out_put_req;
- spin_lock_irq(&ctx->ctx_lock);
- /*
- * We could have raced with io_destroy() and are currently holding a
- * reference to ctx which should be destroyed. We cannot submit IO
- * since ctx gets freed as soon as io_submit() puts its reference. The
- * check here is reliable: io_destroy() sets ctx->dead before waiting
- * for outstanding IO and the barrier between these two is realized by
- * unlock of mm->ioctx_lock and lock of ctx->ctx_lock. Analogously we
- * increment ctx->reqs_active before checking for ctx->dead and the
- * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
- * don't see ctx->dead set here, io_destroy() waits for our IO to
- * finish.
- */
- if (ctx->dead)
- ret = -EINVAL;
- spin_unlock_irq(&ctx->ctx_lock);
- if (ret)
- goto out_put_req;
-
if (unlikely(kiocbIsCancelled(req))) {
ret = -EINTR;
} else {
@@ -1341,9 +1268,6 @@ out_put_req:
spin_unlock_irq(&ctx->ctx_lock);
atomic_dec(&ctx->reqs_active);
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
-
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
--
1.7.12
Previously, allocating a kiocb required touching quite a few global
(well, per-kioctx) cachelines... so batching up allocations to amortize
that cost was worthwhile. But we've gotten rid of some of those
cachelines, and in another couple of patches kiocb allocation won't
require writing to any shared cachelines at all, so we can just rip
this code out.
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 117 +++++++---------------------------------------------
include/linux/aio.h | 1 -
2 files changed, 15 insertions(+), 103 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0d5062d..f2d616e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -468,109 +468,27 @@ void exit_aio(struct mm_struct *mm)
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
*/
-static struct kiocb *__aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
- struct kiocb *req = NULL;
+ struct kiocb *req;
+
+ if (atomic_read(&ctx->reqs_active) >= ctx->ring_info.nr)
+ return NULL;
+
+ if (atomic_inc_return(&ctx->reqs_active) > ctx->ring_info.nr)
+ goto out_put;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
- return NULL;
+ goto out_put;
atomic_set(&req->ki_users, 2);
req->ki_ctx = ctx;
return req;
-}
-
-/*
- * struct kiocb's are allocated in batches to reduce the number of
- * times the ctx lock is acquired and released.
- */
-#define KIOCB_BATCH_SIZE 32L
-struct kiocb_batch {
- struct list_head head;
- long count; /* number of requests left to allocate */
-};
-
-static void kiocb_batch_init(struct kiocb_batch *batch, long total)
-{
- INIT_LIST_HEAD(&batch->head);
- batch->count = total;
-}
-
-static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
-{
- struct kiocb *req, *n;
-
- if (list_empty(&batch->head))
- return;
-
- spin_lock_irq(&ctx->ctx_lock);
- list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
- list_del(&req->ki_batch);
- list_del(&req->ki_list);
- kmem_cache_free(kiocb_cachep, req);
- atomic_dec(&ctx->reqs_active);
- }
- spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * Allocate a batch of kiocbs. This avoids taking and dropping the
- * context lock a lot during setup.
- */
-static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
-{
- unsigned short allocated, to_alloc;
- long avail;
- struct kiocb *req, *n;
-
- to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
- for (allocated = 0; allocated < to_alloc; allocated++) {
- req = __aio_get_req(ctx);
- if (!req)
- /* allocation failed, go with what we've got */
- break;
- list_add(&req->ki_batch, &batch->head);
- }
-
- if (allocated == 0)
- goto out;
-
- spin_lock_irq(&ctx->ctx_lock);
-
- avail = ctx->ring_info.nr - atomic_read(&ctx->reqs_active);
- BUG_ON(avail < 0);
- if (avail < allocated) {
- /* Trim back the number of requests. */
- list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
- list_del(&req->ki_batch);
- kmem_cache_free(kiocb_cachep, req);
- if (--allocated <= avail)
- break;
- }
- }
-
- batch->count -= allocated;
- atomic_add(allocated, &ctx->reqs_active);
-
- spin_unlock_irq(&ctx->ctx_lock);
-
-out:
- return allocated;
-}
-
-static inline struct kiocb *aio_get_req(struct kioctx *ctx,
- struct kiocb_batch *batch)
-{
- struct kiocb *req;
-
- if (list_empty(&batch->head))
- if (kiocb_batch_refill(ctx, batch) == 0)
- return NULL;
- req = list_first_entry(&batch->head, struct kiocb, ki_batch);
- list_del(&req->ki_batch);
- return req;
+out_put:
+ atomic_dec(&ctx->reqs_active);
+ return NULL;
}
static void kiocb_free(struct kiocb *req)
@@ -1159,8 +1077,7 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
}
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb, struct kiocb_batch *batch,
- bool compat)
+ struct iocb *iocb, bool compat)
{
struct kiocb *req;
ssize_t ret;
@@ -1181,7 +1098,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return -EINVAL;
}
- req = aio_get_req(ctx, batch); /* returns with 2 references to req */
+ req = aio_get_req(ctx); /* returns with 2 references to req */
if (unlikely(!req))
return -EAGAIN;
@@ -1257,7 +1174,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
long ret = 0;
int i = 0;
struct blk_plug plug;
- struct kiocb_batch batch;
if (unlikely(nr < 0))
return -EINVAL;
@@ -1274,8 +1190,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
return -EINVAL;
}
- kiocb_batch_init(&batch, nr);
-
blk_start_plug(&plug);
/*
@@ -1296,13 +1210,12 @@ long do_io_submit(aio_context_t ctx_id, long nr,
break;
}
- ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
+ ret = io_submit_one(ctx, user_iocb, &tmp, compat);
if (ret)
break;
}
blk_finish_plug(&plug);
- kiocb_batch_free(ctx, &batch);
put_ioctx(ctx);
return i ? i : ret;
}
diff --git a/include/linux/aio.h b/include/linux/aio.h
index ea048ee..7701a41 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -81,7 +81,6 @@ struct kiocb {
struct list_head ki_list; /* the aio core uses this
* for cancellation */
- struct list_head ki_batch; /* batch allocation */
/*
* If the aio_resfd field of the userspace iocb is not zero,
--
1.7.12
Converting read_events() to prepare_to_wait_exclusive() and
hrtimers simplifies it quite a bit.
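The shape of the resulting wait loop, with the event copying elided and
placeholder conditions (need_more_events, got_event()) standing in for
the real checks:

struct hrtimer_sleeper t;
DEFINE_WAIT(wait);

hrtimer_init_on_stack(&t.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
hrtimer_init_sleeper(&t, current);
if (timeout)
        hrtimer_start_range_ns(&t.timer, timespec_to_ktime(ts),
                               current->timer_slack_ns, HRTIMER_MODE_REL);

while (need_more_events) {
        prepare_to_wait_exclusive(&ctx->wait, &wait, TASK_INTERRUPTIBLE);

        if (got_event() || !t.task || signal_pending(current))
                break;

        io_schedule();
}

finish_wait(&ctx->wait, &wait);
hrtimer_cancel(&t.timer);
destroy_hrtimer_on_stack(&t.timer);

t.task going NULL is how the hrtimer_sleeper signals that the timeout
expired, which replaces the old timed_out flag.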
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 78 +++++++++++++++++-----------------------------------------------
1 file changed, 21 insertions(+), 57 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 9d27508..7961f60 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -743,54 +743,16 @@ out:
return ret;
}
-struct aio_timeout {
- struct timer_list timer;
- int timed_out;
- struct task_struct *p;
-};
-
-static void timeout_func(unsigned long data)
-{
- struct aio_timeout *to = (struct aio_timeout *)data;
-
- to->timed_out = 1;
- wake_up_process(to->p);
-}
-
-static inline void init_timeout(struct aio_timeout *to)
-{
- setup_timer_on_stack(&to->timer, timeout_func, (unsigned long) to);
- to->timed_out = 0;
- to->p = current;
-}
-
-static inline void set_timeout(long start_jiffies, struct aio_timeout *to,
- const struct timespec *ts)
-{
- to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
- if (time_after(to->timer.expires, jiffies))
- add_timer(&to->timer);
- else
- to->timed_out = 1;
-}
-
-static inline void clear_timeout(struct aio_timeout *to)
-{
- del_singleshot_timer_sync(&to->timer);
-}
-
static int read_events(struct kioctx *ctx,
long min_nr, long nr,
struct io_event __user *event,
struct timespec __user *timeout)
{
- long start_jiffies = jiffies;
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- int ret;
- int i = 0;
+ DEFINE_WAIT(wait);
+ struct hrtimer_sleeper t;
+ size_t i = 0;
+ int ret;
struct io_event ent;
- struct aio_timeout to;
/* needed to zero any padding within an entry (there shouldn't be
* any, but C is fun!
@@ -825,20 +787,26 @@ static int read_events(struct kioctx *ctx,
/* End fast path */
- init_timeout(&to);
+ hrtimer_init_on_stack(&t.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+ hrtimer_init_sleeper(&t, current);
+
if (timeout) {
struct timespec ts;
- ret = -EFAULT;
- if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
+
+ if (unlikely(copy_from_user(&ts, timeout, sizeof(ts)))) {
+ ret = -EFAULT;
goto out;
+ }
- set_timeout(start_jiffies, &to, &ts);
+ hrtimer_start_range_ns(&t.timer, timespec_to_ktime(ts),
+ current->timer_slack_ns, HRTIMER_MODE_REL);
}
while (likely(i < nr)) {
- add_wait_queue_exclusive(&ctx->wait, &wait);
+ prepare_to_wait_exclusive(&ctx->wait, &wait,
+ TASK_INTERRUPTIBLE);
+
do {
- set_task_state(tsk, TASK_INTERRUPTIBLE);
ret = aio_read_evt(ctx, &ent);
if (ret)
break;
@@ -848,7 +816,7 @@ static int read_events(struct kioctx *ctx,
ret = -EINVAL;
break;
}
- if (to.timed_out) /* Only check after read evt */
+ if (!t.task) /* Only check after read evt */
break;
/* Try to only show up in io wait if there are ops
* in flight */
@@ -856,16 +824,13 @@ static int read_events(struct kioctx *ctx,
io_schedule();
else
schedule();
- if (signal_pending(tsk)) {
+ if (signal_pending(current)) {
ret = -EINTR;
break;
}
/*ret = aio_read_evt(ctx, &ent);*/
} while (1) ;
- set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
-
if (unlikely(ret <= 0))
break;
@@ -879,11 +844,10 @@ static int read_events(struct kioctx *ctx,
event ++;
i ++;
}
-
- if (timeout)
- clear_timeout(&to);
out:
- destroy_timer_on_stack(&to.timer);
+ hrtimer_cancel(&t.timer);
+ destroy_hrtimer_on_stack(&t.timer);
+ finish_wait(&ctx->wait, &wait);
return i ? i : ret;
}
--
1.7.12
The aio code tries really hard to avoid having to deal with the
completion ringbuffer overflowing. To do that, it has to keep track of
both the number of outstanding kiocbs and the number of completions
currently in the ringbuffer - and it has to check those every time we
allocate a kiocb. Ouch.
But we can improve this quite a bit if we just change reqs_active to
mean "number of outstanding requests plus unreaped completions" - then
kiocb allocation doesn't have to look at the ringbuffer at all, which
is a fairly significant win.
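Concretely: with a 128-entry ring, 100 requests in flight, and 20
completed-but-unreaped events, reqs_active is 120, so allocation can
admit 8 more requests using nothing but the counter - no peeking at the
ring's head and tail. The admission check that falls out (see the
aio_get_req() hunk in the batch allocation removal patch) is just:

if (atomic_read(&ctx->reqs_active) >= ctx->ring_info.nr)
        return NULL;                    /* ring would overflow */

if (atomic_inc_return(&ctx->reqs_active) > ctx->ring_info.nr)
        goto out_put;                   /* raced past the limit; undo */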
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 38 +++++++++++++++++++++++++-------------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index f1f2345..0d5062d 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -71,12 +71,6 @@ struct aio_ring_info {
struct page *internal_pages[AIO_RING_PAGES];
};
-static inline unsigned aio_ring_avail(struct aio_ring_info *info,
- struct aio_ring *ring)
-{
- return (ring->head + info->nr - 1 - ring->tail) % info->nr;
-}
-
struct kioctx {
atomic_t users;
atomic_t dead;
@@ -282,7 +276,10 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
*/
static void free_ioctx(struct kioctx *ctx)
{
+ struct aio_ring_info *info = &ctx->ring_info;
+ struct aio_ring *ring;
struct io_event res;
+ unsigned head, avail;
spin_lock_irq(&ctx->ctx_lock);
@@ -296,7 +293,21 @@ static void free_ioctx(struct kioctx *ctx)
spin_unlock_irq(&ctx->ctx_lock);
- wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+ ring = kmap_atomic(info->ring_pages[0]);
+ head = ring->head;
+ kunmap_atomic(ring);
+
+ while (atomic_read(&ctx->reqs_active) > 0) {
+ wait_event(ctx->wait, head != info->tail);
+
+ avail = (head < info->tail ? info->tail : info->nr) - head;
+
+ atomic_sub(avail, &ctx->reqs_active);
+ head += avail;
+ head %= info->nr;
+ }
+
+ WARN_ON(atomic_read(&ctx->reqs_active) < 0);
aio_free_ring(ctx);
@@ -513,7 +524,6 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
unsigned short allocated, to_alloc;
long avail;
struct kiocb *req, *n;
- struct aio_ring *ring;
to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
for (allocated = 0; allocated < to_alloc; allocated++) {
@@ -528,9 +538,8 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
goto out;
spin_lock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
- avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
+ avail = ctx->ring_info.nr - atomic_read(&ctx->reqs_active);
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
@@ -545,7 +554,6 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
batch->count -= allocated;
atomic_add(allocated, &ctx->reqs_active);
- kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock);
out:
@@ -654,8 +662,11 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
*/
- if (test_and_set_bit(KIF_CANCELLED, &iocb->ki_flags))
+ if (test_and_set_bit(KIF_CANCELLED, &iocb->ki_flags)) {
+ atomic_dec(&ctx->reqs_active);
+ /* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
+ }
/*
* Add a completion event to the ring buffer. Must be done holding
@@ -706,7 +717,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
put_rq:
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- atomic_dec(&ctx->reqs_active);
/*
* We have to order our ring_info tail store above and test
@@ -779,6 +789,8 @@ static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
ring->head = head;
kunmap_atomic(ring);
+ atomic_sub(ret, &ctx->reqs_active);
+
pr_debug("%d h%u t%u\n", ret, head, info->tail);
out:
mutex_unlock(&info->ring_lock);
--
1.7.12
Cancelling kiocbs requires adding them to a per-kioctx linked list,
which is one of the few things we need to take the kioctx lock for in
the fast path. But most kiocbs can't be cancelled - so if we just do
this lazily, we can avoid quite a bit of locking overhead.
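A driver that actually supports cancellation opts in explicitly;
everything else never touches the list or the lock. A rough sketch,
with made-up my_cancel()/my_submit() names (the real in-tree user is
the usb gadget code in the diff below):

static int my_cancel(struct kiocb *iocb, struct io_event *event)
{
        /* tear down whatever backs this in-flight iocb ... */
        return 0;
}

static ssize_t my_submit(struct kiocb *iocb)
{
        /* only now does the iocb go onto ctx->active_reqs */
        kiocb_set_cancel_fn(iocb, my_cancel);

        /* queue the I/O ... */
        return -EIOCBQUEUED;
}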
Signed-off-by: Kent Overstreet <[email protected]>
---
drivers/usb/gadget/inode.c | 3 +-
fs/aio.c | 88 +++++++++++++++++++++++++++-------------------
include/linux/aio.h | 10 ++++--
3 files changed, 59 insertions(+), 42 deletions(-)
diff --git a/drivers/usb/gadget/inode.c b/drivers/usb/gadget/inode.c
index 7640e01..3bf0c35 100644
--- a/drivers/usb/gadget/inode.c
+++ b/drivers/usb/gadget/inode.c
@@ -534,7 +534,6 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e)
local_irq_disable();
epdata = priv->epdata;
// spin_lock(&epdata->dev->lock);
- kiocbSetCancelled(iocb);
if (likely(epdata && epdata->ep && priv->req))
value = usb_ep_dequeue (epdata->ep, priv->req);
else
@@ -664,7 +663,7 @@ fail:
goto fail;
}
- iocb->ki_cancel = ep_aio_cancel;
+ kiocb_set_cancel_fn(iocb, ep_aio_cancel);
get_ep(epdata);
priv->epdata = epdata;
priv->actual = 0;
diff --git a/fs/aio.c b/fs/aio.c
index de255d8..f1f2345 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -96,6 +96,8 @@ struct kioctx {
unsigned max_reqs;
struct aio_ring_info ring_info;
+
+ spinlock_t completion_lock;
};
/*------ sysctl variables----*/
@@ -232,25 +234,43 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
} while(0)
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
+{
+ if (!req->ki_list.next) {
+ struct kioctx *ctx = req->ki_ctx;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_add(&req->ki_list, &ctx->active_reqs);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ }
+
+ req->ki_cancel = cancel;
+}
+EXPORT_SYMBOL(kiocb_set_cancel_fn);
+
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res)
{
- int (*cancel)(struct kiocb *, struct io_event *);
+ kiocb_cancel_fn *cancel;
int ret = -EINVAL;
cancel = kiocb->ki_cancel;
- kiocbSetCancelled(kiocb);
- if (cancel) {
- atomic_inc(&kiocb->ki_users);
- spin_unlock_irq(&ctx->ctx_lock);
+ if (!cancel)
+ return ret;
- memset(res, 0, sizeof(*res));
- res->obj = (u64) kiocb->ki_obj.user;
- res->data = kiocb->ki_user_data;
- ret = cancel(kiocb, res);
+ if (test_and_set_bit(KIF_CANCELLED, &kiocb->ki_flags))
+ return ret;
- spin_lock_irq(&ctx->ctx_lock);
- }
+ atomic_inc(&kiocb->ki_users);
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ memset(res, 0, sizeof(*res));
+ res->obj = (u64) kiocb->ki_obj.user;
+ res->data = kiocb->ki_user_data;
+ ret = cancel(kiocb, res);
+
+ spin_lock_irq(&ctx->ctx_lock);
return ret;
}
@@ -324,6 +344,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2);
spin_lock_init(&ctx->ctx_lock);
+ spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -440,20 +461,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
{
struct kiocb *req = NULL;
- req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
+ req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req))
return NULL;
- req->ki_flags = 0;
atomic_set(&req->ki_users, 2);
- req->ki_key = 0;
req->ki_ctx = ctx;
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
- req->ki_dtor = NULL;
- req->private = NULL;
- req->ki_iovec = NULL;
- req->ki_eventfd = NULL;
return req;
}
@@ -530,10 +543,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
}
batch->count -= allocated;
- list_for_each_entry(req, &batch->head, ki_batch) {
- list_add(&req->ki_list, &ctx->active_reqs);
- atomic_inc(&ctx->reqs_active);
- }
+ atomic_add(allocated, &ctx->reqs_active);
kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock);
@@ -627,25 +637,33 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info;
/*
- * Add a completion event to the ring buffer. Must be done holding
- * ctx->ctx_lock to prevent other code from messing with the tail
- * pointer since we might be called from irq context.
- *
* Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active.
*/
rcu_read_lock();
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_del(&iocb->ki_list); /* remove from active_reqs */
+ if (iocb->ki_list.next) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_del(&iocb->ki_list);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ }
/*
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
*/
- if (kiocbIsCancelled(iocb))
+ if (test_and_set_bit(KIF_CANCELLED, &iocb->ki_flags))
goto put_rq;
+ /*
+ * Add a completion event to the ring buffer. Must be done holding
+ * ctx->ctx_lock to prevent other code from messing with the tail
+ * pointer since we might be called from irq context.
+ */
+ spin_lock_irqsave(&ctx->completion_lock, flags);
+
ring = kmap_atomic(info->ring_pages[0]);
tail = info->tail;
@@ -673,6 +691,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
put_aio_ring_event(event);
kunmap_atomic(ring);
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
pr_debug("added to ring %p at [%lu]\n", iocb, tail);
/*
@@ -699,7 +719,6 @@ put_rq:
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
rcu_read_unlock();
}
EXPORT_SYMBOL(aio_complete);
@@ -1190,7 +1209,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_opcode = iocb->aio_lio_opcode;
ret = aio_setup_iocb(req, compat);
-
if (ret)
goto out_put_req;
@@ -1214,10 +1232,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
- spin_lock_irq(&ctx->ctx_lock);
- list_del(&req->ki_list);
- spin_unlock_irq(&ctx->ctx_lock);
-
atomic_dec(&ctx->reqs_active);
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 294b659..ea048ee 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -10,18 +10,19 @@
#include <linux/atomic.h>
struct kioctx;
+struct kiocb;
#define KIOCB_SYNC_KEY (~0U)
/* ki_flags bits */
#define KIF_CANCELLED 2
-#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-
#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
+
/* is there a better place to document function pointer methods? */
/**
* ki_retry - iocb forward progress callback
@@ -55,7 +56,7 @@ struct kiocb {
struct file *ki_filp;
struct kioctx *ki_ctx; /* may be NULL for sync ops */
- int (*ki_cancel)(struct kiocb *, struct io_event *);
+ kiocb_cancel_fn *ki_cancel;
ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *);
@@ -113,6 +114,7 @@ struct mm_struct;
extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user *__user *iocbpp, bool compat);
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline void aio_put_req(struct kiocb *iocb) { }
@@ -122,6 +124,8 @@ static inline void exit_aio(struct mm_struct *mm) { }
static inline long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user * __user *iocbpp,
bool compat) { return 0; }
+static inline void kiocb_set_cancel_fn(struct kiocb *req,
+ kiocb_cancel_fn *cancel) { }
#endif /* CONFIG_AIO */
static inline struct kiocb *list_kiocb(struct list_head *h)
--
1.7.12
Freeing a kiocb needed to touch the kioctx for three things:
* Pulling it off the active_reqs list
* Decrementing reqs_active
* Issuing a wakeup, if the kioctx was in the process of being freed.
This patch moves these to aio_complete(), for a couple of reasons:
* aio_complete() already has to issue the wakeup, so if we drop the
kioctx refcount before aio_complete does its wakeup we don't have to
do it twice.
* aio_complete currently has to take the kioctx lock, so it makes sense
for it to pull the kiocb off the reqs_active list too.
* A later patch is going to change reqs_active to include unreaped
completions - this will mean allocating a kiocb doesn't have to look
at the ringbuffer. So taking the decrement of reqs_active out of
kiocb_free() is useful prep work for that patch.
This doesn't really affect cancellation, since existing (usb) code that
implements a cancel function still calls aio_complete() - we just have
to make sure that aio_complete does the necessary teardown for cancelled
kiocbs.
It does affect code paths where we free kiocbs that were never
submitted; they need to decrement reqs_active and pull the kiocb off the
active_reqs list. This occurs in two places: kiocb_batch_free(), which
is going away in a later patch, and the error path in io_submit_one.
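The payoff is that the put side collapses to a bare refcount drop -
this is the shape of the new aio_put_req() in the diff below, with no
ctx_lock and no list manipulation:

void aio_put_req(struct kiocb *req)
{
        if (atomic_dec_and_test(&req->ki_users))
                kiocb_free(req);
}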
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 85 +++++++++++++++++++++--------------------------------
include/linux/aio.h | 4 +--
2 files changed, 35 insertions(+), 54 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 200abff..9a60f13 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -89,7 +89,7 @@ struct kioctx {
spinlock_t ctx_lock;
- int reqs_active;
+ atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
/* sys_io_setup currently limits this to an unsigned int */
@@ -247,7 +247,7 @@ static void ctx_rcu_free(struct rcu_head *head)
static void __put_ioctx(struct kioctx *ctx)
{
unsigned nr_events = ctx->max_reqs;
- BUG_ON(ctx->reqs_active);
+ BUG_ON(atomic_read(&ctx->reqs_active));
aio_free_ring(ctx);
if (nr_events) {
@@ -281,7 +281,7 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
cancel = kiocb->ki_cancel;
kiocbSetCancelled(kiocb);
if (cancel) {
- kiocb->ki_users++;
+ atomic_inc(&kiocb->ki_users);
spin_unlock_irq(&ctx->ctx_lock);
memset(res, 0, sizeof(*res));
@@ -379,12 +379,12 @@ static void kill_ctx(struct kioctx *ctx)
kiocb_cancel(ctx, iocb, &res);
}
- if (!ctx->reqs_active)
+ if (!atomic_read(&ctx->reqs_active))
goto out;
add_wait_queue(&ctx->wait, &wait);
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (ctx->reqs_active) {
+ while (atomic_read(&ctx->reqs_active)) {
spin_unlock_irq(&ctx->ctx_lock);
io_schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
@@ -402,9 +402,9 @@ out:
*/
ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
{
- while (iocb->ki_users) {
+ while (atomic_read(&iocb->ki_users)) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (!iocb->ki_users)
+ if (!atomic_read(&iocb->ki_users))
break;
io_schedule();
}
@@ -434,7 +434,7 @@ void exit_aio(struct mm_struct *mm)
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users), ctx->dead,
- ctx->reqs_active);
+ atomic_read(&ctx->reqs_active));
/*
* We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything.
@@ -449,11 +449,11 @@ void exit_aio(struct mm_struct *mm)
}
/* aio_get_req
- * Allocate a slot for an aio request. Increments the users count
+ * Allocate a slot for an aio request. Increments the ki_users count
* of the kioctx so that the kioctx stays around until all requests are
* complete. Returns NULL if no requests are free.
*
- * Returns with kiocb->users set to 2. The io submit code path holds
+ * Returns with kiocb->ki_users set to 2. The io submit code path holds
* an extra reference while submitting the i/o.
* This prevents races between the aio code path referencing the
* req (after submitting it) and aio_complete() freeing the req.
@@ -467,7 +467,7 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
return NULL;
req->ki_flags = 0;
- req->ki_users = 2;
+ atomic_set(&req->ki_users, 2);
req->ki_key = 0;
req->ki_ctx = ctx;
req->ki_cancel = NULL;
@@ -508,9 +508,9 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
list_del(&req->ki_batch);
list_del(&req->ki_list);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;
+ atomic_dec(&ctx->reqs_active);
}
- if (unlikely(!ctx->reqs_active && ctx->dead))
+ if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}
@@ -541,7 +541,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
spin_lock_irq(&ctx->ctx_lock);
ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
- avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+ avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
@@ -556,7 +556,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
batch->count -= allocated;
list_for_each_entry(req, &batch->head, ki_batch) {
list_add(&req->ki_list, &ctx->active_reqs);
- ctx->reqs_active++;
+ atomic_inc(&ctx->reqs_active);
}
kunmap_atomic(ring);
@@ -579,10 +579,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
return req;
}
-static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
+static void kiocb_free(struct kiocb *req)
{
- assert_spin_locked(&ctx->ctx_lock);
-
if (req->ki_filp)
fput(req->ki_filp);
if (req->ki_eventfd != NULL)
@@ -592,40 +590,12 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
if (req->ki_iovec != &req->ki_inline_vec)
kfree(req->ki_iovec);
kmem_cache_free(kiocb_cachep, req);
- ctx->reqs_active--;
-
- if (unlikely(!ctx->reqs_active && ctx->dead))
- wake_up_all(&ctx->wait);
}
-/* __aio_put_req
- * Returns true if this put was the last user of the request.
- */
-static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
-{
- assert_spin_locked(&ctx->ctx_lock);
-
- req->ki_users--;
- BUG_ON(req->ki_users < 0);
- if (likely(req->ki_users))
- return;
- list_del(&req->ki_list); /* remove from active_reqs */
- req->ki_cancel = NULL;
- req->ki_retry = NULL;
-
- really_put_req(ctx, req);
-}
-
-/* aio_put_req
- * Returns true if this put was the last user of the kiocb,
- * false if the request is still in use.
- */
void aio_put_req(struct kiocb *req)
{
- struct kioctx *ctx = req->ki_ctx;
- spin_lock_irq(&ctx->ctx_lock);
- __aio_put_req(ctx, req);
- spin_unlock_irq(&ctx->ctx_lock);
+ if (atomic_dec_and_test(&req->ki_users))
+ kiocb_free(req);
}
EXPORT_SYMBOL(aio_put_req);
@@ -676,9 +646,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
* - the sync task helpfully left a reference to itself in the iocb
*/
if (is_sync_kiocb(iocb)) {
- BUG_ON(iocb->ki_users != 1);
+ BUG_ON(atomic_read(&iocb->ki_users) != 1);
iocb->ki_user_data = res;
- iocb->ki_users = 0;
+ atomic_set(&iocb->ki_users, 0);
wake_up_process(iocb->ki_obj.tsk);
return;
}
@@ -693,6 +663,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/
spin_lock_irqsave(&ctx->ctx_lock, flags);
+ list_del(&iocb->ki_list); /* remove from active_reqs */
+
/*
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
@@ -739,7 +711,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
put_rq:
/* everything turned out well, dispose of the aiocb. */
- __aio_put_req(ctx, iocb);
+ aio_put_req(iocb);
+ atomic_dec(&ctx->reqs_active);
/*
* We have to order our ring_info tail store above and test
@@ -904,7 +877,7 @@ static int read_events(struct kioctx *ctx,
break;
/* Try to only show up in io wait if there are ops
* in flight */
- if (ctx->reqs_active)
+ if (atomic_read(&ctx->reqs_active))
io_schedule();
else
schedule();
@@ -1363,6 +1336,14 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return 0;
out_put_req:
+ spin_lock_irq(&ctx->ctx_lock);
+ list_del(&req->ki_list);
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ atomic_dec(&ctx->reqs_active);
+ if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
+ wake_up_all(&ctx->wait);
+
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index f29de1f..294b659 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -50,7 +50,7 @@ struct kioctx;
struct kiocb {
struct list_head ki_run_list;
unsigned long ki_flags;
- int ki_users;
+ atomic_t ki_users;
unsigned ki_key; /* id of this request */
struct file *ki_filp;
@@ -97,7 +97,7 @@ static inline bool is_sync_kiocb(struct kiocb *kiocb)
static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
{
*kiocb = (struct kiocb) {
- .ki_users = 1,
+ .ki_users = ATOMIC_INIT(1),
.ki_key = KIOCB_SYNC_KEY,
.ki_filp = filp,
.ki_obj.tsk = current,
--
1.7.12
aio_get_req() will fail if we have the maximum number of requests
outstanding, which, depending on the application, may be fairly common.
So avoid doing an unnecessary fget().
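In other words, do the cheap step that can fail under load first, and
only then pin the file - roughly (matching the io_submit_one() hunk
below):

req = aio_get_req(ctx, batch);          /* can fail with -EAGAIN under load */
if (unlikely(!req))
        return -EAGAIN;

req->ki_filp = fget(iocb->aio_fildes);
if (unlikely(!req->ki_filp)) {
        ret = -EBADF;
        goto out_put_req;               /* the put path tolerates a NULL ki_filp */
}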
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 22 +++++++++-------------
1 file changed, 9 insertions(+), 13 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 0ed14e8..200abff 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -583,6 +583,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
{
assert_spin_locked(&ctx->ctx_lock);
+ if (req->ki_filp)
+ fput(req->ki_filp);
if (req->ki_eventfd != NULL)
eventfd_ctx_put(req->ki_eventfd);
if (req->ki_dtor)
@@ -601,9 +603,6 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
*/
static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
- pr_debug("(%p): f_count=%ld\n",
- req, atomic_long_read(&req->ki_filp->f_count));
-
assert_spin_locked(&ctx->ctx_lock);
req->ki_users--;
@@ -614,8 +613,6 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
req->ki_cancel = NULL;
req->ki_retry = NULL;
- fput(req->ki_filp);
- req->ki_filp = NULL;
really_put_req(ctx, req);
}
@@ -1265,7 +1262,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
bool compat)
{
struct kiocb *req;
- struct file *file;
ssize_t ret;
/* enforce forwards compatibility on users */
@@ -1284,16 +1280,16 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
return -EINVAL;
}
- file = fget(iocb->aio_fildes);
- if (unlikely(!file))
- return -EBADF;
-
req = aio_get_req(ctx, batch); /* returns with 2 references to req */
- if (unlikely(!req)) {
- fput(file);
+ if (unlikely(!req))
return -EAGAIN;
+
+ req->ki_filp = fget(iocb->aio_fildes);
+ if (unlikely(!req->ki_filp)) {
+ ret = -EBADF;
+ goto out_put_req;
}
- req->ki_filp = file;
+
if (iocb->aio_flags & IOCB_FLAG_RESFD) {
/*
* If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
--
1.7.12
From: Zach Brown <[email protected]>
This removes the retry-based AIO infrastructure now that nothing in tree
is using it.
We want to remove retry-based AIO because it is fundamentally unsafe.
It retries IO submission from a kernel thread that has only assumed the
mm of the submitting task. All other task_struct references in the IO
submission path will see the kernel thread, not the submitting task.
This design flaw means that nothing of any meaningful complexity can use
retry-based AIO.
This removes all the code and data associated with the retry machinery.
The most significant benefit of this is the removal of the locking
around the unused run list in the submission path.
This has only been compiled.
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 348 ++++----------------------------------------------
fs/ocfs2/dlmglue.c | 2 +-
fs/read_write.c | 34 +----
include/linux/aio.h | 21 ---
include/linux/errno.h | 1 -
5 files changed, 29 insertions(+), 377 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 71f613c..1de4f78 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -54,11 +54,6 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request
static struct kmem_cache *kiocb_cachep;
static struct kmem_cache *kioctx_cachep;
-static struct workqueue_struct *aio_wq;
-
-static void aio_kick_handler(struct work_struct *);
-static void aio_queue_work(struct kioctx *);
-
/* aio_setup
* Creates the slab caches used by the aio routines, panic on
* failure as this is done early during the boot sequence.
@@ -68,9 +63,6 @@ static int __init aio_setup(void)
kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
- aio_wq = alloc_workqueue("aio", 0, 1); /* used to limit concurrency */
- BUG_ON(!aio_wq);
-
pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
return 0;
@@ -86,7 +78,6 @@ static void aio_free_ring(struct kioctx *ctx)
put_page(info->ring_pages[i]);
if (info->mmap_size) {
- BUG_ON(ctx->mm != current->mm);
vm_munmap(info->mmap_base, info->mmap_size);
}
@@ -101,6 +92,7 @@ static int aio_setup_ring(struct kioctx *ctx)
struct aio_ring *ring;
struct aio_ring_info *info = &ctx->ring_info;
unsigned nr_events = ctx->max_reqs;
+ struct mm_struct *mm = current->mm;
unsigned long size;
int nr_pages;
@@ -126,22 +118,21 @@ static int aio_setup_ring(struct kioctx *ctx)
info->mmap_size = nr_pages * PAGE_SIZE;
dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
- down_write(&ctx->mm->mmap_sem);
+ down_write(&mm->mmap_sem);
info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
PROT_READ|PROT_WRITE,
MAP_ANONYMOUS|MAP_PRIVATE, 0);
if (IS_ERR((void *)info->mmap_base)) {
- up_write(&ctx->mm->mmap_sem);
+ up_write(&mm->mmap_sem);
info->mmap_size = 0;
aio_free_ring(ctx);
return -EAGAIN;
}
dprintk("mmap address: 0x%08lx\n", info->mmap_base);
- info->nr_pages = get_user_pages(current, ctx->mm,
- info->mmap_base, nr_pages,
+ info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
1, 0, info->ring_pages, NULL);
- up_write(&ctx->mm->mmap_sem);
+ up_write(&mm->mmap_sem);
if (unlikely(info->nr_pages != nr_pages)) {
aio_free_ring(ctx);
@@ -203,10 +194,7 @@ static void __put_ioctx(struct kioctx *ctx)
unsigned nr_events = ctx->max_reqs;
BUG_ON(ctx->reqs_active);
- cancel_delayed_work_sync(&ctx->wq);
aio_free_ring(ctx);
- mmdrop(ctx->mm);
- ctx->mm = NULL;
if (nr_events) {
spin_lock(&aio_nr_lock);
BUG_ON(aio_nr - nr_events > aio_nr);
@@ -234,7 +222,7 @@ static inline void put_ioctx(struct kioctx *kioctx)
*/
static struct kioctx *ioctx_alloc(unsigned nr_events)
{
- struct mm_struct *mm;
+ struct mm_struct *mm = current->mm;
struct kioctx *ctx;
int err = -ENOMEM;
@@ -253,8 +241,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
return ERR_PTR(-ENOMEM);
ctx->max_reqs = nr_events;
- mm = ctx->mm = current->mm;
- atomic_inc(&mm->mm_count);
atomic_set(&ctx->users, 2);
spin_lock_init(&ctx->ctx_lock);
@@ -262,8 +248,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
- INIT_LIST_HEAD(&ctx->run_list);
- INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
if (aio_setup_ring(ctx) < 0)
goto out_freectx;
@@ -284,14 +268,13 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
spin_unlock(&mm->ioctx_lock);
dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
- ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
+ ctx, ctx->user_id, mm, ctx->ring_info.nr);
return ctx;
out_cleanup:
err = -EAGAIN;
aio_free_ring(ctx);
out_freectx:
- mmdrop(mm);
kmem_cache_free(kioctx_cachep, ctx);
dprintk("aio: error allocating ioctx %d\n", err);
return ERR_PTR(err);
@@ -388,8 +371,6 @@ void exit_aio(struct mm_struct *mm)
* as indicator that it needs to unmap the area,
* just set it to 0; aio_free_ring() is the only
* place that uses ->mmap_size, so it's safe.
- * That way we get all munmap done to current->mm -
- * all other callers have ctx->mm == current->mm.
*/
ctx->ring_info.mmap_size = 0;
put_ioctx(ctx);
@@ -423,7 +404,6 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
req->ki_dtor = NULL;
req->private = NULL;
req->ki_iovec = NULL;
- INIT_LIST_HEAD(&req->ki_run_list);
req->ki_eventfd = NULL;
return req;
@@ -609,281 +589,6 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
return ret;
}
-/*
- * Queue up a kiocb to be retried. Assumes that the kiocb
- * has already been marked as kicked, and places it on
- * the retry run list for the corresponding ioctx, if it
- * isn't already queued. Returns 1 if it actually queued
- * the kiocb (to tell the caller to activate the work
- * queue to process it), or 0, if it found that it was
- * already queued.
- */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
-{
- struct kioctx *ctx = iocb->ki_ctx;
-
- assert_spin_locked(&ctx->ctx_lock);
-
- if (list_empty(&iocb->ki_run_list)) {
- list_add_tail(&iocb->ki_run_list,
- &ctx->run_list);
- return 1;
- }
- return 0;
-}
-
-/* aio_run_iocb
- * This is the core aio execution routine. It is
- * invoked both for initial i/o submission and
- * subsequent retries via the aio_kick_handler.
- * Expects to be invoked with iocb->ki_ctx->lock
- * already held. The lock is released and reacquired
- * as needed during processing.
- *
- * Calls the iocb retry method (already setup for the
- * iocb on initial submission) for operation specific
- * handling, but takes care of most of common retry
- * execution details for a given iocb. The retry method
- * needs to be non-blocking as far as possible, to avoid
- * holding up other iocbs waiting to be serviced by the
- * retry kernel thread.
- *
- * The trickier parts in this code have to do with
- * ensuring that only one retry instance is in progress
- * for a given iocb at any time. Providing that guarantee
- * simplifies the coding of individual aio operations as
- * it avoids various potential races.
- */
-static ssize_t aio_run_iocb(struct kiocb *iocb)
-{
- struct kioctx *ctx = iocb->ki_ctx;
- ssize_t (*retry)(struct kiocb *);
- ssize_t ret;
-
- if (!(retry = iocb->ki_retry)) {
- printk("aio_run_iocb: iocb->ki_retry = NULL\n");
- return 0;
- }
-
- /*
- * We don't want the next retry iteration for this
- * operation to start until this one has returned and
- * updated the iocb state. However, wait_queue functions
- * can trigger a kick_iocb from interrupt context in the
- * meantime, indicating that data is available for the next
- * iteration. We want to remember that and enable the
- * next retry iteration _after_ we are through with
- * this one.
- *
- * So, in order to be able to register a "kick", but
- * prevent it from being queued now, we clear the kick
- * flag, but make the kick code *think* that the iocb is
- * still on the run list until we are actually done.
- * When we are done with this iteration, we check if
- * the iocb was kicked in the meantime and if so, queue
- * it up afresh.
- */
-
- kiocbClearKicked(iocb);
-
- /*
- * This is so that aio_complete knows it doesn't need to
- * pull the iocb off the run list (We can't just call
- * INIT_LIST_HEAD because we don't want a kick_iocb to
- * queue this on the run list yet)
- */
- iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
- spin_unlock_irq(&ctx->ctx_lock);
-
- /* Quit retrying if the i/o has been cancelled */
- if (kiocbIsCancelled(iocb)) {
- ret = -EINTR;
- aio_complete(iocb, ret, 0);
- /* must not access the iocb after this */
- goto out;
- }
-
- /*
- * Now we are all set to call the retry method in async
- * context.
- */
- ret = retry(iocb);
-
- if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
- /*
- * There's no easy way to restart the syscall since other AIO's
- * may be already running. Just fail this IO with EINTR.
- */
- if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
- ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
- ret = -EINTR;
- aio_complete(iocb, ret, 0);
- }
-out:
- spin_lock_irq(&ctx->ctx_lock);
-
- if (-EIOCBRETRY == ret) {
- /*
- * OK, now that we are done with this iteration
- * and know that there is more left to go,
- * this is where we let go so that a subsequent
- * "kick" can start the next iteration
- */
-
- /* will make __queue_kicked_iocb succeed from here on */
- INIT_LIST_HEAD(&iocb->ki_run_list);
- /* we must queue the next iteration ourselves, if it
- * has already been kicked */
- if (kiocbIsKicked(iocb)) {
- __queue_kicked_iocb(iocb);
-
- /*
- * __queue_kicked_iocb will always return 1 here, because
- * iocb->ki_run_list is empty at this point so it should
- * be safe to unconditionally queue the context into the
- * work queue.
- */
- aio_queue_work(ctx);
- }
- }
- return ret;
-}
-
-/*
- * __aio_run_iocbs:
- * Process all pending retries queued on the ioctx
- * run list.
- * Assumes it is operating within the aio issuer's mm
- * context.
- */
-static int __aio_run_iocbs(struct kioctx *ctx)
-{
- struct kiocb *iocb;
- struct list_head run_list;
-
- assert_spin_locked(&ctx->ctx_lock);
-
- list_replace_init(&ctx->run_list, &run_list);
- while (!list_empty(&run_list)) {
- iocb = list_entry(run_list.next, struct kiocb,
- ki_run_list);
- list_del(&iocb->ki_run_list);
- /*
- * Hold an extra reference while retrying i/o.
- */
- iocb->ki_users++; /* grab extra reference */
- aio_run_iocb(iocb);
- __aio_put_req(ctx, iocb);
- }
- if (!list_empty(&ctx->run_list))
- return 1;
- return 0;
-}
-
-static void aio_queue_work(struct kioctx * ctx)
-{
- unsigned long timeout;
- /*
- * if someone is waiting, get the work started right
- * away, otherwise, use a longer delay
- */
- smp_mb();
- if (waitqueue_active(&ctx->wait))
- timeout = 1;
- else
- timeout = HZ/10;
- queue_delayed_work(aio_wq, &ctx->wq, timeout);
-}
-
-/*
- * aio_run_all_iocbs:
- * Process all pending retries queued on the ioctx
- * run list, and keep running them until the list
- * stays empty.
- * Assumes it is operating within the aio issuer's mm context.
- */
-static inline void aio_run_all_iocbs(struct kioctx *ctx)
-{
- spin_lock_irq(&ctx->ctx_lock);
- while (__aio_run_iocbs(ctx))
- ;
- spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * aio_kick_handler:
- * Work queue handler triggered to process pending
- * retries on an ioctx. Takes on the aio issuer's
- * mm context before running the iocbs, so that
- * copy_xxx_user operates on the issuer's address
- * space.
- * Run on aiod's context.
- */
-static void aio_kick_handler(struct work_struct *work)
-{
- struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
- mm_segment_t oldfs = get_fs();
- struct mm_struct *mm;
- int requeue;
-
- set_fs(USER_DS);
- use_mm(ctx->mm);
- spin_lock_irq(&ctx->ctx_lock);
- requeue =__aio_run_iocbs(ctx);
- mm = ctx->mm;
- spin_unlock_irq(&ctx->ctx_lock);
- unuse_mm(mm);
- set_fs(oldfs);
- /*
- * we're in a worker thread already; no point using non-zero delay
- */
- if (requeue)
- queue_delayed_work(aio_wq, &ctx->wq, 0);
-}
-
-
-/*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
- */
-static void try_queue_kicked_iocb(struct kiocb *iocb)
-{
- struct kioctx *ctx = iocb->ki_ctx;
- unsigned long flags;
- int run = 0;
-
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- /* set this inside the lock so that we can't race with aio_run_iocb()
- * testing it and putting the iocb on the run list under the lock */
- if (!kiocbTryKick(iocb))
- run = __queue_kicked_iocb(iocb);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- if (run)
- aio_queue_work(ctx);
-}
-
-/*
- * kick_iocb:
- * Called typically from a wait queue callback context
- * to trigger a retry of the iocb.
- * The retry is usually executed by aio workqueue
- * threads (See aio_kick_handler).
- */
-void kick_iocb(struct kiocb *iocb)
-{
- /* sync iocbs are easy: they can only ever be executing from a
- * single context. */
- if (is_sync_kiocb(iocb)) {
- kiocbSetKicked(iocb);
- wake_up_process(iocb->ki_obj.tsk);
- return;
- }
-
- try_queue_kicked_iocb(iocb);
-}
-EXPORT_SYMBOL(kick_iocb);
-
/* aio_complete
* Called when the io request on the given iocb is complete.
* Returns true if this is the last user of the request. The
@@ -924,9 +629,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
*/
spin_lock_irqsave(&ctx->ctx_lock, flags);
- if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
- list_del_init(&iocb->ki_run_list);
-
/*
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
@@ -1081,13 +783,11 @@ static int read_events(struct kioctx *ctx,
int i = 0;
struct io_event ent;
struct aio_timeout to;
- int retry = 0;
/* needed to zero any padding within an entry (there shouldn't be
* any, but C is fun!
*/
memset(&ent, 0, sizeof(ent));
-retry:
ret = 0;
while (likely(i < nr)) {
ret = aio_read_evt(ctx, &ent);
@@ -1117,13 +817,6 @@ retry:
/* End fast path */
- /* racey check, but it gets redone */
- if (!retry && unlikely(!list_empty(&ctx->run_list))) {
- retry = 1;
- aio_run_all_iocbs(ctx);
- goto retry;
- }
-
init_timeout(&to);
if (timeout) {
struct timespec ts;
@@ -1343,7 +1036,7 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
/* If we managed to write some out we return that, rather than
* the eventual error. */
if (opcode == IOCB_CMD_PWRITEV
- && ret < 0 && ret != -EIOCBQUEUED && ret != -EIOCBRETRY
+ && ret < 0 && ret != -EIOCBQUEUED
&& iocb->ki_nbytes - iocb->ki_left)
ret = iocb->ki_nbytes - iocb->ki_left;
@@ -1585,18 +1278,27 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
* don't see ctx->dead set here, io_destroy() waits for our IO to
* finish.
*/
- if (ctx->dead) {
- spin_unlock_irq(&ctx->ctx_lock);
+ if (ctx->dead)
ret = -EINVAL;
+ spin_unlock_irq(&ctx->ctx_lock);
+ if (ret)
goto out_put_req;
+
+ if (unlikely(kiocbIsCancelled(req))) {
+ ret = -EINTR;
+ } else {
+ ret = req->ki_retry(req);
}
- aio_run_iocb(req);
- if (!list_empty(&ctx->run_list)) {
- /* drain the run list */
- while (__aio_run_iocbs(ctx))
- ;
+ if (ret != -EIOCBQUEUED) {
+ /*
+ * There's no easy way to restart the syscall since other AIO's
+ * may be already running. Just fail this IO with EINTR.
+ */
+ if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+ ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
+ ret = -EINTR;
+ aio_complete(req, ret, 0);
}
- spin_unlock_irq(&ctx->ctx_lock);
aio_put_req(req); /* drop extra ref to req */
return 0;
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 4f7795f..bad2583 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -2322,7 +2322,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode,
status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
arg_flags, subclass, _RET_IP_);
if (status < 0) {
- if (status != -EAGAIN && status != -EIOCBRETRY)
+ if (status != -EAGAIN)
mlog_errno(status);
goto bail;
}
diff --git a/fs/read_write.c b/fs/read_write.c
index d065348..7347732 100644
--- a/fs/read_write.c
+++ b/fs/read_write.c
@@ -318,16 +318,6 @@ int rw_verify_area(int read_write, struct file *file, loff_t *ppos, size_t count
return count > MAX_RW_COUNT ? MAX_RW_COUNT : count;
}
-static void wait_on_retry_sync_kiocb(struct kiocb *iocb)
-{
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (!kiocbIsKicked(iocb))
- schedule();
- else
- kiocbClearKicked(iocb);
- __set_current_state(TASK_RUNNING);
-}
-
ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
{
struct iovec iov = { .iov_base = buf, .iov_len = len };
@@ -339,13 +329,7 @@ ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *pp
kiocb.ki_left = len;
kiocb.ki_nbytes = len;
- for (;;) {
- ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
- if (ret != -EIOCBRETRY)
- break;
- wait_on_retry_sync_kiocb(&kiocb);
- }
-
+ ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
if (-EIOCBQUEUED == ret)
ret = wait_on_sync_kiocb(&kiocb);
*ppos = kiocb.ki_pos;
@@ -395,13 +379,7 @@ ssize_t do_sync_write(struct file *filp, const char __user *buf, size_t len, lof
kiocb.ki_left = len;
kiocb.ki_nbytes = len;
- for (;;) {
- ret = filp->f_op->aio_write(&kiocb, &iov, 1, kiocb.ki_pos);
- if (ret != -EIOCBRETRY)
- break;
- wait_on_retry_sync_kiocb(&kiocb);
- }
-
+ ret = filp->f_op->aio_write(&kiocb, &iov, 1, kiocb.ki_pos);
if (-EIOCBQUEUED == ret)
ret = wait_on_sync_kiocb(&kiocb);
*ppos = kiocb.ki_pos;
@@ -568,13 +546,7 @@ ssize_t do_sync_readv_writev(struct file *filp, const struct iovec *iov,
kiocb.ki_left = len;
kiocb.ki_nbytes = len;
- for (;;) {
- ret = fn(&kiocb, iov, nr_segs, kiocb.ki_pos);
- if (ret != -EIOCBRETRY)
- break;
- wait_on_retry_sync_kiocb(&kiocb);
- }
-
+ ret = fn(&kiocb, iov, nr_segs, kiocb.ki_pos);
if (ret == -EIOCBQUEUED)
ret = wait_on_sync_kiocb(&kiocb);
*ppos = kiocb.ki_pos;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index b46a09f..387dad0 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -14,18 +14,12 @@ struct kioctx;
#define KIOCB_SYNC_KEY (~0U)
/* ki_flags bits */
-#define KIF_KICKED 1
#define KIF_CANCELLED 2
-#define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
-
-#define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
/* is there a better place to document function pointer methods? */
@@ -52,15 +46,6 @@ struct kioctx;
* not ask the method again -- ki_retry must ensure forward progress.
* aio_complete() must be called once and only once in the future, multiple
* calls may result in undefined behaviour.
- *
- * If ki_retry returns -EIOCBRETRY it has made a promise that kick_iocb()
- * will be called on the kiocb pointer in the future. This may happen
- * through generic helpers that associate kiocb->ki_wait with a wait
- * queue head that ki_retry uses via current->io_wait. It can also happen
- * with custom tracking and manual calls to kick_iocb(), though that is
- * discouraged. In either case, kick_iocb() must be called once and only
- * once. ki_retry must ensure forward progress, the AIO core will wait
- * indefinitely for kick_iocb() to be called.
*/
struct kiocb {
struct list_head ki_run_list;
@@ -160,7 +145,6 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
struct kioctx {
atomic_t users;
int dead;
- struct mm_struct *mm;
/* This needs improving */
unsigned long user_id;
@@ -172,15 +156,12 @@ struct kioctx {
int reqs_active;
struct list_head active_reqs; /* used for cancellation */
- struct list_head run_list; /* used for kicked reqs */
/* sys_io_setup currently limits this to an unsigned int */
unsigned max_reqs;
struct aio_ring_info ring_info;
- struct delayed_work wq;
-
struct rcu_head rcu_head;
};
@@ -188,7 +169,6 @@ struct kioctx {
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
extern int aio_put_req(struct kiocb *iocb);
-extern void kick_iocb(struct kiocb *iocb);
extern int aio_complete(struct kiocb *iocb, long res, long res2);
struct mm_struct;
extern void exit_aio(struct mm_struct *mm);
@@ -197,7 +177,6 @@ extern long do_io_submit(aio_context_t ctx_id, long nr,
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
static inline int aio_put_req(struct kiocb *iocb) { return 0; }
-static inline void kick_iocb(struct kiocb *iocb) { }
static inline int aio_complete(struct kiocb *iocb, long res, long res2) { return 0; }
struct mm_struct;
static inline void exit_aio(struct mm_struct *mm) { }
diff --git a/include/linux/errno.h b/include/linux/errno.h
index f6bf082..89627b9 100644
--- a/include/linux/errno.h
+++ b/include/linux/errno.h
@@ -28,6 +28,5 @@
#define EBADTYPE 527 /* Type not supported by server */
#define EJUKEBOX 528 /* Request initiated, but will not complete before timeout */
#define EIOCBQUEUED 529 /* iocb queued, will get completion event */
-#define EIOCBRETRY 530 /* iocb queued, will trigger a retry */
#endif
--
1.7.12
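(Editorial aside: with the retry machinery gone, the contract for an aio method becomes simply "finish now, or return -EIOCBQUEUED and call aio_complete() later". A minimal sketch of what that looks like on the driver side - all example_* names are hypothetical, this is not code from the patch:

static ssize_t example_aio_read(struct kiocb *iocb, const struct iovec *iov,
                                unsigned long nr_segs, loff_t pos)
{
        struct example_req *req;

        req = example_req_alloc(iocb, iov, nr_segs, pos);
        if (!req)
                return -ENOMEM;

        if (example_data_ready(req)) {
                /* Fast path: finish inline and return the byte count. */
                ssize_t ret = example_copy_out(req);

                example_req_free(req);
                return ret;
        }

        /* Slow path: queue it; the completion path calls aio_complete(). */
        example_queue(req);
        return -EIOCBQUEUED;
}

The do_sync_read()/do_sync_write() changes above are the synchronous-caller side of the same contract: call the method once, and wait on the kiocb only when it returns -EIOCBQUEUED.)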
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 57 ++++++++++++++++++++++++---------------------------------
1 file changed, 24 insertions(+), 33 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 4dcc02f..0ed14e8 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -8,6 +8,8 @@
*
* See ../COPYING for licensing terms.
*/
+#define pr_fmt(fmt) "%s: " fmt, __func__
+
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
@@ -18,8 +20,6 @@
#include <linux/backing-dev.h>
#include <linux/uio.h>
-#define DEBUG 0
-
#include <linux/sched.h>
#include <linux/fs.h>
#include <linux/file.h>
@@ -39,12 +39,6 @@
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
-#if DEBUG > 1
-#define dprintk printk
-#else
-#define dprintk(x...) do { ; } while (0)
-#endif
-
#define AIO_RING_MAGIC 0xa10a10a1
#define AIO_RING_COMPAT_FEATURES 1
#define AIO_RING_INCOMPAT_FEATURES 0
@@ -124,7 +118,7 @@ static int __init aio_setup(void)
kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
- pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
+ pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
return 0;
}
@@ -178,7 +172,7 @@ static int aio_setup_ring(struct kioctx *ctx)
}
info->mmap_size = nr_pages * PAGE_SIZE;
- dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
+ pr_debug("attempting mmap of %lu bytes\n", info->mmap_size);
down_write(&mm->mmap_sem);
info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size,
PROT_READ|PROT_WRITE,
@@ -190,7 +184,7 @@ static int aio_setup_ring(struct kioctx *ctx)
return -EAGAIN;
}
- dprintk("mmap address: 0x%08lx\n", info->mmap_base);
+ pr_debug("mmap address: 0x%08lx\n", info->mmap_base);
info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
1, 0, info->ring_pages, NULL);
up_write(&mm->mmap_sem);
@@ -262,7 +256,7 @@ static void __put_ioctx(struct kioctx *ctx)
aio_nr -= nr_events;
spin_unlock(&aio_nr_lock);
}
- pr_debug("__put_ioctx: freeing %p\n", ctx);
+ pr_debug("freeing %p\n", ctx);
call_rcu(&ctx->rcu_head, ctx_rcu_free);
}
@@ -351,7 +345,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
spin_unlock(&mm->ioctx_lock);
- dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
+ pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
ctx, ctx->user_id, mm, ctx->ring_info.nr);
return ctx;
@@ -360,7 +354,7 @@ out_cleanup:
aio_free_ring(ctx);
out_freectx:
kmem_cache_free(kioctx_cachep, ctx);
- dprintk("aio: error allocating ioctx %d\n", err);
+ pr_debug("error allocating ioctx %d\n", err);
return ERR_PTR(err);
}
@@ -607,8 +601,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
*/
static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
- dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
- req, atomic_long_read(&req->ki_filp->f_count));
+ pr_debug("(%p): f_count=%ld\n",
+ req, atomic_long_read(&req->ki_filp->f_count));
assert_spin_locked(&ctx->ctx_lock);
@@ -721,9 +715,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
event->res = res;
event->res2 = res2;
- dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n",
- ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
- res, res2);
+ pr_debug("%p[%lu]: %p: %p %Lx %lx %lx\n",
+ ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
+ res, res2);
/* after flagging the request as done, we
* must never even look at it again
@@ -779,9 +773,7 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
int ret = 0;
ring = kmap_atomic(info->ring_pages[0]);
- dprintk("in aio_read_evt h%lu t%lu m%lu\n",
- (unsigned long)ring->head, (unsigned long)ring->tail,
- (unsigned long)ring->nr);
+ pr_debug("h%u t%u m%u\n", ring->head, ring->tail, ring->nr);
if (ring->head == ring->tail)
goto out;
@@ -802,8 +794,7 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
out:
kunmap_atomic(ring);
- dprintk("leaving aio_read_evt: %d h%lu t%lu\n", ret,
- (unsigned long)ring->head, (unsigned long)ring->tail);
+ pr_debug("%d h%u t%u\n", ret, ring->head, ring->tail);
return ret;
}
@@ -866,13 +857,13 @@ static int read_events(struct kioctx *ctx,
if (unlikely(ret <= 0))
break;
- dprintk("read event: %Lx %Lx %Lx %Lx\n",
- ent.data, ent.obj, ent.res, ent.res2);
+ pr_debug("%Lx %Lx %Lx %Lx\n",
+ ent.data, ent.obj, ent.res, ent.res2);
/* Could we split the check in two? */
ret = -EFAULT;
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
+ pr_debug("lost an event due to EFAULT.\n");
break;
}
ret = 0;
@@ -935,7 +926,7 @@ static int read_events(struct kioctx *ctx,
ret = -EFAULT;
if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
+ pr_debug("lost an event due to EFAULT.\n");
break;
}
@@ -966,7 +957,7 @@ static void io_destroy(struct kioctx *ioctx)
hlist_del_rcu(&ioctx->list);
spin_unlock(&mm->ioctx_lock);
- dprintk("aio_release(%p)\n", ioctx);
+ pr_debug("(%p)\n", ioctx);
if (likely(!was_dead))
put_ioctx(ioctx); /* twice for the list */
@@ -1259,7 +1250,7 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
kiocb->ki_retry = aio_fsync;
break;
default:
- dprintk("EINVAL: io_submit: no operation provided\n");
+ pr_debug("EINVAL: no operation provided\n");
ret = -EINVAL;
}
@@ -1279,7 +1270,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
- pr_debug("EINVAL: io_submit: reserve field set\n");
+ pr_debug("EINVAL: reserve field set\n");
return -EINVAL;
}
@@ -1320,7 +1311,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
- dprintk("EFAULT: aio_key\n");
+ pr_debug("EFAULT: aio_key\n");
goto out_put_req;
}
@@ -1401,7 +1392,7 @@ long do_io_submit(aio_context_t ctx_id, long nr,
ctx = lookup_ioctx(ctx_id);
if (unlikely(!ctx)) {
- pr_debug("EINVAL: io_submit: invalid context id\n");
+ pr_debug("EINVAL: invalid context id\n");
return -EINVAL;
}
--
1.7.12
From: Zach Brown <[email protected]>
These are handy for measuring the cost of the aio infrastructure with
operations that do very little and complete immediately.
Signed-off-by: Zach Brown <[email protected]>
Signed-off-by: Kent Overstreet <[email protected]>
---
drivers/char/mem.c | 35 +++++++++++++++++++++++++++++++++++
1 file changed, 35 insertions(+)
diff --git a/drivers/char/mem.c b/drivers/char/mem.c
index 0537903..968ae6e 100644
--- a/drivers/char/mem.c
+++ b/drivers/char/mem.c
@@ -627,6 +627,18 @@ static ssize_t write_null(struct file *file, const char __user *buf,
return count;
}
+static ssize_t aio_read_null(struct kiocb *iocb, const struct iovec *iov,
+ unsigned long nr_segs, loff_t pos)
+{
+ return 0;
+}
+
+static ssize_t aio_write_null(struct kiocb *iocb, const struct iovec *iov,
+ unsigned long nr_segs, loff_t pos)
+{
+ return iov_length(iov, nr_segs);
+}
+
static int pipe_to_null(struct pipe_inode_info *info, struct pipe_buffer *buf,
struct splice_desc *sd)
{
@@ -670,6 +682,24 @@ static ssize_t read_zero(struct file *file, char __user *buf,
return written ? written : -EFAULT;
}
+static ssize_t aio_read_zero(struct kiocb *iocb, const struct iovec *iov,
+ unsigned long nr_segs, loff_t pos)
+{
+ size_t written = 0;
+ unsigned long i;
+ ssize_t ret;
+
+ for (i = 0; i < nr_segs; i++) {
+ ret = read_zero(iocb->ki_filp, iov[i].iov_base, iov[i].iov_len,
+ &pos);
+ if (ret < 0)
+ break;
+ written += ret;
+ }
+
+ return written ? written : -EFAULT;
+}
+
static int mmap_zero(struct file *file, struct vm_area_struct *vma)
{
#ifndef CONFIG_MMU
@@ -738,6 +768,7 @@ static int open_port(struct inode * inode, struct file * filp)
#define full_lseek null_lseek
#define write_zero write_null
#define read_full read_zero
+#define aio_write_zero aio_write_null
#define open_mem open_port
#define open_kmem open_mem
#define open_oldmem open_mem
@@ -766,6 +797,8 @@ static const struct file_operations null_fops = {
.llseek = null_lseek,
.read = read_null,
.write = write_null,
+ .aio_read = aio_read_null,
+ .aio_write = aio_write_null,
.splice_write = splice_write_null,
};
@@ -782,6 +815,8 @@ static const struct file_operations zero_fops = {
.llseek = zero_lseek,
.read = read_zero,
.write = write_zero,
+ .aio_read = aio_read_zero,
+ .aio_write = aio_write_zero,
.mmap = mmap_zero,
};
--
1.7.12
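(Editorial aside: with this patch applied, the raw submit/complete cost of the aio core can be timed from userspace with something as small as the sketch below - libaio, link with -laio; error handling trimmed and the loop count arbitrary:

/* Sketch: hammer io_submit()/io_getevents() against /dev/null. */
#include <libaio.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
        io_context_t ctx;
        struct iocb iocb, *iocbp = &iocb;
        struct io_event ev;
        char buf[4096] = { 0 };
        int fd, i;

        fd = open("/dev/null", O_WRONLY);
        memset(&ctx, 0, sizeof(ctx));
        if (fd < 0 || io_setup(64, &ctx) < 0)
                return 1;

        for (i = 0; i < 1000000; i++) {
                io_prep_pwrite(&iocb, fd, buf, sizeof(buf), 0);
                if (io_submit(ctx, 1, &iocbp) != 1)
                        break;
                if (io_getevents(ctx, 1, 1, &ev, NULL) != 1)
                        break;
        }

        io_destroy(ctx);
        close(fd);
        return 0;
}

Since aio_write_null() does no work and completes immediately, nearly all of the time spent in a loop like this should be attributable to the aio infrastructure itself.)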
Minor refactoring, to get rid of some duplicated code
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 72 +++++++++++++++++++++++++++++++++++-----------------------------
1 file changed, 39 insertions(+), 33 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 91879d4..a993234 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -217,6 +217,29 @@ static inline void put_ioctx(struct kioctx *kioctx)
__put_ioctx(kioctx);
}
+static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
+ struct io_event *res)
+{
+ int (*cancel)(struct kiocb *, struct io_event *);
+ int ret = -EINVAL;
+
+ cancel = kiocb->ki_cancel;
+ kiocbSetCancelled(kiocb);
+ if (cancel) {
+ kiocb->ki_users++;
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ memset(res, 0, sizeof(*res));
+ res->obj = (u64) kiocb->ki_obj.user;
+ res->data = kiocb->ki_user_data;
+ ret = cancel(kiocb, res);
+
+ spin_lock_irq(&ctx->ctx_lock);
+ }
+
+ return ret;
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -287,7 +310,6 @@ out_freectx:
*/
static void kill_ctx(struct kioctx *ctx)
{
- int (*cancel)(struct kiocb *, struct io_event *);
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
struct io_event res;
@@ -298,14 +320,8 @@ static void kill_ctx(struct kioctx *ctx)
struct list_head *pos = ctx->active_reqs.next;
struct kiocb *iocb = list_kiocb(pos);
list_del_init(&iocb->ki_list);
- cancel = iocb->ki_cancel;
- kiocbSetCancelled(iocb);
- if (cancel) {
- iocb->ki_users++;
- spin_unlock_irq(&ctx->ctx_lock);
- cancel(iocb, &res);
- spin_lock_irq(&ctx->ctx_lock);
- }
+
+ kiocb_cancel(ctx, iocb, &res);
}
if (!ctx->reqs_active)
@@ -1411,7 +1427,7 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
struct io_event __user *, result)
{
- int (*cancel)(struct kiocb *iocb, struct io_event *res);
+ struct io_event res;
struct kioctx *ctx;
struct kiocb *kiocb;
u32 key;
@@ -1426,32 +1442,22 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
return -EINVAL;
spin_lock_irq(&ctx->ctx_lock);
- ret = -EAGAIN;
+
kiocb = lookup_kiocb(ctx, iocb, key);
- if (kiocb && kiocb->ki_cancel) {
- cancel = kiocb->ki_cancel;
- kiocb->ki_users ++;
- kiocbSetCancelled(kiocb);
- } else
- cancel = NULL;
+ if (kiocb)
+ ret = kiocb_cancel(ctx, kiocb, &res);
+ else
+ ret = -EAGAIN;
+
spin_unlock_irq(&ctx->ctx_lock);
- if (NULL != cancel) {
- struct io_event tmp;
- pr_debug("calling cancel\n");
- memset(&tmp, 0, sizeof(tmp));
- tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user;
- tmp.data = kiocb->ki_user_data;
- ret = cancel(kiocb, &tmp);
- if (!ret) {
- /* Cancellation succeeded -- copy the result
- * into the user's buffer.
- */
- if (copy_to_user(result, &tmp, sizeof(tmp)))
- ret = -EFAULT;
- }
- } else
- ret = -EINVAL;
+ if (!ret) {
+ /* Cancellation succeeded -- copy the result
+ * into the user's buffer.
+ */
+ if (copy_to_user(result, &res, sizeof(res)))
+ ret = -EFAULT;
+ }
put_ioctx(ctx);
--
1.7.12
Nothing used the return value, and it probably wasn't possible to use it
safely for the locked versions (aio_complete(), aio_put_req()). Just
kill it.
Acked-by: Zach Brown <[email protected]>
Signed-off-by: Kent Overstreet <[email protected]>
---
fs/aio.c | 19 +++++++------------
include/linux/aio.h | 8 ++++----
2 files changed, 11 insertions(+), 16 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 1de4f78..91879d4 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -528,7 +528,7 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
/* __aio_put_req
* Returns true if this put was the last user of the request.
*/
-static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
+static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
req, atomic_long_read(&req->ki_filp->f_count));
@@ -538,7 +538,7 @@ static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
req->ki_users--;
BUG_ON(req->ki_users < 0);
if (likely(req->ki_users))
- return 0;
+ return;
list_del(&req->ki_list); /* remove from active_reqs */
req->ki_cancel = NULL;
req->ki_retry = NULL;
@@ -546,21 +546,18 @@ static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
fput(req->ki_filp);
req->ki_filp = NULL;
really_put_req(ctx, req);
- return 1;
}
/* aio_put_req
* Returns true if this put was the last user of the kiocb,
* false if the request is still in use.
*/
-int aio_put_req(struct kiocb *req)
+void aio_put_req(struct kiocb *req)
{
struct kioctx *ctx = req->ki_ctx;
- int ret;
spin_lock_irq(&ctx->ctx_lock);
- ret = __aio_put_req(ctx, req);
+ __aio_put_req(ctx, req);
spin_unlock_irq(&ctx->ctx_lock);
- return ret;
}
EXPORT_SYMBOL(aio_put_req);
@@ -594,7 +591,7 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
* Returns true if this is the last user of the request. The
* only other user of the request can be the cancellation code.
*/
-int aio_complete(struct kiocb *iocb, long res, long res2)
+void aio_complete(struct kiocb *iocb, long res, long res2)
{
struct kioctx *ctx = iocb->ki_ctx;
struct aio_ring_info *info;
@@ -602,7 +599,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
struct io_event *event;
unsigned long flags;
unsigned long tail;
- int ret;
/*
* Special case handling for sync iocbs:
@@ -616,7 +612,7 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
iocb->ki_user_data = res;
iocb->ki_users = 0;
wake_up_process(iocb->ki_obj.tsk);
- return 1;
+ return;
}
info = &ctx->ring_info;
@@ -675,7 +671,7 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
put_rq:
/* everything turned out well, dispose of the aiocb. */
- ret = __aio_put_req(ctx, iocb);
+ __aio_put_req(ctx, iocb);
/*
* We have to order our ring_info tail store above and test
@@ -689,7 +685,6 @@ put_rq:
wake_up(&ctx->wait);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- return ret;
}
EXPORT_SYMBOL(aio_complete);
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 387dad0..0ce99e8 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -168,16 +168,16 @@ struct kioctx {
/* prototypes */
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
-extern int aio_put_req(struct kiocb *iocb);
-extern int aio_complete(struct kiocb *iocb, long res, long res2);
+extern void aio_put_req(struct kiocb *iocb);
+extern void aio_complete(struct kiocb *iocb, long res, long res2);
struct mm_struct;
extern void exit_aio(struct mm_struct *mm);
extern long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user *__user *iocbpp, bool compat);
#else
static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
-static inline int aio_put_req(struct kiocb *iocb) { return 0; }
-static inline int aio_complete(struct kiocb *iocb, long res, long res2) { return 0; }
+static inline void aio_put_req(struct kiocb *iocb) { }
+static inline void aio_complete(struct kiocb *iocb, long res, long res2) { }
struct mm_struct;
static inline void exit_aio(struct mm_struct *mm) { }
static inline long do_io_submit(aio_context_t ctx_id, long nr,
--
1.7.12
From: Zach Brown <[email protected]>
This removes the only in-tree user of aio retry. This will let us
remove the retry code from the aio core.
Removing retry is relatively easy as the USB gadget wasn't using it to
retry IOs at all. It always fully submitted the IO in the context of
the initial io_submit() call. It only used the AIO retry facility to
get the submitter's mm context for copying the result of a read back to
user space. This is easy to implement with use_mm() and a work struct,
much like kvm does with async_pf_execute() for get_user_pages().
Signed-off-by: Zach Brown <[email protected]>
Signed-off-by: Kent Overstreet <[email protected]>
---
drivers/usb/gadget/inode.c | 38 +++++++++++++++++++++++++++++---------
1 file changed, 29 insertions(+), 9 deletions(-)
diff --git a/drivers/usb/gadget/inode.c b/drivers/usb/gadget/inode.c
index 76494ca..2a3f001 100644
--- a/drivers/usb/gadget/inode.c
+++ b/drivers/usb/gadget/inode.c
@@ -24,6 +24,7 @@
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/poll.h>
+#include <linux/mmu_context.h>
#include <linux/device.h>
#include <linux/moduleparam.h>
@@ -514,6 +515,9 @@ static long ep_ioctl(struct file *fd, unsigned code, unsigned long value)
struct kiocb_priv {
struct usb_request *req;
struct ep_data *epdata;
+ struct kiocb *iocb;
+ struct mm_struct *mm;
+ struct work_struct work;
void *buf;
const struct iovec *iv;
unsigned long nr_segs;
@@ -541,15 +545,12 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e)
return value;
}
-static ssize_t ep_aio_read_retry(struct kiocb *iocb)
+static ssize_t ep_copy_to_user(struct kiocb_priv *priv)
{
- struct kiocb_priv *priv = iocb->private;
ssize_t len, total;
void *to_copy;
int i;
- /* we "retry" to get the right mm context for this: */
-
/* copy stuff into user buffers */
total = priv->actual;
len = 0;
@@ -569,9 +570,26 @@ static ssize_t ep_aio_read_retry(struct kiocb *iocb)
if (total == 0)
break;
}
+
+ return len;
+}
+
+static void ep_user_copy_worker(struct work_struct *work)
+{
+ struct kiocb_priv *priv = container_of(work, struct kiocb_priv, work);
+ struct mm_struct *mm = priv->mm;
+ struct kiocb *iocb = priv->iocb;
+ size_t ret;
+
+ use_mm(mm);
+ ret = ep_copy_to_user(priv);
+ unuse_mm(mm);
+
+ /* completing the iocb can drop the ctx and mm, don't touch mm after */
+ aio_complete(iocb, ret, ret);
+
kfree(priv->buf);
kfree(priv);
- return len;
}
static void ep_aio_complete(struct usb_ep *ep, struct usb_request *req)
@@ -597,14 +615,14 @@ static void ep_aio_complete(struct usb_ep *ep, struct usb_request *req)
aio_complete(iocb, req->actual ? req->actual : req->status,
req->status);
} else {
- /* retry() won't report both; so we hide some faults */
+ /* ep_copy_to_user() won't report both; we hide some faults */
if (unlikely(0 != req->status))
DBG(epdata->dev, "%s fault %d len %d\n",
ep->name, req->status, req->actual);
priv->buf = req->buf;
priv->actual = req->actual;
- kick_iocb(iocb);
+ schedule_work(&priv->work);
}
spin_unlock(&epdata->dev->lock);
@@ -634,8 +652,10 @@ fail:
return value;
}
iocb->private = priv;
+ priv->iocb = iocb;
priv->iv = iv;
priv->nr_segs = nr_segs;
+ INIT_WORK(&priv->work, ep_user_copy_worker);
value = get_ready_ep(iocb->ki_filp->f_flags, epdata);
if (unlikely(value < 0)) {
@@ -647,6 +667,7 @@ fail:
get_ep(epdata);
priv->epdata = epdata;
priv->actual = 0;
+ priv->mm = current->mm; /* mm teardown waits for iocbs in exit_aio() */
/* each kiocb is coupled to one usb_request, but we can't
* allocate or submit those if the host disconnected.
@@ -675,7 +696,7 @@ fail:
kfree(priv);
put_ep(epdata);
} else
- value = (iv ? -EIOCBRETRY : -EIOCBQUEUED);
+ value = -EIOCBQUEUED;
return value;
}
@@ -693,7 +714,6 @@ ep_aio_read(struct kiocb *iocb, const struct iovec *iov,
if (unlikely(!buf))
return -ENOMEM;
- iocb->ki_retry = ep_aio_read_retry;
return ep_aio_rwtail(iocb, buf, iocb->ki_left, epdata, iov, nr_segs);
}
--
1.7.12
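(Editorial aside: pulling the scattered hunks together, the resulting flow is roughly the following - condensed from the diff above, error paths and locking omitted:

/* submit, in the issuing task's context */
priv->iocb = iocb;
priv->mm = current->mm;        /* mm teardown waits for iocbs in exit_aio() */
INIT_WORK(&priv->work, ep_user_copy_worker);
...
return -EIOCBQUEUED;

/* usb completion callback (ep_aio_complete), possibly in irq context */
priv->buf = req->buf;
priv->actual = req->actual;
schedule_work(&priv->work);    /* was: kick_iocb(iocb) */

/* worker: borrow the submitter's mm for the copy, then complete */
static void ep_user_copy_worker(struct work_struct *work)
{
        struct kiocb_priv *priv = container_of(work, struct kiocb_priv, work);
        size_t ret;

        use_mm(priv->mm);
        ret = ep_copy_to_user(priv);
        unuse_mm(priv->mm);

        /* aio_complete() can drop the ctx and mm; don't touch mm after */
        aio_complete(priv->iocb, ret, ret);

        kfree(priv->buf);
        kfree(priv);
}

The only behavioural difference from the retry-based version is that the copy runs from a workqueue with the submitter's mm borrowed via use_mm(), instead of from the aio retry path.)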
From: Zach Brown <[email protected]>
Signed-off-by: Zach Brown <[email protected]>
Signed-off-by: Kent Overstreet <[email protected]>
---
include/linux/aio.h | 24 ------------------------
1 file changed, 24 deletions(-)
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 31ff6db..b46a09f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -9,44 +9,22 @@
#include <linux/atomic.h>
-#define AIO_MAXSEGS 4
-#define AIO_KIOGRP_NR_ATOMIC 8
-
struct kioctx;
-/* Notes on cancelling a kiocb:
- * If a kiocb is cancelled, aio_complete may return 0 to indicate
- * that cancel has not yet disposed of the kiocb. All cancel
- * operations *must* call aio_put_req to dispose of the kiocb
- * to guard against races with the completion code.
- */
-#define KIOCB_C_CANCELLED 0x01
-#define KIOCB_C_COMPLETE 0x02
-
#define KIOCB_SYNC_KEY (~0U)
/* ki_flags bits */
-/*
- * This may be used for cancel/retry serialization in the future, but
- * for now it's unused and we probably don't want modules to even
- * think they can use it.
- */
-/* #define KIF_LOCKED 0 */
#define KIF_KICKED 1
#define KIF_CANCELLED 2
-#define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
-#define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-#define kiocbIsLocked(iocb) test_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
@@ -207,8 +185,6 @@ struct kioctx {
};
/* prototypes */
-extern unsigned aio_max_size;
-
#ifdef CONFIG_AIO
extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
extern int aio_put_req(struct kiocb *iocb);
--
1.7.12
On Wed, Nov 28, 2012 at 08:43:24AM -0800, Kent Overstreet wrote:
> Bunch of performance improvements and cleanups Zach Brown and I have
> been working on. The code should be pretty solid at this point, though
> it could of course use more review and testing.
Thanks for sending these out. I have some initial review comments
that'll follow, but I'm running out of steam today. I'll continue
tomorrow.
> The results in my testing are pretty impressive, particularly when an
> ioctx is being shared between multiple threads. In my crappy synthetic
> benchmark, with 4 threads submitting and one thread reaping completions,
> I saw overhead in the aio code go from ~50% (mostly ioctx lock
> contention) to low single digits. Performance with ioctx per thread
> improved too, but I'd have to rerun those benchmarks.
You should probably mention that those four threads were *spinning* on
io_submit() :). I'm still guessing that this unreasonably inflated the
contention amongst submitters and that without this inflation we might
not find the per-cpu ioctx refcounts worth the trouble.
> Performance wise, the end result of this patch series is that submitting
> a kiocb writes to _no_ shared cachelines - the penalty for sharing an
> ioctx is gone there. There's still going to be some cacheline contention
> when we deliver the completions to the aio ringbuffer (at least if you
> have interrupts being delivered on multiple cores, which for high end
> stuff you do) but I have a couple more patches not in this series that
> implement coalescing for that (by taking advantage of interrupt
> coalescing). With that, there's basically no bottlenecks or performance
> issues to speak of in the aio code.
Yeah, this is good stuff. Thanks for pushing it.
We should mention Jens' omnibus patch that also took on these problems:
http://git.kernel.dk/?p=linux-block.git;a=commit;h=6b6723fc3e4f24dbd80526df935ca115ead578c6
- z
On Wed, Nov 28, 2012 at 08:43:31AM -0800, Kent Overstreet wrote:
> Minor refactoring, to get rid of some duplicated code
A minor nit:
> spin_lock_irq(&ctx->ctx_lock);
> - ret = -EAGAIN;
> +
> kiocb = lookup_kiocb(ctx, iocb, key);
> - if (kiocb && kiocb->ki_cancel) {
> - cancel = kiocb->ki_cancel;
> - kiocb->ki_users ++;
> - kiocbSetCancelled(kiocb);
> - } else
> - cancel = NULL;
...
> - if (NULL != cancel) {
> - } else
> - ret = -EINVAL;
In the old code it'd return -EINVAL for a NULL kiocb, despite that
misleading unused EAGAIN.
> + if (kiocb)
> + ret = kiocb_cancel(ctx, kiocb, &res);
> + else
> + ret = -EAGAIN;
But now it returns -EAGAIN.
I bet we want to err on the side of caution and maintain behaviour, no
matter how funky.
- z
> struct kioctx {
> atomic_t users;
> - int dead;
> + atomic_t dead;
Do we want to be paranoid and atomic_set() that to 0 when the ioctx is
allocated?
> + while (!list_empty(&ctx->active_reqs)) {
> + struct list_head *pos = ctx->active_reqs.next;
> + struct kiocb *iocb = list_kiocb(pos);
I'd use list_first_entry() and ignore the list_kiocb() wrapper, I think.
> + if (!atomic_xchg(&ctx->dead, 1)) {
> + hlist_del_rcu(&ctx->list);
> + synchronize_rcu();
> + hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
> + if (ctx->user_id == ctx_id){
> + BUG_ON(atomic_read(&ctx->dead));
Hmm. Won't it be possible to race lookup and io_destroy() to hit this
BUG?
- z
> - int i = 0;
> + DEFINE_WAIT(wait);
> + struct hrtimer_sleeper t;
> + size_t i = 0;
Changing i to size_t is kind of surprising. Is that on purpose?
> - set_task_state(tsk, TASK_RUNNING);
> - remove_wait_queue(&ctx->wait, &wait);
> -
> if (unlikely(ret <= 0))
> break;
>
> @@ -879,11 +844,10 @@ static int read_events(struct kioctx *ctx,
> event ++;
> i ++;
> }
> -
> - if (timeout)
> - clear_timeout(&to);
> out:
> - destroy_timer_on_stack(&to.timer);
> + hrtimer_cancel(&t.timer);
> + destroy_hrtimer_on_stack(&t.timer);
> + finish_wait(&ctx->wait, &wait);
I'd move the finish_wait() up to where TASK_RUNNING was set previously
so that we can't call copy_to_user() while still set to
TASK_INTERRUPTIBLE.
- z
> We can't use cmpxchg() on the ring buffer's head pointer directly, since
> it's modded to nr_events and would be susceptible to ABA. So instead we
> maintain a shadow head that uses the full 32 bits, and cmpxchg() that
> and then updated the real head pointer.
Time to update this comment to reflect the mutex instead of the shadow
head? :)
> + ev = kmap(page);
> + copy_ret = copy_to_user(event + ret, ev + pos, sizeof(*ev) * i);
> + kunmap(page);
For lack of a better time: do we want to bring up the missing
flush_dcache_page() calls around the kernel mappings of ring pages that
are also mapped to user addresses?
> prepare_to_wait_exclusive(&ctx->wait, &wait,
> TASK_INTERRUPTIBLE);
>
> + ret = aio_read_events(ctx, event + i, nr - i);
> + if (ret < 0)
> break;
As mentioned offlist: we don't want to be blocking under
TASK_INTERRUPTIBLE. Is the plan to do a non-blocking check and pop
outside the wait loop to do a blocking copy?
- z
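(Editorial aside: the loop shape being suggested in the last point would be roughly the following - a sketch only, not code from the series. It assumes the DEFINE_WAIT(wait) from the patch, leaves out min_nr/timeout handling, and aio_ring_empty() is a made-up helper that just compares the ring's head and tail:

while (i < nr) {
        /* Wait, interruptibly, until the ring has something in it. */
        prepare_to_wait_exclusive(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
        while (aio_ring_empty(ctx) && !signal_pending(current)) {
                schedule();
                prepare_to_wait_exclusive(&ctx->wait, &wait,
                                          TASK_INTERRUPTIBLE);
        }
        finish_wait(&ctx->wait, &wait);

        if (signal_pending(current))
                return i ? i : -EINTR;

        /*
         * Back in TASK_RUNNING: the copy_to_user() inside
         * aio_read_events() can now fault without sleeping in the
         * wrong task state.
         */
        ret = aio_read_events(ctx, event + i, nr - i);
        if (ret <= 0)
                return i ? i : ret;
        i += ret;
}

Only the non-blocking "is anything there?" check runs while on the waitqueue; the potentially blocking copy happens after finish_wait().)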
On Wed, Nov 28, 2012 at 04:07:53PM -0800, Zach Brown wrote:
> On Wed, Nov 28, 2012 at 08:43:31AM -0800, Kent Overstreet wrote:
> > Minor refactoring, to get rid of some duplicated code
>
> A minor nit:
>
> > spin_lock_irq(&ctx->ctx_lock);
> > - ret = -EAGAIN;
> > +
> > kiocb = lookup_kiocb(ctx, iocb, key);
> > - if (kiocb && kiocb->ki_cancel) {
> > - cancel = kiocb->ki_cancel;
> > - kiocb->ki_users ++;
> > - kiocbSetCancelled(kiocb);
> > - } else
> > - cancel = NULL;
> ...
> > - if (NULL != cancel) {
> > - } else
> > - ret = -EINVAL;
>
> In the old code it'd return -EINVAL for a NULL kiocb, despite that
> misleading unused EAGAIN.
Oh damn, that's evil. Hadn't noticed that before.
>
> > + if (kiocb)
> > + ret = kiocb_cancel(ctx, kiocb, &res);
> > + else
> > + ret = -EAGAIN;
>
> But now it returns -EAGAIN.
>
> I bet we want to err on the side of caution and maintain behaviour, no
> matter how funky.
Agreed. Fixed patch below:
commit 7ad9d0a1914e9563021c0f5f7fc55f20e41999e7
Author: Kent Overstreet <[email protected]>
Date: Wed Nov 28 16:57:48 2012 -0800
aio: kiocb_cancel()
Minor refactoring, to get rid of some duplicated code
Signed-off-by: Kent Overstreet <[email protected]>
diff --git a/fs/aio.c b/fs/aio.c
index 91879d4..786c904 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -217,6 +217,29 @@ static inline void put_ioctx(struct kioctx *kioctx)
__put_ioctx(kioctx);
}
+static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
+ struct io_event *res)
+{
+ int (*cancel)(struct kiocb *, struct io_event *);
+ int ret = -EINVAL;
+
+ cancel = kiocb->ki_cancel;
+ kiocbSetCancelled(kiocb);
+ if (cancel) {
+ kiocb->ki_users++;
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ memset(res, 0, sizeof(*res));
+ res->obj = (u64) kiocb->ki_obj.user;
+ res->data = kiocb->ki_user_data;
+ ret = cancel(kiocb, res);
+
+ spin_lock_irq(&ctx->ctx_lock);
+ }
+
+ return ret;
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -287,7 +310,6 @@ out_freectx:
*/
static void kill_ctx(struct kioctx *ctx)
{
- int (*cancel)(struct kiocb *, struct io_event *);
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
struct io_event res;
@@ -298,14 +320,8 @@ static void kill_ctx(struct kioctx *ctx)
struct list_head *pos = ctx->active_reqs.next;
struct kiocb *iocb = list_kiocb(pos);
list_del_init(&iocb->ki_list);
- cancel = iocb->ki_cancel;
- kiocbSetCancelled(iocb);
- if (cancel) {
- iocb->ki_users++;
- spin_unlock_irq(&ctx->ctx_lock);
- cancel(iocb, &res);
- spin_lock_irq(&ctx->ctx_lock);
- }
+
+ kiocb_cancel(ctx, iocb, &res);
}
if (!ctx->reqs_active)
@@ -1411,7 +1427,7 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
struct io_event __user *, result)
{
- int (*cancel)(struct kiocb *iocb, struct io_event *res);
+ struct io_event res;
struct kioctx *ctx;
struct kiocb *kiocb;
u32 key;
@@ -1426,32 +1442,22 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
return -EINVAL;
spin_lock_irq(&ctx->ctx_lock);
- ret = -EAGAIN;
+
kiocb = lookup_kiocb(ctx, iocb, key);
- if (kiocb && kiocb->ki_cancel) {
- cancel = kiocb->ki_cancel;
- kiocb->ki_users ++;
- kiocbSetCancelled(kiocb);
- } else
- cancel = NULL;
+ if (kiocb)
+ ret = kiocb_cancel(ctx, kiocb, &res);
+ else
+ ret = -EINVAL;
+
spin_unlock_irq(&ctx->ctx_lock);
- if (NULL != cancel) {
- struct io_event tmp;
- pr_debug("calling cancel\n");
- memset(&tmp, 0, sizeof(tmp));
- tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user;
- tmp.data = kiocb->ki_user_data;
- ret = cancel(kiocb, &tmp);
- if (!ret) {
- /* Cancellation succeeded -- copy the result
- * into the user's buffer.
- */
- if (copy_to_user(result, &tmp, sizeof(tmp)))
- ret = -EFAULT;
- }
- } else
- ret = -EINVAL;
+ if (!ret) {
+ /* Cancellation succeeded -- copy the result
+ * into the user's buffer.
+ */
+ if (copy_to_user(result, &res, sizeof(res)))
+ ret = -EFAULT;
+ }
put_ioctx(ctx);
On Wed, Nov 28, 2012 at 04:24:36PM -0800, Zach Brown wrote:
> > - int i = 0;
> > + DEFINE_WAIT(wait);
> > + struct hrtimer_sleeper t;
> > + size_t i = 0;
>
> Changing i to size_t is kind of surprising. Is that on purpose?
I doubt it matters due to limits on ringbuffer size elsewhere but size_t
is more correct, since we're counting objects in an array.
> > - set_task_state(tsk, TASK_RUNNING);
> > - remove_wait_queue(&ctx->wait, &wait);
> > -
> > if (unlikely(ret <= 0))
> > break;
> >
> > @@ -879,11 +844,10 @@ static int read_events(struct kioctx *ctx,
> > event ++;
> > i ++;
> > }
> > -
> > - if (timeout)
> > - clear_timeout(&to);
> > out:
> > - destroy_timer_on_stack(&to.timer);
> > + hrtimer_cancel(&t.timer);
> > + destroy_hrtimer_on_stack(&t.timer);
> > + finish_wait(&ctx->wait, &wait);
>
> I'd move the finish_wait() up to where TASK_RUNNING was set previously
> so that we can't call copy_to_user() while still set to
> TASK_INTERRUPTIBLE.
Slightly less efficient, but yeah that's more correct and this code all
changes later anyways.
On Wed, Nov 28, 2012 at 04:17:59PM -0800, Zach Brown wrote:
>
> > struct kioctx {
> > atomic_t users;
> > - int dead;
> > + atomic_t dead;
>
> Do we want to be paranoid and atomic_set() that to 0 when the ioctx is
> allocated?
I suppose we should, yeah.
> > + while (!list_empty(&ctx->active_reqs)) {
> > + struct list_head *pos = ctx->active_reqs.next;
> > + struct kiocb *iocb = list_kiocb(pos);
>
> I'd use list_first_entry() and ignore the list_kiocb() wrapper, I think.
Fine by me.
>
> > + if (!atomic_xchg(&ctx->dead, 1)) {
> > + hlist_del_rcu(&ctx->list);
> > + synchronize_rcu();
>
> > + hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
> > + if (ctx->user_id == ctx_id){
> > + BUG_ON(atomic_read(&ctx->dead));
>
> Hmm. Won't it be possible to race lookup and io_destroy() to hit this
> BUG?
Ouch, yes. Dunno what I was thinking when I added that.
Hi Kent,
On Wed, Nov 28, 2012 at 08:43:36AM -0800, Kent Overstreet wrote:
> + * now it's safe to cancel any that need to be.
> + */
> +static void free_ioctx(struct kioctx *ctx)
...
> + aio_nr -= ctx->max_reqs;
> + spin_unlock(&aio_nr_lock);
> +
> + synchronize_rcu();
> +
> + pr_debug("freeing %p\n", ctx);
> + kmem_cache_free(kioctx_cachep, ctx);
> +}
As mentioned on irc, we probably want to avoid the synchronize_rcu()
overhead, since delays here will impact the time it takes for a task to
exit.
Cheers,
-ben
--
"Thought is the essence of where you are now."
On Wed, Nov 28, 2012 at 07:46:31PM -0500, Benjamin LaHaise wrote:
> Hi Kent,
>
> On Wed, Nov 28, 2012 at 08:43:36AM -0800, Kent Overstreet wrote:
> > + * now it's safe to cancel any that need to be.
> > + */
> > +static void free_ioctx(struct kioctx *ctx)
> ...
> > + aio_nr -= ctx->max_reqs;
> > + spin_unlock(&aio_nr_lock);
> > +
> > + synchronize_rcu();
> > +
> > + pr_debug("freeing %p\n", ctx);
> > + kmem_cache_free(kioctx_cachep, ctx);
> > +}
>
> As mentioned on irc, we probably want to avoid the synchronize_rcu()
> overhead, since delays here will impact the time it takes for a task to
> exit. Cheers,
Yeah, suppose you're right. Have an updated patch below, which also
documents exactly what the rcu usage is for:
I'm not a big fan of the contortions where kill_ioctx() does one thing
and exit_aio() does a slightly different thing so the vm_munmap()
happens in the right context, but oh well.
commit 8f3f71c5e9ae0a76bcf019a8f00510076ecc052e
Author: Kent Overstreet <[email protected]>
Date: Wed Nov 28 17:27:19 2012 -0800
aio: Refcounting cleanup
The usage of ctx->dead was fubar - it makes no sense to explicitly
check it all over the place, especially when we're already using RCU.
Now, ctx->dead only indicates whether we've dropped the initial
refcount. The new teardown sequence is:
set ctx->dead
hlist_del_rcu();
synchronize_rcu();
Now we know no system calls can take a new ref, and it's safe to drop
the initial ref:
put_ioctx();
We also need to ensure there are no more outstanding kiocbs. This was
done incorrectly - it was being done in kill_ctx(), and before dropping
the initial refcount. At this point, other syscalls may still be
submitting kiocbs!
Now, we cancel and wait for outstanding kiocbs in free_ioctx(), after
kioctx->users has dropped to 0 and we know no more iocbs could be
submitted.
Signed-off-by: Kent Overstreet <[email protected]>
diff --git a/fs/aio.c b/fs/aio.c
index 4c9a5bf..7b75590 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,7 +79,7 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
struct kioctx {
atomic_t users;
- int dead;
+ atomic_t dead;
/* This needs improving */
unsigned long user_id;
@@ -98,6 +98,7 @@ struct kioctx {
struct aio_ring_info ring_info;
struct rcu_head rcu_head;
+ struct work_struct rcu_work;
};
/*------ sysctl variables----*/
@@ -234,44 +235,6 @@ static int aio_setup_ring(struct kioctx *ctx)
kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
} while(0)
-static void ctx_rcu_free(struct rcu_head *head)
-{
- struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- * Called when the last user of an aio context has gone away,
- * and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
-{
- unsigned nr_events = ctx->max_reqs;
- BUG_ON(atomic_read(&ctx->reqs_active));
-
- aio_free_ring(ctx);
- if (nr_events) {
- spin_lock(&aio_nr_lock);
- BUG_ON(aio_nr - nr_events > aio_nr);
- aio_nr -= nr_events;
- spin_unlock(&aio_nr_lock);
- }
- pr_debug("freeing %p\n", ctx);
- call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
- return atomic_inc_not_zero(&kioctx->users);
-}
-
-static inline void put_ioctx(struct kioctx *kioctx)
-{
- BUG_ON(atomic_read(&kioctx->users) <= 0);
- if (unlikely(atomic_dec_and_test(&kioctx->users)))
- __put_ioctx(kioctx);
-}
-
static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
struct io_event *res)
{
@@ -295,6 +258,61 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
return ret;
}
+static void free_ioctx_rcu(struct rcu_head *head)
+{
+ struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+ kmem_cache_free(kioctx_cachep, ctx);
+}
+
+/*
+ * When this function runs, the kioctx has been removed from the "hash table"
+ * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
+ * now it's safe to cancel any that need to be.
+ */
+static void free_ioctx(struct kioctx *ctx)
+{
+ struct io_event res;
+ struct kiocb *iocb;
+
+ spin_lock_irq(&ctx->ctx_lock);
+
+ while (!list_empty(&ctx->active_reqs)) {
+ iocb = list_first_entry(&ctx->active_reqs,
+ struct kiocb, ki_list);
+
+ list_del_init(&iocb->ki_list);
+ kiocb_cancel(ctx, iocb, &res);
+ }
+
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+
+ aio_free_ring(ctx);
+
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+ aio_nr -= ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+
+ pr_debug("freeing %p\n", ctx);
+
+ /*
+ * Here the call_rcu() is between the wait_event() for reqs_active to
+ * hit 0, and freeing the ioctx.
+ *
+ * aio_complete() decrements reqs_active, but it has to touch the ioctx
+ * after to issue a wakeup so we use rcu.
+ */
+ call_rcu(&ctx->rcu_head, free_ioctx_rcu);
+}
+
+static void put_ioctx(struct kioctx *ctx)
+{
+ if (unlikely(atomic_dec_and_test(&ctx->users)))
+ free_ioctx(ctx);
+}
+
/* ioctx_alloc
* Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
*/
@@ -321,6 +339,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->max_reqs = nr_events;
atomic_set(&ctx->users, 2);
+ atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
@@ -358,43 +377,43 @@ out_freectx:
return ERR_PTR(err);
}
-/* kill_ctx
- * Cancels all outstanding aio requests on an aio context. Used
- * when the processes owning a context have all exited to encourage
- * the rapid destruction of the kioctx.
- */
-static void kill_ctx(struct kioctx *ctx)
+static void kill_ioctx_work(struct work_struct *work)
{
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- struct io_event res;
+ struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
- spin_lock_irq(&ctx->ctx_lock);
- ctx->dead = 1;
- while (!list_empty(&ctx->active_reqs)) {
- struct list_head *pos = ctx->active_reqs.next;
- struct kiocb *iocb = list_kiocb(pos);
- list_del_init(&iocb->ki_list);
+ wake_up_all(&ctx->wait);
+ put_ioctx(ctx);
+}
- kiocb_cancel(ctx, iocb, &res);
- }
+static void kill_ioctx_rcu(struct rcu_head *head)
+{
+ struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
- if (!atomic_read(&ctx->reqs_active))
- goto out;
+ INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
+ schedule_work(&ctx->rcu_work);
+}
- add_wait_queue(&ctx->wait, &wait);
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- while (atomic_read(&ctx->reqs_active)) {
- spin_unlock_irq(&ctx->ctx_lock);
- io_schedule();
- set_task_state(tsk, TASK_UNINTERRUPTIBLE);
- spin_lock_irq(&ctx->ctx_lock);
- }
- __set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
+/* kill_ioctx
+ * Cancels all outstanding aio requests on an aio context. Used
+ * when the processes owning a context have all exited to encourage
+ * the rapid destruction of the kioctx.
+ */
+static void kill_ioctx(struct kioctx *ctx)
+{
+ if (!atomic_xchg(&ctx->dead, 1)) {
+ hlist_del_rcu(&ctx->list);
+ /* Between hlist_del_rcu() and dropping the initial ref */
+ synchronize_rcu();
-out:
- spin_unlock_irq(&ctx->ctx_lock);
+ /*
+ * We can't punt to workqueue here because put_ioctx() ->
+ * free_ioctx() will unmap the ringbuffer, and that has to be
+ * done in the original process's context. kill_ioctx_rcu/work()
+ * exist for exit_aio(), as in that path free_ioctx() won't do
+ * the unmap.
+ */
+ kill_ioctx_work(&ctx->rcu_work);
+ }
}
/* wait_on_sync_kiocb:
@@ -413,27 +432,25 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
}
EXPORT_SYMBOL(wait_on_sync_kiocb);
-/* exit_aio: called when the last user of mm goes away. At this point,
- * there is no way for any new requests to be submited or any of the
- * io_* syscalls to be called on the context. However, there may be
- * outstanding requests which hold references to the context; as they
- * go away, they will call put_ioctx and release any pinned memory
- * associated with the request (held via struct page * references).
+/*
+ * exit_aio: called when the last user of mm goes away. At this point, there is
+ * no way for any new requests to be submitted or any of the io_* syscalls to be
+ * called on the context.
+ *
+ * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
+ * them.
*/
void exit_aio(struct mm_struct *mm)
{
struct kioctx *ctx;
+ struct hlist_node *p, *n;
- while (!hlist_empty(&mm->ioctx_list)) {
- ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
- hlist_del_rcu(&ctx->list);
-
- kill_ctx(ctx);
-
+ hlist_for_each_entry_safe(ctx, p, n, &mm->ioctx_list, list) {
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
"exit_aio:ioctx still alive: %d %d %d\n",
- atomic_read(&ctx->users), ctx->dead,
+ atomic_read(&ctx->users),
+ atomic_read(&ctx->dead),
atomic_read(&ctx->reqs_active));
/*
* We don't need to bother with munmap() here -
@@ -444,7 +461,11 @@ void exit_aio(struct mm_struct *mm)
* place that uses ->mmap_size, so it's safe.
*/
ctx->ring_info.mmap_size = 0;
- put_ioctx(ctx);
+
+ if (!atomic_xchg(&ctx->dead, 1)) {
+ hlist_del_rcu(&ctx->list);
+ call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
+ }
}
}
@@ -510,8 +531,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
kmem_cache_free(kiocb_cachep, req);
atomic_dec(&ctx->reqs_active);
}
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
spin_unlock_irq(&ctx->ctx_lock);
}
@@ -607,18 +626,12 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
rcu_read_lock();
- hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
- /*
- * RCU protects us against accessing freed memory but
- * we have to be careful not to get a reference when the
- * reference count already dropped to 0 (ctx->dead test
- * is unreliable because of races).
- */
- if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+ hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+ if (ctx->user_id == ctx_id) {
+ atomic_inc(&ctx->users);
ret = ctx;
break;
}
- }
rcu_read_unlock();
return ret;
@@ -655,12 +668,15 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
info = &ctx->ring_info;
- /* add a completion event to the ring buffer.
- * must be done holding ctx->ctx_lock to prevent
- * other code from messing with the tail
- * pointer since we might be called from irq
- * context.
+ /*
+ * Add a completion event to the ring buffer. Must be done holding
+ * ctx->ctx_lock to prevent other code from messing with the tail
+ * pointer since we might be called from irq context.
+ *
+ * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+ * need to issue a wakeup after decrementing reqs_active.
*/
+ rcu_read_lock();
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_del(&iocb->ki_list); /* remove from active_reqs */
@@ -726,6 +742,7 @@ put_rq:
wake_up(&ctx->wait);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL(aio_complete);
@@ -869,7 +886,7 @@ static int read_events(struct kioctx *ctx,
break;
if (min_nr <= i)
break;
- if (unlikely(ctx->dead)) {
+ if (unlikely(atomic_read(&ctx->dead))) {
ret = -EINVAL;
break;
}
@@ -912,35 +929,6 @@ out:
return i ? i : ret;
}
-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
- struct mm_struct *mm = current->mm;
- int was_dead;
-
- /* delete the entry from the list is someone else hasn't already */
- spin_lock(&mm->ioctx_lock);
- was_dead = ioctx->dead;
- ioctx->dead = 1;
- hlist_del_rcu(&ioctx->list);
- spin_unlock(&mm->ioctx_lock);
-
- pr_debug("(%p)\n", ioctx);
- if (likely(!was_dead))
- put_ioctx(ioctx); /* twice for the list */
-
- kill_ctx(ioctx);
-
- /*
- * Wake up any waiters. The setting of ctx->dead must be seen
- * by other CPUs at this point. Right now, we rely on the
- * locking done by the above calls to ensure this consistency.
- */
- wake_up_all(&ioctx->wait);
-}
-
/* sys_io_setup:
* Create an aio_context capable of receiving at least nr_events.
* ctxp must not point to an aio_context that already exists, and
@@ -976,7 +964,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
if (!IS_ERR(ioctx)) {
ret = put_user(ioctx->user_id, ctxp);
if (ret)
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
}
@@ -994,7 +982,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
struct kioctx *ioctx = lookup_ioctx(ctx);
if (likely(NULL != ioctx)) {
- io_destroy(ioctx);
+ kill_ioctx(ioctx);
put_ioctx(ioctx);
return 0;
}
@@ -1297,25 +1285,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
if (ret)
goto out_put_req;
- spin_lock_irq(&ctx->ctx_lock);
- /*
- * We could have raced with io_destroy() and are currently holding a
- * reference to ctx which should be destroyed. We cannot submit IO
- * since ctx gets freed as soon as io_submit() puts its reference. The
- * check here is reliable: io_destroy() sets ctx->dead before waiting
- * for outstanding IO and the barrier between these two is realized by
- * unlock of mm->ioctx_lock and lock of ctx->ctx_lock. Analogously we
- * increment ctx->reqs_active before checking for ctx->dead and the
- * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
- * don't see ctx->dead set here, io_destroy() waits for our IO to
- * finish.
- */
- if (ctx->dead)
- ret = -EINVAL;
- spin_unlock_irq(&ctx->ctx_lock);
- if (ret)
- goto out_put_req;
-
if (unlikely(kiocbIsCancelled(req))) {
ret = -EINTR;
} else {
@@ -1341,9 +1310,6 @@ out_put_req:
spin_unlock_irq(&ctx->ctx_lock);
atomic_dec(&ctx->reqs_active);
- if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
- wake_up_all(&ctx->wait);
-
aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */
return ret;
Kent Overstreet <[email protected]> writes:
> This implements a refcount with similar semantics to
> atomic_get()/atomic_dec_and_test(), that starts out as just an atomic_t
> but dynamically switches to per cpu refcounting when the rate of
> gets/puts becomes too high.
This will only work if you put on the same CPU as you get, right?
In this case I would rather use RCU. It's clearly unusable for anything
blocking (or anything not under get_cpu()). Normally RCU already handles
the "refcount for a short non-blocking section" case.
Is that really true for AIO? It seems dubious.
-Andi
--
[email protected] -- Speaking for myself only
On Thu, Nov 29, 2012 at 10:45:04AM -0800, Andi Kleen wrote:
> Kent Overstreet <[email protected]> writes:
>
> > This implements a refcount with similar semantics to
> > atomic_get()/atomic_dec_and_test(), that starts out as just an atomic_t
> > but dynamically switches to per cpu refcounting when the rate of
> > gets/puts becomes too high.
>
> This will only work if you put on the same CPU as you get, right?
Nope, no such restriction.
> In this case I would rather use RCU. It's clearly unusable for anything
> blocking (or anything not under get_cpu()). Normally RCU already handles
> the "refcount for a short non-blocking section" case.
The kioctx refcount isn't held for a short nonblocking duration;
io_getevents() holds it and may block for arbitrarily long. Maybe SRCU
could be made to work for it (I haven't really looked at the RCU
variants) but it doesn't seem like a good idea.
This thing really is just a refcount, the percpu part isn't exposed to
the user at all.
On Thu, Nov 29, 2012 at 10:57:20AM -0800, Kent Overstreet wrote:
> On Thu, Nov 29, 2012 at 10:45:04AM -0800, Andi Kleen wrote:
> > Kent Overstreet <[email protected]> writes:
> >
> > > This implements a refcount with similar semantics to
> > > atomic_get()/atomic_dec_and_test(), that starts out as just an atomic_t
> > > but dynamically switches to per cpu refcounting when the rate of
> > > gets/puts becomes too high.
> >
> > This will only work if you put on the same CPU as you get, right?
>
> Nope, no such restriction.
I don't see how you ensure you're doing the __this_cpu_dec on the same
CPU as you did the get.
-Andi
On Wed, Nov 28, 2012 at 04:03:03PM -0800, Zach Brown wrote:
> On Wed, Nov 28, 2012 at 08:43:24AM -0800, Kent Overstreet wrote:
> > Bunch of performance improvements and cleanups Zach Brown and I have
> > been working on. The code should be pretty solid at this point, though
> > it could of course use more review and testing.
>
> Thanks for sending these out. I have some initial review comments
> that'll follow, but I'm running out of steam today. I'll continue
> tomorrow.
>
> > The results in my testing are pretty impressive, particularly when an
> > ioctx is being shared between multiple threads. In my crappy synthetic
> > benchmark, with 4 threads submitting and one thread reaping completions,
> > I saw overhead in the aio code go from ~50% (mostly ioctx lock
> > contention) to low single digits. Performance with ioctx per thread
> > improved too, but I'd have to rerun those benchmarks.
>
> You should probably mention that those four threads were *spinning* on
> io_submit() :). I'm still guessing that this unreasonably inflated the
> contention amongst submitters and that without this inflation we might
> not find the per-cpu ioctx refcounts worth the trouble.
Yeah, should've mentioned that :) It was intentionally a worst case
scenario for aio.
> > Performance wise, the end result of this patch series is that submitting
> > a kiocb writes to _no_ shared cachelines - the penalty for sharing an
> > ioctx is gone there. There's still going to be some cacheline contention
> > when we deliver the completions to the aio ringbuffer (at least if you
> > have interrupts being delivered on multiple cores, which for high end
> > stuff you do) but I have a couple more patches not in this series that
> > implement coalescing for that (by taking advantage of interrupt
> > coalescing). With that, there's basically no bottlenecks or performance
> > issues to speak of in the aio code.
>
> Yeah, this is good stuff. Thanks for pushing it.
>
> We should mention Jens' omnibus patch that also took on these problems:
>
> http://git.kernel.dk/?p=linux-block.git;a=commit;h=6b6723fc3e4f24dbd80526df935ca115ead578c6
Oh yeah. I think this patch series solves everything Jens was working on
in the aio code, but there's still dio stuff in that patch that's worth
looking at.
On Thu, Nov 29, 2012 at 07:59:53PM +0100, Andi Kleen wrote:
> On Thu, Nov 29, 2012 at 10:57:20AM -0800, Kent Overstreet wrote:
> > On Thu, Nov 29, 2012 at 10:45:04AM -0800, Andi Kleen wrote:
> > > Kent Overstreet <[email protected]> writes:
> > >
> > > > This implements a refcount with similar semantics to
> > > > atomic_get()/atomic_dec_and_test(), that starts out as just an atomic_t
> > > > but dynamically switches to per cpu refcounting when the rate of
> > > > gets/puts becomes too high.
> > >
> > > This will only work if you put on the same CPU as you get, right?
> >
> > Nope, no such restriction.
>
> I don't see how you ensure you're doing the __this_cpu_dec on the same
> CPU as you did the get
I'm not.
(I probably should've documented this a bit more before I sent it
out...)
The trick is that we don't watch for the refcount hitting 0 until we're
shutting down - so this only works if you keep track of your initial
refcount. As long as we're not shutting down, we know the refcount can't
hit 0 because we haven't released the initial refcount.
When we do want to shut down, the user calls percpu_ref_kill(), which
converts the percpu ref back to a single atomic ref, calls
synchronize_rcu(), then sets the ref's state to PCPU_REF_DEAD.
Only then does the caller drop the initial ref, and percpu_ref_put()
only does atomic_dec_and_test() when the ref is dead - otherwise it's
just doing a decrement.
Also, with the percpu refs - you can have all your gets happening on one
cpu, and all your puts happening on another - the percpu refs are
unsigned ints so overflow isn't undefined, and if they wrap they'll
still sum to the right value when we go to shut things down in
percpu_ref_kill().
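To make the mechanism concrete, here's a rough sketch of the semantics
being described. This is only an illustration, not the actual patch: it
skips the dynamic atomic-to-percpu switch and the state packing entirely,
and apart from percpu_ref_kill() the names here are made up.

#include <linux/atomic.h>
#include <linux/errno.h>
#include <linux/percpu.h>
#include <linux/rcupdate.h>
#include <linux/types.h>

struct percpu_ref {
	atomic_t		count;		/* holds the initial ref */
	unsigned __percpu	*pcpu_count;	/* NULL once the ref is killed */
};

int percpu_ref_init(struct percpu_ref *ref)
{
	atomic_set(&ref->count, 1);		/* the initial ref */
	ref->pcpu_count = alloc_percpu(unsigned);
	return ref->pcpu_count ? 0 : -ENOMEM;
}

static inline void percpu_ref_get(struct percpu_ref *ref)
{
	rcu_read_lock();
	if (ref->pcpu_count)			/* still in percpu mode */
		this_cpu_inc(*ref->pcpu_count);
	else
		atomic_inc(&ref->count);
	rcu_read_unlock();
}

/* Returns true when the last ref goes away - only possible after kill() */
static inline bool percpu_ref_put(struct percpu_ref *ref)
{
	bool is_zero = false;

	rcu_read_lock();
	if (ref->pcpu_count)
		this_cpu_dec(*ref->pcpu_count);
	else
		is_zero = atomic_dec_and_test(&ref->count);
	rcu_read_unlock();

	return is_zero;
}

/*
 * Shut down: stop new percpu operations, wait for in-flight gets/puts to
 * finish, then fold the (possibly individually wrapped) percpu counters
 * into the atomic count. The caller drops the initial ref afterwards with
 * percpu_ref_put(). Real code would use rcu_dereference() and mark the ref
 * dead rather than just NULLing the pointer; this is deliberately bare.
 */
void percpu_ref_kill(struct percpu_ref *ref)
{
	unsigned __percpu *pcpu_count = ref->pcpu_count;
	unsigned count = 0;
	int cpu;

	ref->pcpu_count = NULL;
	synchronize_rcu();

	for_each_possible_cpu(cpu)
		count += *per_cpu_ptr(pcpu_count, cpu);

	free_percpu(pcpu_count);
	atomic_add((int) count, &ref->count);
}

The property being relied on is visible in percpu_ref_put(): it only tests
for zero once the ref has been killed, so the individual percpu counters
are free to wrap in the meantime.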
> The trick is that we don't watch for the refcount hitting 0 until we're
> shutting down - so this only works if you keep track of your initial
> refcount. As long as we're not shutting down, we know the refcount can't
> hit 0 because we haven't released the initial refcount.
This seems dangerous to me: assume you have one CPU which always
does get and another does put. So there may be 2^32 such operations
without a kill and you wrap for real in a way that does not get
corrected.
Normally this can only happen if you have a lot of objects or CPUs
are limited.
But you don't have any limit on getting out-of-sync.
You could make it 64bit, but then wraps could happen.
-Andi
On Thu, Nov 29, 2012 at 08:20:03PM +0100, Andi Kleen wrote:
> > The trick is that we don't watch for the refcount hitting 0 until we're
> > shutting down - so this only works if you keep track of your initial
> > refcount. As long as we're not shutting down, we know the refcount can't
> > hit 0 because we haven't released the initial refcount.
>
> This seems dangerous to me: assume you have one CPU which always
> does get and another does put. So there may be 2^32 such operations
> without a kill and you wrap for real in a way that does not get
> corrected.
I don't know how to write a proof that it works (and I should... I
haven't done any real math in ages, argh) but try working out some
examples to see what happens:
cpu 0 does 2^32 gets, cpu 1 does 2^32 - 1 puts, actual ref should be 1:
cpu 0 ref: 0
cpu 1 ref: 1 (it started at 0 and had 1 subtracted 2^32 - 1 times)
cpu 0 does 2^32 + 1 gets, cpu 1 does 2^32 puts, again ref should be 1:
cpu 0 ref: 1
cpu 1 ref: 0
There's some kind of symmetry going on here, and if I'd been awake more
in college I could probably say exactly why it works, but it does.
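A standalone way to check the same arithmetic (just an illustration, not
part of the series): the counters are unsigned 32-bit values, so each one
wraps on its own, but their sum mod 2^32 is still the true count.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	/* cpu 0 only ever gets, cpu 1 only ever puts */
	uint32_t cpu0 = 0, cpu1 = 0;
	uint64_t i;

	for (i = 0; i < (1ULL << 32) + 1; i++)	/* 2^32 + 1 gets */
		cpu0++;				/* wraps once, ends up at 1 */

	for (i = 0; i < (1ULL << 32); i++)	/* 2^32 puts */
		cpu1--;				/* wraps, ends up back at 0 */

	/* the mod-2^32 sum of the wrapped counters is the true refcount */
	printf("cpu0=%u cpu1=%u sum=%u\n", cpu0, cpu1, cpu0 + cpu1);
	/* prints: cpu0=1 cpu1=0 sum=1 */
	return 0;
}

That's the second example above; the first works out the same way.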
On Wed, Nov 28, 2012 at 04:38:16PM -0800, Zach Brown wrote:
> > We can't use cmpxchg() on the ring buffer's head pointer directly, since
> > it's modded to nr_events and would be susceptible to ABA. So instead we
> > maintain a shadow head that uses the full 32 bits, and cmpxchg() that
> > and then updated the real head pointer.
>
> Time to update this comment to reflect the mutex instead of the shadow
> head? :)
Yeah :P
> > + ev = kmap(page);
> > + copy_ret = copy_to_user(event + ret, ev + pos, sizeof(*ev) * i);
> > + kunmap(page);
>
> For lack of a better time: do we want to bring up the missing
> flush_dcache_page() calls around the kernel mappings of ring pages that
> are also mapped to user addresses?
Well, it's mainly aio_complete() that needs to be doing
flush_dcache_page()... technically io_getevents should be too, but only
because it's updating the head pointer. Should probably do it now and
not forget about it again, but I think it should be its own patch.
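For reference, a sketch of the kind of call being talked about - an
illustration only, not a patch from this series; the helper and its
arguments are invented. After writing a completion into a kmap()ed ring
page that userspace also has mmap()ed, the kernel-side mapping gets
flushed:

#include <linux/aio_abi.h>	/* struct io_event */
#include <linux/highmem.h>	/* kmap(), kunmap(), flush_dcache_page() */

/* Illustrative only: 'page' is a ring page, 'pos' the event slot within it */
static void write_event_to_ring_page(struct page *page, unsigned int pos,
				     const struct io_event *event)
{
	struct io_event *ev = kmap(page);

	ev[pos] = *event;
	kunmap(page);

	/*
	 * The same page is mapped into userspace via the aio ring mmap(), so
	 * flush the kernel mapping for architectures with aliasing D-caches.
	 */
	flush_dcache_page(page);
}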
> > prepare_to_wait_exclusive(&ctx->wait, &wait,
> > TASK_INTERRUPTIBLE);
> >
> > + ret = aio_read_events(ctx, event + i, nr - i);
> > + if (ret < 0)
> > break;
>
> As mentioned offlist: we don't want to be blocking under
> TASK_INTERRUPTIBLE. Is the plan to do a non-blocking check and pop
> outside the wait loop to do a blocking copy?
Argh. This thing is getting irritating. Gonna mull this over on irc
more.
On Thu, Nov 29, 2012 at 11:29:25AM -0800, Kent Overstreet wrote:
> There's some kind of symmetry going on here, and if I'd been awake more
> in college I could probably say exactly why it works, but it does.
I think the catch is that, with only a 32 bit counter, the user can
arbitrarily control the sum of all the parts. I think a 64 bit
counter may be required to ensure no overflow occurs. Otherwise, an
overflow could result in a premature free when there are still 2^32
objects active thanks to a malicious user (possible on systems with lots
of memory these days -- remote, but possible).
-ben
--
"Thought is the essence of where you are now."
On Thu, Nov 29, 2012 at 02:34:52PM -0500, Benjamin LaHaise wrote:
> On Thu, Nov 29, 2012 at 11:29:25AM -0800, Kent Overstreet wrote:
> > There's some kind of symmetry going on here, and if I'd been awake more
> > in college I could probably say exactly why it works, but it does.
>
> I think the catch is that using only a 32 bit counter is something the
> user could arbitrarily control the sum of all parts. I think a 64 bit
> counter may be required to ensure no overflow occurs. Otherwise, an
> overflow could result in a premature free when there are still 2^32
> objects active thanks to a malicious user (possible on systems with lots
> of memory these days -- remote, but possible).
That's no different from regular atomic_t - but you're right, we
should be using size_t for anything userspace can manipulate.
Not gonna worry about it in this patch though because the refcount was
an atomic_t before and userspace can only do one get per thread.
Kent Overstreet <[email protected]> writes:
> On Thu, Nov 29, 2012 at 02:34:52PM -0500, Benjamin LaHaise wrote:
>> On Thu, Nov 29, 2012 at 11:29:25AM -0800, Kent Overstreet wrote:
>> > There's some kind of symmetry going on here, and if I'd been awake more
>> > in college I could probably say exactly why it works, but it does.
>>
>> I think the catch is that using only a 32 bit counter is something the
>> user could arbitrarily control the sum of all parts. I think a 64 bit
>> counter may be required to ensure no overflow occurs. Otherwise, an
>> overflow could result in a premature free when there are still 2^32
>> objects active thanks to a malicious user (possible on systems with lots
>> of memory these days -- remote, but possible).
>
> That's no different from regular atomic_t - but you're right, we
> should be using size_t for anything userspace can manipulate.
The regular atomic_t is limited in ways that you are not.
See my original mail.
-Andi
--
[email protected] -- Speaking for myself only
On Thu, Nov 29, 2012 at 12:42:17PM -0800, Andi Kleen wrote:
> Kent Overstreet <[email protected]> writes:
>
> > On Thu, Nov 29, 2012 at 02:34:52PM -0500, Benjamin LaHaise wrote:
> >> On Thu, Nov 29, 2012 at 11:29:25AM -0800, Kent Overstreet wrote:
> >> > There's some kind of symmetry going on here, and if I'd been awake more
> >> > in college I could probably say exactly why it works, but it does.
> >>
> >> I think the catch is that using only a 32 bit counter is something the
> >> user could arbitrarily control the sum of all parts. I think a 64 bit
> >> counter may be required to ensure no overflow occurs. Otherwise, an
> >> overflow could result in a premature free when there are still 2^32
> >> objects active thanks to a malicious user (possible on systems with lots
> >> of memory these days -- remote, but possible).
> >
> > That's no different from regular atomic_t - but you're right, we
> > should be using size_t for anything userspace can manipulate.
>
> The regular atomic_t is limited in ways that you are not.
> See my original mail.
I don't follow, can you explain?
> > The regular atomic_t is limited in ways that you are not.
> > See my original mail.
>
> I don't follow, can you explain?
For most cases the reference count is tied to some object, which is
naturally limited by memory size or other physical resources.
But in the asymmetric CPU case with your ref count no such limiter
exists.
-Andi
--
[email protected] -- Speaking for myself only.
On Thu, Nov 29, 2012 at 09:54:47PM +0100, Andi Kleen wrote:
> > > The regular atomic_t is limited in ways that you are not.
> > > See my original mail.
> >
> > I don't follow, can you explain?
>
> For most cases the reference count is tied to some object, which is
> naturally limited by memory size or other physical resources.
>
> But in the asymmetric CPU case with your ref count no such limiter
> exists.
It's got exactly the same limit as the old code which used the atomic_t
- we're limited by the number of threads that can be issuing aio
syscalls at a time.
The asymmetry you're talking about _doesn't matter_, individual cpu
counters wrapping does not affect what the counters all sum to when we
go to tear down.
A coworker at lunch actually pointed out to me that the reason this is
true is just that modular arithmetic is still associative with addition
and subtraction.
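Spelling that reasoning out (notation mine, not from the thread): if cpu i
has seen g_i gets and p_i puts, its counter holds c_i = (g_i - p_i) mod 2^32,
so summing over cpus

$$\sum_i c_i \equiv \sum_i (g_i - p_i) = \sum_i g_i - \sum_i p_i \pmod{2^{32}}$$

and the right hand side is the true refcount. As long as that true count
fits in 32 bits at the moment percpu_ref_kill() sums the counters, the
wrapped per-cpu values add up to the correct total.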
Kent Overstreet wrote:
> On Thu, Nov 29, 2012 at 09:54:47PM +0100, Andi Kleen wrote:
> > > > The regular atomic_t is limited in ways that you are not.
> > > > See my original mail.
> > >
> > > I don't follow, can you explain?
> >
> > For most cases the reference count is tied to some object, which is
> > naturally limited by memory size or other physical resources.
> >
> > But in the asymmetric CPU case with your ref count no such limiter
> > exists.
>
> It's got exactly the same limit as the old code which used the atomic_t
> - we're limited by the number of threads that can be issuing aio
> syscalls at a time.
>
> The asymmetry you're talking about _doesn't matter_, individual cpu
> counters wrapping does not affect what the counters all sum to when we
> go to tear down.
>
> A coworker at lunch actually pointed out to me that the reason this is
> true is just that modular arithmetic is still associative with addition
> and subtraction.
It's just like jiffies. Everyone understands jiffies arithmetic, I hope.
-- Jamie
On Wed, Nov 28, 2012 at 04:38:16PM -0800, Zach Brown wrote:
> As mentioned offlist: we don't want to be blocking under
> TASK_INTERRUPTIBLE. Is the plan to do a non-blocking check and pop
> outside the wait loop to do a blocking copy?
Here's the latest version that I posted on irc earlier:
commit 913ff32bbd4de15a87b07a87ac196e978bc29e17
Author: Kent Overstreet <[email protected]>
Date: Thu Nov 29 14:12:40 2012 -0800
aio: Make aio_read_evt() more efficient
Previously, aio_read_event() pulled a single completion off the
ringbuffer at a time, locking and unlocking each time.
Changed it to pull off as many events as it can at a time, and copy them
directly to userspace.
This also fixes a bug where if copying the event to userspace failed,
we'd lose the event.
Signed-off-by: Kent Overstreet <[email protected]>
diff --git a/fs/aio.c b/fs/aio.c
index 46e6d30..5eca2a4 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -63,7 +63,7 @@ struct aio_ring_info {
unsigned long mmap_size;
struct page **ring_pages;
- spinlock_t ring_lock;
+ struct mutex ring_lock;
long nr_pages;
unsigned nr, tail;
@@ -341,7 +341,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->users, 2);
atomic_set(&ctx->dead, 0);
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->ring_info.ring_lock);
+ mutex_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
@@ -746,149 +746,138 @@ put_rq:
}
EXPORT_SYMBOL(aio_complete);
-/* aio_read_evt
- * Pull an event off of the ioctx's event ring. Returns the number of
- * events fetched (0 or 1 ;-)
- * FIXME: make this use cmpxchg.
- * TODO: make the ringbuffer user mmap()able (requires FIXME).
+/* aio_read_events
+ * Pull an event off of the ioctx's event ring. Returns the number of
+ * events fetched
*/
-static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
+static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
+ long nr, unsigned *head)
{
- struct aio_ring_info *info = &ioctx->ring_info;
+ struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring;
- unsigned long head;
- int ret = 0;
+ unsigned pos;
+ int ret = 0, copy_ret;
- ring = kmap_atomic(info->ring_pages[0]);
- pr_debug("h%u t%u m%u\n", ring->head, ring->tail, ring->nr);
+ pr_debug("h%u t%u m%u\n", *head, info->tail, info->nr);
- if (ring->head == ring->tail)
- goto out;
+ while (ret < nr) {
+ unsigned i = (*head < info->tail ? info->tail : info->nr) - *head;
+ struct io_event *ev;
+ struct page *page;
+
+ if (*head == info->tail)
+ break;
+
+ i = min_t(int, i, nr - ret);
+ i = min_t(int, i, AIO_EVENTS_PER_PAGE -
+ ((*head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
+
+ pos = *head + AIO_EVENTS_OFFSET;
+ page = info->ring_pages[pos / AIO_EVENTS_PER_PAGE];
+ pos %= AIO_EVENTS_PER_PAGE;
- spin_lock(&info->ring_lock);
-
- head = ring->head % info->nr;
- if (head != ring->tail) {
- struct io_event *evp = aio_ring_event(info, head);
- *ent = *evp;
- head = (head + 1) % info->nr;
- smp_mb(); /* finish reading the event before updatng the head */
- ring->head = head;
- ret = 1;
- put_aio_ring_event(evp);
+ ev = kmap(page);
+ copy_ret = copy_to_user(event + ret, ev + pos, sizeof(*ev) * i);
+ kunmap(page);
+
+ if (unlikely(copy_ret))
+ return -EFAULT;
+
+ ret += i;
+ *head += i;
+ *head %= info->nr;
}
- spin_unlock(&info->ring_lock);
-out:
+ smp_mb(); /* finish reading the event before updating the head */
+
+ ring = kmap_atomic(info->ring_pages[0]);
+ ring->head = *head;
kunmap_atomic(ring);
- pr_debug("%d h%u t%u\n", ret, ring->head, ring->tail);
+
+ pr_debug("%d h%u t%u\n", ret, *head, info->tail);
+
return ret;
}
static int read_events(struct kioctx *ctx,
- long min_nr, long nr,
- struct io_event __user *event,
- struct timespec __user *timeout)
+ long min_nr, long nr,
+ struct io_event __user *event,
+ struct timespec __user *timeout)
{
DEFINE_WAIT(wait);
+ struct aio_ring_info *info = &ctx->ring_info;
+ struct aio_ring *ring;
struct hrtimer_sleeper t;
+ unsigned head;
size_t i = 0;
- int ret;
- struct io_event ent;
+ int ret = 0;
- /* needed to zero any padding within an entry (there shouldn't be
- * any, but C is fun!
- */
- memset(&ent, 0, sizeof(ent));
- ret = 0;
- while (likely(i < nr)) {
- ret = aio_read_evt(ctx, &ent);
- if (unlikely(ret <= 0))
- break;
+ hrtimer_init_on_stack(&t.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+ hrtimer_init_sleeper(&t, current);
- pr_debug("%Lx %Lx %Lx %Lx\n",
- ent.data, ent.obj, ent.res, ent.res2);
+ mutex_lock(&info->ring_lock);
- /* Could we split the check in two? */
- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- pr_debug("lost an event due to EFAULT.\n");
+ while (i < nr) {
+ ring = kmap_atomic(info->ring_pages[0]);
+ head = ring->head;
+ kunmap_atomic(ring);
+retry:
+ ret = aio_read_events(ctx, event + i, nr - i, &head);
+ if (ret < 0)
break;
- }
- ret = 0;
- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
- }
-
- if (min_nr <= i)
- return i;
- if (ret)
- return ret;
-
- /* End fast path */
+ i += ret;
+ if (i >= min_nr)
+ break;
+ if (unlikely(atomic_read(&ctx->dead))) {
+ ret = -EINVAL;
+ break;
+ }
+ if (!t.task) /* Only check after read evt */
+ break;
- hrtimer_init_on_stack(&t.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
- hrtimer_init_sleeper(&t, current);
+ if (timeout) {
+ struct timespec ts;
- if (timeout) {
- struct timespec ts;
+ if (unlikely(copy_from_user(&ts, timeout, sizeof(ts)))) {
+ ret = -EFAULT;
+ break;
+ }
- if (unlikely(copy_from_user(&ts, timeout, sizeof(ts)))) {
- ret = -EFAULT;
- goto out;
+ timeout = NULL;
+ hrtimer_start_range_ns(&t.timer, timespec_to_ktime(ts),
+ current->timer_slack_ns,
+ HRTIMER_MODE_REL);
}
- hrtimer_start_range_ns(&t.timer, timespec_to_ktime(ts),
- current->timer_slack_ns, HRTIMER_MODE_REL);
- }
-
- while (likely(i < nr)) {
prepare_to_wait_exclusive(&ctx->wait, &wait,
TASK_INTERRUPTIBLE);
- do {
- ret = aio_read_evt(ctx, &ent);
- if (ret)
- break;
- if (min_nr <= i)
- break;
- if (unlikely(atomic_read(&ctx->dead))) {
- ret = -EINVAL;
- break;
- }
- if (!t.task) /* Only check after read evt */
- break;
- /* Try to only show up in io wait if there are ops
- * in flight */
- if (atomic_read(&ctx->reqs_active))
- io_schedule();
- else
- schedule();
- if (signal_pending(current)) {
- ret = -EINTR;
- break;
- }
- /*ret = aio_read_evt(ctx, &ent);*/
- } while (1) ;
+ if (head != info->tail) {
+ __set_current_state(TASK_RUNNING);
+ goto retry;
+ }
- finish_wait(&ctx->wait, &wait);
+ mutex_unlock(&info->ring_lock);
- if (unlikely(ret <= 0))
- break;
+ /* Try to only show up in io wait if there are ops in flight */
+ if (atomic_read(&ctx->reqs_active))
+ io_schedule();
+ else
+ schedule();
- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- pr_debug("lost an event due to EFAULT.\n");
- break;
+ if (signal_pending(current)) {
+ ret = -EINTR;
+ goto out;
}
- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
+ __set_current_state(TASK_RUNNING);
+ mutex_lock(&info->ring_lock);
}
+
+ mutex_unlock(&info->ring_lock);
out:
+ finish_wait(&ctx->wait, &wait);
hrtimer_cancel(&t.timer);
destroy_hrtimer_on_stack(&t.timer);
return i ? i : ret;