2003-03-05 09:02:49

by Suparna Bhattacharya

Subject: [RFC][Patch] Retry based aio read for filesystems

For the last few days I've been playing with prototyping
a particular flavour of a retry based implementation for
filesystem aio read.

It is rather experimental at this point, with only
limited bits of testing so far. [There are some potential
races which seem to show up in the case of large reads
requiring faulting in of the user space buffer. I've
chosen to leave some printks on for now.]

I am posting these initial patches as responses to this
mail for early comments/feedback on the approach, and
to improve the chances of unearthing any potential
gotchas sooner rather than later :)

A few alternative variations are possible around the
basic retry-driven theme that Ben LaHaise has designed -
each with their pros and cons - so I am hoping that
sharing experiences on what we're trying out may be a
step towards some insight into what works best.

It's in two parts:
aioretry.patch : Core aio infrastructure modifications
aioread.patch : Modifications for aio read in
particular (generic_file_aio_read)

I was working with 2.5.62, and haven't yet tried
upgrading to 2.5.64.

The idea was to :
- Keep things as simple as possible with minimal changes
to the current filesystem read/write paths (which I
guess we don't want to disturb too much at this stage).
The way to do this is to make the base aio infrastructure
handle most of the async (retry) complexities.
- Take incremental steps towards making an operation async
- start with blocking->async conversions for the major
blocking points.
- Keep in mind Dave Miller's concern about the effect of
aio additions (extra parameters) on sync i/o behaviour.
So retaining sync i/o performance as far as possible gets
precedence over aio performance and latency right now.
- Take incremental steps towards tuning aio performance,
and optimizing specific paths for aio.

A retry-based model implies that each time we make as
much progress as possible in a non-blocking manner,
and then defer a restart of the operation from where we
left off, at the next opportunity ... and so on, until
we finish. To make sure that a next opportunity does
indeed arise, each time we do _more_ than we would for
simple non-blocking i/o -- we initiate some steps
towards progress without actually waiting for them to
complete. Then we mark ourselves to be notified when
enough is ready to try the next
restart-from-where-we-left-off attempt.
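
In code terms, a retry method following this model might
look roughly like the sketch below. This is purely
illustrative -- make_nonblocking_progress() and
queue_async_notification() are made-up placeholders, not
functions from the patch or the kernel; the ki_* fields
are the ones the patch adds to struct kiocb:

/*
 * Illustrative sketch of the retry-driven model described above.
 * make_nonblocking_progress() and queue_async_notification() are
 * hypothetical placeholders.
 */
ssize_t retry_driven_read(struct kiocb *iocb)
{
        ssize_t done;

        /* make as much progress as possible without blocking ... */
        done = make_nonblocking_progress(iocb->ki_filp, iocb->ki_buf,
                                         iocb->ki_left, &iocb->ki_pos);
        if (done > 0) {
                iocb->ki_buf  += done;  /* remember where we left off */
                iocb->ki_left -= done;
        }
        if (done < 0)
                return done;                    /* error */
        if (!iocb->ki_left)
                return iocb->ki_nbytes;         /* all done */

        /* ... initiate further progress, ask to be kicked when ready */
        queue_async_notification(iocb);
        return -EIOCBQUEUED;                    /* retried later from here */
}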

3 decisions characterise this particular variation:

1. At what level to retry ?

This patch does it at the highest level (fops level),
i.e. high level aio code retries the generic read
operation passing in the remaining parts of the buffer
to be read.

No retries happen in the sync case at the moment,
though the option exists.

2. What kind of state needs to be saved across retries
and who maintains that ?

The high level aio code keeps adjusting the parameters
to the read as retries progress (this is done by the
routine calling the fops)

There is, however, room for optimizations when
low-level paths can be modified to reuse state
across aio-retries.
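
Concretely, the adjustment in this patch boils down to
something like the helper sketched below (the helper name
is made up; the logic is condensed from aio_process_iocb()
in the patch that follows):

/* Condensed from aio_process_iocb() in the patch below; the helper
 * name is made up.  This is how the aio core adjusts the saved
 * parameters after each (possibly partial) transfer. */
static ssize_t aio_advance_iocb(struct kiocb *iocb, ssize_t ret)
{
        if (ret > 0) {
                iocb->ki_buf  += ret;           /* rest of the user buffer */
                iocb->ki_left -= ret;           /* bytes still to be read */
                if (iocb->ki_left)
                        return -EIOCBQUEUED;    /* retry for the remainder */
        }
        if (ret >= 0)
                return iocb->ki_nbytes - iocb->ki_left; /* total done */
        return ret;                             /* error */
}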

3. When to trigger a retry ?

This is the tricky part. This variation uses async
wait queue functions, instead of blocking waits, to
trigger a retry (kick_iocb) when data becomes available.
So the synchronous lock_page and wait_on_page_bit have
async variations which do an async wait and return
-EIOCBQUEUED (which is propagated up).

(BTW, the races that I'm running into are mostly
related to this area - avoiding simultaneous retries,
dealing with completion while a retry is going on,
etc. We need to audit the code and think about this
more closely. I wanted to keep the async wakeup
as simple as possible ... and deal with the races
in some other way.)
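
To give a feel for the pattern, here is a boiled-down
sketch of the wait/wake pair; the complete versions are
aio_wait_on_page_bit() and aio_wake_function() in the
patches that follow:

/* Boiled-down illustration of the async wait side: instead of
 * sleeping, register the iocb's wait entry on the waitqueue and back
 * out with -EIOCBQUEUED.  The real version is aio_wait_on_page_bit()
 * in the filemap patch. */
static int async_wait_on_page_bit(struct page *page, int bit_nr)
{
        wait_queue_head_t *wqh = page_waitqueue(page);
        wait_queue_t *wait = &current->iocb->ki_wait;

        prepare_to_wait(wqh, wait, TASK_RUNNING);
        if (test_bit(bit_nr, &page->flags)) {
                sync_page(page);        /* start the i/o, but don't wait */
                return -EIOCBQUEUED;    /* propagated up to the aio core */
        }
        finish_wait(wqh, wait);
        return 0;
}

/* The wake side: the iocb's wait entry was set up with a custom wake
 * function, so the wakeup just kicks the iocb, scheduling a retry
 * (this is aio_wake_function in the core aio patch). */
int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync)
{
        struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);

        list_del_init(&wait->task_list);
        kick_iocb(iocb);
        return 1;
}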

Ben,

Is this anywhere close to what you had in mind, or
have played with, or were you aiming for retries
at a different (lower) level ? How's your experience
been with other variations that you've tried ?

Would be interesting to take a look and compare notes,
if possible.
Any chance you'll be putting something out ?

I guess the retry state you maintain may be a little more
specific to the target/type of aio, or is implemented in
a way that lets state already updated by lower-level
functions be reused/carried over directly, rather than
recomputed by the aio code ... does that work out to be
more optimal ? Or were you working on a different
technique altogether ?

There are a couple of fixes that I made in the aio code
as part of the patch.
- The kiocbClearXXX were doing a set_bit instead of
clear_bit
- Sync iocbs were not woken up when iocb->ki_users = 1
(dio takes a different path for sync and async iocbs,
so maybe that's why we weren't seeing the problem yet)

Hope that's okay.

Regards
Suparna
--
Suparna Bhattacharya ([email protected])
Linux Technology Center
IBM Software Labs, India


2003-03-05 09:11:05

by Suparna Bhattacharya

Subject: Re: [Patch 1/2] Retry based aio read - core aio changes

On Wed, Mar 05, 2003 at 02:47:54PM +0530, Suparna Bhattacharya wrote:
> For the last few days I've been playing with prototyping
> a particular flavour of a retry based implementation for
> filesystem aio read.


# aioretry.patch : Core aio infrastructure modifications
for high-level retry based aio

Includes a couple of fixes in the aio code
-The kiocbClearXXX were doing a set_bit instead of
clear_bit
-Sync iocbs were not woken up when iocb->ki_users = 1
(dio takes a different path for sync and async iocbs,
so maybe that's why we weren't seeing the problem yet)

Regards
Suparna

----------------------------------------------------

diff -ur linux-2.5.62/fs/aio.c linux-2.5.62-aio/fs/aio.c
--- linux-2.5.62/fs/aio.c Tue Feb 18 04:26:14 2003
+++ linux-2.5.62-aio/fs/aio.c Tue Mar 4 19:54:24 2003
@@ -314,14 +314,15 @@
*/
ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
{
- while (iocb->ki_users) {
+ printk("wait_on_sync_iocb\n");
+ while ((iocb->ki_users) && !kiocbIsKicked(iocb)) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (!iocb->ki_users)
+ if (!iocb->ki_users || kiocbIsKicked(iocb))
break;
schedule();
}
__set_current_state(TASK_RUNNING);
- return iocb->ki_user_data;
+ return iocb->ki_users ? -EIOCBQUEUED : iocb->ki_user_data;
}

/* exit_aio: called when the last user of mm goes away. At this point,
@@ -395,6 +396,7 @@
req->ki_cancel = NULL;
req->ki_retry = NULL;
req->ki_user_obj = NULL;
+ INIT_LIST_HEAD(&req->ki_run_list);

/* Check if the completion queue has enough free space to
* accept an event from this io.
@@ -558,15 +560,20 @@
enter_lazy_tlb(mm, current, smp_processor_id());
}

-/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an
- * operation blocks.
- */
-static void aio_kick_handler(void *data)
+static inline int __queue_kicked_iocb(struct kiocb *iocb)
{
- struct kioctx *ctx = data;
+ struct kioctx *ctx = iocb->ki_ctx;

- use_mm(ctx->mm);
+ if (list_empty(&iocb->ki_run_list)) {
+ list_add_tail(&iocb->ki_run_list,
+ &ctx->run_list);
+ return 1;
+ }
+ return 0;
+}

+static void aio_run_iocbs(struct kioctx *ctx)
+{
spin_lock_irq(&ctx->ctx_lock);
while (!list_empty(&ctx->run_list)) {
struct kiocb *iocb;
@@ -574,30 +581,67 @@

iocb = list_entry(ctx->run_list.next, struct kiocb,
ki_run_list);
- list_del(&iocb->ki_run_list);
+ list_del_init(&iocb->ki_run_list);
iocb->ki_users ++;
- spin_unlock_irq(&ctx->ctx_lock);

- kiocbClearKicked(iocb);
- ret = iocb->ki_retry(iocb);
- if (-EIOCBQUEUED != ret) {
- aio_complete(iocb, ret, 0);
- iocb = NULL;
+ if (!kiocbTryStart(iocb)) {
+ kiocbClearKicked(iocb);
+ spin_unlock_irq(&ctx->ctx_lock);
+ ret = iocb->ki_retry(iocb);
+ if (-EIOCBQUEUED != ret) {
+ if (list_empty(&iocb->ki_wait.task_list))
+ aio_complete(iocb, ret, 0);
+ else
+ printk("can't delete iocb in use\n");
+ }
+ spin_lock_irq(&ctx->ctx_lock);
+ kiocbClearStarted(iocb);
+ if (kiocbIsKicked(iocb))
+ __queue_kicked_iocb(iocb);
+ } else {
+ printk("iocb already started\n");
}
-
- spin_lock_irq(&ctx->ctx_lock);
if (NULL != iocb)
__aio_put_req(ctx, iocb);
}
spin_unlock_irq(&ctx->ctx_lock);

+}
+
+/* Run on aiod/kevent's context. FIXME: needs to be per-cpu and warn if an
+ * operation blocks.
+ */
+static void aio_kick_handler(void *data)
+{
+ struct kioctx *ctx = data;
+
+ use_mm(ctx->mm);
+ aio_run_iocbs(ctx);
unuse_mm(ctx->mm);
}

-void kick_iocb(struct kiocb *iocb)
+
+void queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
+ unsigned long flags;
+ int run = 0;

+ WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ run = __queue_kicked_iocb(iocb);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ if (run) {
+ if (waitqueue_active(&ctx->wait))
+ wake_up(&ctx->wait);
+ else
+ queue_work(aio_wq, &ctx->wq);
+ }
+}
+
+void kick_iocb(struct kiocb *iocb)
+{
/* sync iocbs are easy: they can only ever be executing from a
* single context. */
if (is_sync_kiocb(iocb)) {
@@ -606,12 +650,11 @@
return;
}

- if (!kiocbTryKick(iocb)) {
- unsigned long flags;
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_add_tail(&iocb->ki_run_list, &ctx->run_list);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- schedule_work(&ctx->wq);
+
+ if (!kiocbTryKick(iocb) && !kiocbIsStarted(iocb)) {
+ queue_kicked_iocb(iocb);
+ } else {
+ pr_debug("iocb already kicked or in progress\n");
}
}

@@ -642,13 +685,13 @@
iocb->ki_user_data = res;
if (iocb->ki_users == 1) {
iocb->ki_users = 0;
- return 1;
+ ret = 1;
+ } else {
+ spin_lock_irq(&ctx->ctx_lock);
+ iocb->ki_users--;
+ ret = (0 == iocb->ki_users);
+ spin_unlock_irq(&ctx->ctx_lock);
}
- spin_lock_irq(&ctx->ctx_lock);
- iocb->ki_users--;
- ret = (0 == iocb->ki_users);
- spin_unlock_irq(&ctx->ctx_lock);
-
/* sync iocbs put the task here for us */
wake_up_process(iocb->ki_user_obj);
return ret;
@@ -664,6 +707,9 @@
*/
spin_lock_irqsave(&ctx->ctx_lock, flags);

+ if (!list_empty(&iocb->ki_run_list))
+ list_del_init(&iocb->ki_run_list);
+
ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);

tail = info->tail;
@@ -865,6 +911,8 @@
ret = 0;
if (to.timed_out) /* Only check after read evt */
break;
+ /* accelerate kicked iocbs for this ctx */
+ aio_run_iocbs(ctx);
schedule();
if (signal_pending(tsk)) {
ret = -EINTR;
@@ -984,6 +1032,114 @@
return -EINVAL;
}

+/* Called during initial submission and subsequent retry operations */
+long aio_process_iocb(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = 0;
+
+ if (iocb->ki_retried++ > 1024*1024) {
+ printk("Maximal retry count. Bytes done %d\n",
+ iocb->ki_nbytes - iocb->ki_left);
+ return -EAGAIN;
+ }
+
+ if (!(iocb->ki_retried & 0xff)) {
+ printk("%ld aio retries completed %d bytes of %d\n",
+ iocb->ki_retried,
+ iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
+ }
+
+ BUG_ON(current->iocb != NULL);
+
+ current->iocb = iocb;
+
+ switch (iocb->ki_opcode) {
+ case IOCB_CMD_PREAD:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ goto out;
+ ret = -EFAULT;
+ if (unlikely(!access_ok(VERIFY_WRITE, iocb->ki_buf,
+ iocb->ki_left)))
+ goto out;
+ ret = -EINVAL;
+ if (file->f_op->aio_read)
+ ret = file->f_op->aio_read(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+ break;
+ case IOCB_CMD_PWRITE:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ goto out;
+ ret = -EFAULT;
+ if (unlikely(!access_ok(VERIFY_READ, iocb->ki_buf,
+ iocb->ki_left)))
+ goto out;
+ ret = -EINVAL;
+ if (file->f_op->aio_write)
+ ret = file->f_op->aio_write(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+ break;
+ case IOCB_CMD_FDSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 1);
+ break;
+ case IOCB_CMD_FSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 0);
+ break;
+ default:
+ dprintk("EINVAL: io_submit: no operation provided\n");
+ ret = -EINVAL;
+ }
+
+ pr_debug("aio_process_iocb: fop ret %d\n", ret);
+ if (likely(-EIOCBQUEUED == ret)) {
+ blk_run_queues();
+ goto out;
+ }
+ if (ret > 0) {
+ iocb->ki_buf += ret;
+ iocb->ki_left -= ret;
+
+ /* Not done yet or a short read, or.. */
+ if (iocb->ki_left
+ /* may have copied out data but not completed writing */
+ || ((iocb->ki_left == 0) &&
+ (iocb->ki_opcode == IOCB_CMD_PWRITE)) ){
+ /* FIXME:Can we find a better way to handle this ? */
+ /* Force an extra retry to determine if we're done */
+ ret = -EIOCBQUEUED;
+ goto out;
+ }
+
+ }
+
+ if (ret >= 0)
+ ret = iocb->ki_nbytes - iocb->ki_left;
+
+out:
+ pr_debug("ki_pos = %llu\n", iocb->ki_pos);
+ current->iocb = NULL;
+ if ((-EIOCBQUEUED == ret) && list_empty(&iocb->ki_wait.task_list)) {
+ kiocbSetKicked(iocb);
+ }
+
+ return ret;
+}
+
+int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync)
+{
+ struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
+
+ list_del_init(&wait->task_list);
+ kick_iocb(iocb);
+ return 1;
+}
+
static int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb *user_iocb,
struct iocb *iocb));
static int io_submit_one(struct kioctx *ctx, struct iocb *user_iocb,
@@ -991,8 +1147,7 @@
{
struct kiocb *req;
struct file *file;
- ssize_t ret;
- char *buf;
+ long ret;

/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
@@ -1033,50 +1188,34 @@
req->ki_user_data = iocb->aio_data;
req->ki_pos = iocb->aio_offset;

- buf = (char *)(unsigned long)iocb->aio_buf;
+ req->ki_buf = (char *)(unsigned long)iocb->aio_buf;
+ req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
+ req->ki_opcode = iocb->aio_lio_opcode;
+ req->ki_retry = aio_process_iocb;
+ init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
+ INIT_LIST_HEAD(&req->ki_wait.task_list);
+ req->ki_retried = 0;
+ kiocbSetStarted(req);

- switch (iocb->aio_lio_opcode) {
- case IOCB_CMD_PREAD:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_READ)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_read)
- ret = file->f_op->aio_read(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_PWRITE:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_WRITE)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_write)
- ret = file->f_op->aio_write(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_FDSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 1);
- break;
- case IOCB_CMD_FSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 0);
- break;
- default:
- dprintk("EINVAL: io_submit: no operation provided\n");
- ret = -EINVAL;
- }
+ ret = aio_process_iocb(req);
+
+ if (likely(-EIOCBQUEUED == ret)) {
+ int run = 0;
+
+ spin_lock_irq(&ctx->ctx_lock);
+ kiocbClearStarted(req);
+ if (kiocbIsKicked(req))
+ run = __queue_kicked_iocb(req);
+ spin_unlock_irq(&ctx->ctx_lock);
+ if (run)
+ queue_work(aio_wq, &ctx->wq);

- if (likely(-EIOCBQUEUED == ret))
return 0;
+ }
+
+ if ((-EBADF == ret) || (-EFAULT == ret))
+ goto out_put_req;
+
aio_complete(req, ret, 0);
return 0;

diff -ur linux-2.5.62/include/linux/aio.h linux-2.5.62-aio/include/linux/aio.h
--- linux-2.5.62/include/linux/aio.h Tue Feb 18 04:25:50 2003
+++ linux-2.5.62-aio/include/linux/aio.h Mon Mar 3 12:17:12 2003
@@ -29,21 +29,26 @@
#define KIF_LOCKED 0
#define KIF_KICKED 1
#define KIF_CANCELLED 2
+#define KIF_STARTED 3

#define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
+#define kiocbTryStart(iocb) test_and_set_bit(KIF_STARTED, &(iocb)->ki_flags)

#define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags)
#define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags)
#define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbSetStarted(iocb) set_bit(KIF_STARTED, &(iocb)->ki_flags)

-#define kiocbClearLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags)
-#define kiocbClearKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags)
-#define kiocbClearCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags)
+#define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags)
+#define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbClearStarted(iocb) clear_bit(KIF_STARTED, &(iocb)->ki_flags)

#define kiocbIsLocked(iocb) test_bit(0, &(iocb)->ki_flags)
#define kiocbIsKicked(iocb) test_bit(1, &(iocb)->ki_flags)
#define kiocbIsCancelled(iocb) test_bit(2, &(iocb)->ki_flags)
+#define kiocbIsStarted(iocb) test_bit(3, &(iocb)->ki_flags)

struct kiocb {
struct list_head ki_run_list;
@@ -62,6 +67,14 @@
void *ki_user_obj; /* pointer to userland's iocb */
__u64 ki_user_data; /* user's data for completion */
loff_t ki_pos;
+
+ /* State that we remember to be able to restart/retry */
+ unsigned short ki_opcode;
+ size_t ki_nbytes; /* copy of iocb->aio_nbytes */
+ char *ki_buf; /* remaining iocb->aio_buf */
+ size_t ki_left; /* remaining bytes */
+ wait_queue_t ki_wait;
+ long ki_retried; /* just for testing */

char private[KIOCB_PRIVATE_SIZE];
};
@@ -77,6 +90,8 @@
(x)->ki_ctx = &tsk->active_mm->default_kioctx; \
(x)->ki_cancel = NULL; \
(x)->ki_user_obj = tsk; \
+ (x)->ki_user_data = 0; \
+ init_wait((&(x)->ki_wait)); \
} while (0)

#define AIO_RING_MAGIC 0xa10a10a1
diff -ur linux-2.5.62/include/linux/init_task.h linux-2.5.62-aio/include/linux/init_task.h
--- linux-2.5.62/include/linux/init_task.h Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/include/linux/init_task.h Thu Feb 27 19:01:39 2003
@@ -101,6 +101,7 @@
.alloc_lock = SPIN_LOCK_UNLOCKED, \
.switch_lock = SPIN_LOCK_UNLOCKED, \
.journal_info = NULL, \
+ .iocb = NULL, \
}


diff -ur linux-2.5.62/include/linux/sched.h linux-2.5.62-aio/include/linux/sched.h
--- linux-2.5.62/include/linux/sched.h Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/include/linux/sched.h Thu Feb 27 19:01:39 2003
@@ -418,6 +418,8 @@

unsigned long ptrace_message;
siginfo_t *last_siginfo; /* For ptrace use. */
+/* current aio handle */
+ struct kiocb *iocb;
};

extern void __put_task_struct(struct task_struct *tsk);
diff -ur linux-2.5.62/kernel/fork.c linux-2.5.62-aio/kernel/fork.c
--- linux-2.5.62/kernel/fork.c Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/kernel/fork.c Tue Mar 4 14:58:44 2003
@@ -128,6 +128,10 @@
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
+ else {
+ if (current->iocb && (wait == &current->iocb->ki_wait))
+ printk("prepare_to_wait: iocb->ki_wait in use\n");
+ }
spin_unlock_irqrestore(&q->lock, flags);
}

@@ -834,6 +838,7 @@
p->lock_depth = -1; /* -1 = no lock */
p->start_time = get_jiffies_64();
p->security = NULL;
+ p->iocb = NULL;

retval = -ENOMEM;
if (security_task_alloc(p))

2003-03-05 09:14:52

by Suparna Bhattacharya

Subject: Re: [Patch 2/2] Retry based aio read - filesystem read changes

On Wed, Mar 05, 2003 at 02:47:54PM +0530, Suparna Bhattacharya wrote:
> For the last few days I've been playing with prototyping
> a particular flavour of a retry based implementation for
> filesystem aio read.

# aioread.patch : Modifications for aio read in
# particular (generic_file_aio_read)


diff -ur linux-2.5.62/include/linux/pagemap.h linux-2.5.62-aio/include/linux/pagemap.h
--- linux-2.5.62/include/linux/pagemap.h Tue Feb 18 04:25:49 2003
+++ linux-2.5.62-aio/include/linux/pagemap.h Thu Feb 27 19:01:39 2003
@@ -93,6 +93,16 @@
if (TestSetPageLocked(page))
__lock_page(page);
}
+
+extern int FASTCALL(__aio_lock_page(struct page *page));
+static inline int aio_lock_page(struct page *page)
+{
+ if (TestSetPageLocked(page))
+ return __aio_lock_page(page);
+ else
+ return 0;
+}
+

/*
* This is exported only for wait_on_page_locked/wait_on_page_writeback.
@@ -113,6 +123,15 @@
wait_on_page_bit(page, PG_locked);
}

+extern int FASTCALL(aio_wait_on_page_bit(struct page *page, int bit_nr));
+static inline int aio_wait_on_page_locked(struct page *page)
+{
+ if (PageLocked(page))
+ return aio_wait_on_page_bit(page, PG_locked);
+ else
+ return 0;
+}
+
/*
* Wait for a page to complete writeback
*/
diff -ur linux-2.5.62/mm/filemap.c linux-2.5.62-aio/mm/filemap.c
--- linux-2.5.62/mm/filemap.c Tue Feb 18 04:26:11 2003
+++ linux-2.5.62-aio/mm/filemap.c Mon Mar 3 19:32:40 2003
@@ -268,6 +268,43 @@
}
EXPORT_SYMBOL(wait_on_page_bit);

+int aio_schedule(void)
+{
+ if (!current->iocb) {
+ io_schedule();
+ return 0;
+ } else {
+ pr_debug("aio schedule");
+ return -EIOCBQUEUED;
+ }
+}
+
+int aio_wait_on_page_bit(struct page *page, int bit_nr)
+{
+ wait_queue_head_t *waitqueue = page_waitqueue(page);
+ DEFINE_WAIT(sync_wait);
+ wait_queue_t *wait = &sync_wait;
+ int state = TASK_UNINTERRUPTIBLE;
+
+ if (current->iocb) {
+ wait = &current->iocb->ki_wait;
+ state = TASK_RUNNING;
+ }
+
+ do {
+ prepare_to_wait(waitqueue, wait, state);
+ if (test_bit(bit_nr, &page->flags)) {
+ sync_page(page);
+ if (-EIOCBQUEUED == aio_schedule())
+ return -EIOCBQUEUED;
+ }
+ } while (test_bit(bit_nr, &page->flags));
+ finish_wait(waitqueue, wait);
+
+ return 0;
+}
+EXPORT_SYMBOL(aio_wait_on_page_bit);
+
/**
* unlock_page() - unlock a locked page
*
@@ -336,6 +373,31 @@
}
EXPORT_SYMBOL(__lock_page);

+int __aio_lock_page(struct page *page)
+{
+ wait_queue_head_t *wqh = page_waitqueue(page);
+ DEFINE_WAIT(sync_wait);
+ wait_queue_t *wait = &sync_wait;
+ int state = TASK_UNINTERRUPTIBLE;
+
+ if (current->iocb) {
+ wait = &current->iocb->ki_wait;
+ state = TASK_RUNNING;
+ }
+
+ while (TestSetPageLocked(page)) {
+ prepare_to_wait(wqh, wait, state);
+ if (PageLocked(page)) {
+ sync_page(page);
+ if (-EIOCBQUEUED == aio_schedule())
+ return -EIOCBQUEUED;
+ }
+ }
+ finish_wait(wqh, wait);
+ return 0;
+}
+EXPORT_SYMBOL(__aio_lock_page);
+
/*
* a rather lightweight function, finding and getting a reference to a
* hashed page atomically.
@@ -614,7 +676,13 @@
goto page_ok;

/* Get exclusive access to the page ... */
- lock_page(page);
+
+ if (aio_lock_page(page)) {
+ pr_debug("queued lock page \n");
+ error = -EIOCBQUEUED;
+ /* TBD: should we hold on to the cached page ? */
+ goto sync_error;
+ }

/* Did it get unhashed before we got the lock? */
if (!page->mapping) {
@@ -636,12 +704,18 @@
if (!error) {
if (PageUptodate(page))
goto page_ok;
- wait_on_page_locked(page);
+ if (aio_wait_on_page_locked(page)) {
+ pr_debug("queued wait_on_page \n");
+ error = -EIOCBQUEUED;
+ /*TBD:should we hold on to the cached page ?*/
+ goto sync_error;
+ }
if (PageUptodate(page))
goto page_ok;
error = -EIO;
}

+sync_error:
/* UHHUH! A synchronous read error occurred. Report it */
desc->error = error;
page_cache_release(page);
@@ -813,6 +887,7 @@
ssize_t ret;

init_sync_kiocb(&kiocb, filp);
+ BUG_ON(current->iocb != NULL);
ret = __generic_file_aio_read(&kiocb, &local_iov, 1, ppos);
if (-EIOCBQUEUED == ret)
ret = wait_on_sync_kiocb(&kiocb);
@@ -844,6 +919,7 @@
{
read_descriptor_t desc;

+ BUG_ON(current->iocb != NULL);
if (!count)
return 0;


2003-03-05 10:32:00

by Andrew Morton

Subject: Re: [Patch 2/2] Retry based aio read - filesystem read changes

Suparna Bhattacharya <[email protected]> wrote:
>
> +extern int FASTCALL(aio_wait_on_page_bit(struct page *page, int bit_nr));
> +static inline int aio_wait_on_page_locked(struct page *page)

Oh boy.

There are soooo many more places where we can block:

- write() -> down(&inode->i_sem)

- read()/write() -> read indirect block -> wait_on_buffer()

- read()/write() -> read bitmap block -> wait_on_buffer()

- write() -> allocate block -> mark_buffer_dirty() ->
balance_dirty_pages() -> writer throttling

- write() -> allocate block -> journal_get_write_access()

- read()/write() -> update_a/b/ctime() -> journal_get_write_access()

- ditto for other journalling filesystems

- read()/write() -> anything -> get_request_wait()
(This one can be avoided by polling the backing_dev_info congestion
flags)

- read()/write() -> page allocator -> blk_congestion_wait()

- write() -> balance_dirty_pages() -> writer throttling

- probably others.

Now, I assume that what you're looking for here is an 80% solution, but it
seems that a lot more changes will be needed to get even that far.

And given that a single kernel thread per spindle can easily keep that
spindle saturated all the time, one does wonder "why try to do it this way at
all"?

2003-03-05 11:59:13

by Suparna Bhattacharya

Subject: Re: [Patch 2/2] Retry based aio read - filesystem read changes

On Wed, Mar 05, 2003 at 02:42:54AM -0800, Andrew Morton wrote:
> Suparna Bhattacharya <[email protected]> wrote:
> >
> > +extern int FASTCALL(aio_wait_on_page_bit(struct page *page, int bit_nr));
> > +static inline int aio_wait_on_page_locked(struct page *page)
>
> Oh boy.
>
> There are soooo many more places where we can block:

Oh yes, there are lots (I'm simply shutting my eyes
to them till we've conquered at least a few :))

What I'm trying to do is look at them one at a time,
starting with the ones that have the greatest potential
benefit, based on impact and complexity.

Otherwise it'd be just too hard to get anywhere !

Actually that was one reason why I decided to post
this early. Want to catch the most important ones
and make sure we can deal with them at least.

Read is the easier case, which is why I started with
it. I'll come back to the write case sometime later,
after I've played with this a bit.

Even with read there is one more case besides your
list. The copy_to_user or fault_in_writable_pages can
block too ;) (that's what I'm running into) ..

However, the general idea is that any blocking point
where things would just work if we returned instead of
waiting, and issued a retry that continues from the
point where it left off, can be handled within the
basic framework. What we need to do is look at these
on a case-by-case basis and see if that is doable. My
hunch is that for congestion and throttling points it
should be possible. And we already have a lot of
pipelining and restartability built into the VFS now.

Mostly we just need to make sure the -EIOCBQUEUED
returns can be propagated all the way up without
breaking anything.

It's really the metadata-related waits that are a
black box for me, and I wasn't planning on tackling
them yet ... more so as I guess it could mean getting
into very filesystem-specific territory, so doing it
consistently may not be that easy.


>
> - write() -> down(&inode->i_sem)
>
> - read()/write() -> read indirect block -> wait_on_buffer()
>
> - read()/write() -> read bitmap block -> wait_on_buffer()
>
> - write() -> allocate block -> mark_buffer_dirty() ->
> balance_dirty_pages() -> writer throttling
>
> - write() -> allocate block -> journal_get_write_access()
>
> - read()/write() -> update_a/b/ctime() -> journal_get_write_access()
>
> - ditto for other journalling filesystems
>
> - read()/write() -> anything -> get_request_wait()
> (This one can be avoided by polling the backing_dev_info congestion
> flags)

I was thinking of a get_request_async_wait() that
unplugs the queue and returns -EIOCBQUEUED after
queueing the async waiter to retry the operation.

Of course this would only work if the caller is able
to push back -EIOCBQUEUED without breaking anything in
a non-restartable way.
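
A rough sketch of the shape I have in mind --
get_request_async_wait() doesn't exist, and the
get_request() call and choice of waitqueue are
assumptions rather than real block layer code, so treat
this as an illustration along the lines of
aio_wait_on_page_bit():

/* Rough sketch only: get_request_async_wait() is hypothetical, and
 * the block layer specifics (the get_request() call, which waitqueue
 * to hand in) are assumptions.  The shape follows
 * aio_wait_on_page_bit(): register the iocb's wait entry, get the
 * queue going, and back out instead of sleeping. */
static struct request *get_request_async_wait(request_queue_t *q, int rw,
                                              wait_queue_head_t *wqh)
{
        struct request *rq = get_request(q, rw, GFP_ATOMIC);

        if (rq)
                return rq;              /* got one without waiting */

        /* register the retry callback, unplug, and let the caller
         * push -EIOCBQUEUED back up */
        prepare_to_wait(wqh, &current->iocb->ki_wait, TASK_RUNNING);
        generic_unplug_device(q);
        return ERR_PTR(-EIOCBQUEUED);
}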

>
> - read()/write() -> page allocator -> blk_congestion_wait()
>
> - write() -> balance_dirty_pages() -> writer throttling
>
> - probably others.
>
> Now, I assume that what you're looking for here is an 80% solution, but it
> seems that a lot more changes will be needed to get even that far.

If we don't bother about metadata and indirect blocks
just yet, wouldn't the gains we get otherwise still be
worth it ?

>
> And given that a single kernel thread per spindle can easily keep that
> spindle saturated all the time, one does wonder "why try to do it this way at
> all"?

I need some more explanation on how/where you'd really
break up a generic_file_aio_read operation (with the
page_cache_read, readpage, copy_to_user) on a per-spindle
basis. Aren't we at a higher level of abstraction compared
to the disk at this stage ?
I can visualize delegating all readpage calls to a worker
thread per backing device or something like that (forgetting
LVM/RAID for a while), but what about the rest of the parts ?

Are you suggesting restructuring the generic_file_aio_read
code to separate out these stages, so it can identify where
it is and hand itself off accordingly to the right worker ?

Regards
Suparna

--
Suparna Bhattacharya ([email protected])
Linux Technology Center
IBM Software Labs, India

2003-03-05 23:14:42

by Janet Morgan

Subject: Re: [RFC][Patch] Retry based aio read for filesystems

Suparna Bhattacharya wrote:

> For the last few days I've been playing with prototyping
> a particular flavour of a retry based implementation for
> filesystem aio read.

Hi Suparna,

I ran an aio-enabled version of fsx on your patches and no errors
were reported. I plan on using gcov to determine the test coverage
I obtained.

I also did a quick performance test using wall time to compare sync
and async read operations with your patches applied. For the sync
case I ran 10 processes in parallel, each reading 1GB in 1MB chunks
from a per-process dedicated device. I compared that to a single aio
process iteratively calling io_submit/io_getevents for up to 10
iocbs/events, where each iocb specified a 1MB read to its dedicated
device, until 1GB was read. Whew!
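
For reference, the aio side of such a test loops roughly like the
sketch below -- an illustrative libaio-based sketch, not the actual
test program; device names, buffer handling and error checking are
simplified or made up:

/* Illustrative only -- not the actual test program.  One iocb per
 * device, 1MB reads, resubmitted until each device has read 1GB. */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

#define NR_DEVS 10
#define CHUNK   (1024 * 1024)                   /* 1MB per iocb */
#define TOTAL   (1024UL * 1024 * 1024)          /* 1GB per device */

int main(void)
{
        io_context_t ctx = 0;
        struct iocb cbs[NR_DEVS], *cbp[NR_DEVS];
        struct io_event events[NR_DEVS];
        unsigned long done[NR_DEVS] = { 0 };
        char *buf[NR_DEVS];
        int fd[NR_DEVS], i;

        io_setup(NR_DEVS, &ctx);
        for (i = 0; i < NR_DEVS; i++) {
                char name[32];
                /* placeholder device naming */
                snprintf(name, sizeof(name), "/dev/raw/raw%d", i + 1);
                fd[i] = open(name, O_RDONLY);
                buf[i] = malloc(CHUNK);
        }

        for (;;) {
                int nr = 0, got;

                /* queue one 1MB read per device that still has data left */
                for (i = 0; i < NR_DEVS; i++) {
                        if (done[i] >= TOTAL)
                                continue;
                        io_prep_pread(&cbs[nr], fd[i], buf[i], CHUNK, done[i]);
                        cbs[nr].data = (void *)(long)i;
                        cbp[nr] = &cbs[nr];
                        nr++;
                }
                if (!nr)
                        break;          /* every device has read 1GB */

                io_submit(ctx, nr, cbp);
                got = io_getevents(ctx, nr, nr, events, NULL);
                for (i = 0; i < got; i++)
                        done[(long)events[i].data] += events[i].res;
        }
        io_destroy(ctx);
        return 0;
}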

The result was that the wall times for the sync and async testcases
were consistently identical, i.e., 1m30s:

# sync_test
start time: Wed Mar 5 14:02:05 PST 2003
end time: Wed Mar 5 14:03:35 PST 2003

# aio_test
start time: Wed Mar 5 13:52:04 PST 2003
end time: Wed Mar 5 13:53:34 PST 2003

syncio vmstat:
procs                    memory      swap          io     system       cpu
 r  b  w   swpd  free  buff   cache  si  so     bi  bo    in    cs  us  sy id
 4  6  2   3324  3888 11356 3668848   0   0 151296   0  2216  2395   0 100  0
 4  6  1   3324  3888 11368 3668820   0   0 151744   5  2216  2368   0 100  0
 1  9  1   3324  3952 11368 3668860   0   0 151209   1  2213  2356   0 100  0
 5  5  1   3324  3940 11344 3668864   0   2 148948   3  2215  2387   0 100  0
 1  9  1   3348  3936 11264 3668968   0   6 150767   7  2209  2345   0 100  0
 6  4  1   3480  3920 11192 3669364   0  33 151456  33  2218  2340   0 100  0
 4  6  2   3568  3896 11316 3669352   0  21 151887  21  2218  2385   0 100  0
 7  3  1   3704  3820 11364 3669428  31  34 148687  35  2222  2344   1  99  0

aio vmstat:
procs                    memory      swap          io     system       cpu
 r  b  w   swpd  free  buff   cache  si  so     bi  bo    in    cs  us  sy id
 2  0  1   4016  4040 11192 3669644   0  17 133152  25  2073   502   0  40 60
 1  0  1   4016  4104 11196 3669716   0   0 132288   1  2073   537   0  40 60
 2  0  2   4016  4104 11196 3669764   0   0 132416   0  2067   511   0  40 60
 1  0  1   4016  4104 11200 3669788   0   0 133088   1  2075   523   0  41 59
 1  0  1   4016  5576 11200 3668240   0   0 132384   0  2066   526   0  40 60
 1  0  1   4036  4092 11200 3669756   0   5 135116   5  2094   492   0  46 54
 1  0  1   4180  4016 11192 3669944   0  36 135968  40  2111   499   0  46 54
 2  0  2   4180  4060 11176 3669832   0   0 137152   0  2119   463   1  46 53
 1  0  1   4180  7060 11180 3666980   0   0 136384   2  2107   498   0  44 56


-Janet

2003-03-14 13:07:36

by Suparna Bhattacharya

Subject: Re: [Patch 1/2] Retry based aio read - core aio changes

On Wed, Mar 05, 2003 at 02:56:33PM +0530, Suparna Bhattacharya wrote:
> On Wed, Mar 05, 2003 at 02:47:54PM +0530, Suparna Bhattacharya wrote:
> > For the last few days I've been playing with prototyping
> > a particular flavour of a retry based implementation for
> > filesystem aio read.
>
>
> # aioretry.patch : Core aio infrastructure modifications
> for high-level retry based aio

Ben pointed out that, at the least, we shouldn't be
duplicating the switch on every retry. So here's another
take, which cleans things up a bit as well.

It is still very high-level (would like to experiment with
it a little further to find out how it behaves/performs in
practice).

(I've checked that the patch applies to 2.5.64-bk8.)

Earlier today I posted separate patches for the
fixes listed below, so they are no longer part of this
patch.

Comments/feedback welcome.

>
> Includes a couple of fixes in the aio code
> -The kiocbClearXXX were doing a set_bit instead of
> clear_bit
> -Sync iocbs were not woken up when iocb->ki_users = 1
> (dio takes a different path for sync and async iocbs,
> so maybe that's why we weren't seeing the problem yet)
>

Regards
Suparna


diff -ur linux-2.5.62/fs/aio.c linux-2.5.62-aio/fs/aio.c
--- linux-2.5.62/fs/aio.c Tue Feb 18 04:26:14 2003
+++ linux-2.5.62-aio/fs/aio.c Tue Mar 11 21:07:35 2003
@@ -395,6 +396,7 @@
req->ki_cancel = NULL;
req->ki_retry = NULL;
req->ki_user_obj = NULL;
+ INIT_LIST_HEAD(&req->ki_run_list);

/* Check if the completion queue has enough free space to
* accept an event from this io.
@@ -558,46 +560,121 @@
enter_lazy_tlb(mm, current, smp_processor_id());
}

-/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an
- * operation blocks.
- */
-static void aio_kick_handler(void *data)
+static inline int __queue_kicked_iocb(struct kiocb *iocb)
{
- struct kioctx *ctx = data;
+ struct kioctx *ctx = iocb->ki_ctx;

- use_mm(ctx->mm);
+ if (list_empty(&iocb->ki_run_list)) {
+ list_add_tail(&iocb->ki_run_list,
+ &ctx->run_list);
+ return 1;
+ }
+ return 0;
+}

- spin_lock_irq(&ctx->ctx_lock);
- while (!list_empty(&ctx->run_list)) {
- struct kiocb *iocb;
- long ret;
+/* Expects to be called with iocb->ki_ctx->lock held */
+static ssize_t aio_run_iocb(struct kiocb *iocb)
+{
+ struct kioctx *ctx = iocb->ki_ctx;
+ ssize_t (*retry)(struct kiocb *);
+ ssize_t ret;

- iocb = list_entry(ctx->run_list.next, struct kiocb,
- ki_run_list);
- list_del(&iocb->ki_run_list);
- iocb->ki_users ++;
- spin_unlock_irq(&ctx->ctx_lock);
+ if (iocb->ki_retried++ > 1024*1024) {
+ printk("Maximal retry count. Bytes done %d\n",
+ iocb->ki_nbytes - iocb->ki_left);
+ return -EAGAIN;
+ }
+
+ if (!(iocb->ki_retried & 0xff)) {
+ printk("%ld aio retries completed %d bytes of %d\n",
+ iocb->ki_retried,
+ iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
+ }
+
+ if (!(retry = iocb->ki_retry))
+ return 0;
+
+ iocb->ki_users ++;
+ kiocbClearKicked(iocb);
+ iocb->ki_retry = NULL;
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ BUG_ON(current->iocb != NULL);
+
+ current->iocb = iocb;
+ ret = retry(iocb);
+ current->iocb = NULL;

- kiocbClearKicked(iocb);
- ret = iocb->ki_retry(iocb);
- if (-EIOCBQUEUED != ret) {
+ if (-EIOCBQUEUED != ret) {
+ if (list_empty(&iocb->ki_wait.task_list))
aio_complete(iocb, ret, 0);
- iocb = NULL;
- }
+ else
+ printk("can't delete iocb in use\n");
+ } else {
+ if (list_empty(&iocb->ki_wait.task_list))
+ kiocbSetKicked(iocb);
+ }
+ spin_lock_irq(&ctx->ctx_lock);

- spin_lock_irq(&ctx->ctx_lock);
- if (NULL != iocb)
- __aio_put_req(ctx, iocb);
+ iocb->ki_retry = retry;
+ INIT_LIST_HEAD(&iocb->ki_run_list);
+ if (kiocbIsKicked(iocb)) {
+ BUG_ON(ret != -EIOCBQUEUED);
+ __queue_kicked_iocb(iocb);
+ }
+ __aio_put_req(ctx, iocb);
+ return ret;
+}
+
+static void aio_run_iocbs(struct kioctx *ctx)
+{
+ struct kiocb *iocb;
+ ssize_t ret;
+
+ spin_lock_irq(&ctx->ctx_lock);
+ while (!list_empty(&ctx->run_list)) {
+ iocb = list_entry(ctx->run_list.next, struct kiocb,
+ ki_run_list);
+ list_del(&iocb->ki_run_list);
+ ret = aio_run_iocb(iocb);
}
spin_unlock_irq(&ctx->ctx_lock);
+}
+
+/* Run on aiod/kevent's context. FIXME: needs to be per-cpu and warn if an
+ * operation blocks.
+ */
+static void aio_kick_handler(void *data)
+{
+ struct kioctx *ctx = data;

+ use_mm(ctx->mm);
+ aio_run_iocbs(ctx);
unuse_mm(ctx->mm);
}

-void kick_iocb(struct kiocb *iocb)
+
+void queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
+ unsigned long flags;
+ int run = 0;
+
+ WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ run = __queue_kicked_iocb(iocb);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ if (run) {
+ if (waitqueue_active(&ctx->wait))
+ wake_up(&ctx->wait);
+ else
+ queue_work(aio_wq, &ctx->wq);
+ }
+}

+void kick_iocb(struct kiocb *iocb)
+{
/* sync iocbs are easy: they can only ever be executing from a
* single context. */
if (is_sync_kiocb(iocb)) {
@@ -607,11 +684,9 @@
}

if (!kiocbTryKick(iocb)) {
- unsigned long flags;
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_add_tail(&iocb->ki_run_list, &ctx->run_list);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- schedule_work(&ctx->wq);
+ queue_kicked_iocb(iocb);
+ } else {
+ pr_debug("iocb already kicked or in progress\n");
}
}

@@ -664,6 +739,9 @@
*/
spin_lock_irqsave(&ctx->ctx_lock, flags);

+ if (!list_empty(&iocb->ki_run_list))
+ list_del_init(&iocb->ki_run_list);
+
ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);

tail = info->tail;
@@ -865,6 +943,8 @@
ret = 0;
if (to.timed_out) /* Only check after read evt */
break;
+ /* accelerate kicked iocbs for this ctx */
+ aio_run_iocbs(ctx);
schedule();
if (signal_pending(tsk)) {
ret = -EINTR;
@@ -984,6 +1064,149 @@
return -EINVAL;
}

+ssize_t aio_pread(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = 0;
+
+ ret = file->f_op->aio_read(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+
+ pr_debug("aio_pread: fop ret %d\n", ret);
+
+ /*
+ * Can't just depend on iocb->ki_left to determine
+ * whether we are done. This may have been a short read.
+ */
+ if (ret > 0) {
+ iocb->ki_buf += ret;
+ iocb->ki_left -= ret;
+
+ ret = -EIOCBQUEUED;
+ }
+
+ /* This means we must have transferred all that we could */
+ /* No need to retry anymore */
+ if (ret == 0)
+ ret = iocb->ki_nbytes - iocb->ki_left;
+
+ return ret;
+}
+
+ssize_t aio_pwrite(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = 0;
+
+ ret = file->f_op->aio_write(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+
+ pr_debug("aio_pread: fop ret %d\n", ret);
+
+ /*
+ * TBD: Even if iocb->ki_left = 0, could we need to
+ * wait for data to be sync'd ? Or can we assume
+ * that aio_fdsync/aio_fsync would be called explicitly
+ * as required.
+ */
+ if (ret > 0) {
+ iocb->ki_buf += ret;
+ iocb->ki_left -= ret;
+
+ ret = -EIOCBQUEUED;
+ }
+
+ /* This means we must have transferred all that we could */
+ /* No need to retry anymore */
+ if (ret == 0)
+ ret = iocb->ki_nbytes - iocb->ki_left;
+
+ return ret;
+}
+
+ssize_t aio_fdsync(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = -EINVAL;
+
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 1);
+ return ret;
+}
+
+ssize_t aio_fsync(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = -EINVAL;
+
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 0);
+ return ret;
+}
+
+/* Called during initial submission and subsequent retry operations */
+ssize_t aio_setup_iocb(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = 0;
+
+ switch (iocb->ki_opcode) {
+ case IOCB_CMD_PREAD:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ break;
+ ret = -EFAULT;
+ if (unlikely(!access_ok(VERIFY_WRITE, iocb->ki_buf,
+ iocb->ki_left)))
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_read)
+ iocb->ki_retry = aio_pread;
+ break;
+ case IOCB_CMD_PWRITE:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ break;
+ ret = -EFAULT;
+ if (unlikely(!access_ok(VERIFY_READ, iocb->ki_buf,
+ iocb->ki_left)))
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_write)
+ iocb->ki_retry = aio_pwrite;
+ break;
+ case IOCB_CMD_FDSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ iocb->ki_retry = aio_fdsync;
+ break;
+ case IOCB_CMD_FSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ iocb->ki_retry = aio_fsync;
+ break;
+ default:
+ dprintk("EINVAL: io_submit: no operation provided\n");
+ ret = -EINVAL;
+ }
+
+ if (!iocb->ki_retry)
+ return ret;
+
+ pr_debug("ki_pos = %llu\n", iocb->ki_pos);
+
+ return 0;
+}
+
+int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync)
+{
+ struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
+
+ list_del_init(&wait->task_list);
+ kick_iocb(iocb);
+ return 1;
+}
+
static int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb *user_iocb,
struct iocb *iocb));
static int io_submit_one(struct kioctx *ctx, struct iocb *user_iocb,
@@ -992,7 +1215,6 @@
struct kiocb *req;
struct file *file;
ssize_t ret;
- char *buf;

/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
@@ -1033,51 +1255,27 @@
req->ki_user_data = iocb->aio_data;
req->ki_pos = iocb->aio_offset;

- buf = (char *)(unsigned long)iocb->aio_buf;
+ req->ki_buf = (char *)(unsigned long)iocb->aio_buf;
+ req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
+ req->ki_opcode = iocb->aio_lio_opcode;
+ init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
+ INIT_LIST_HEAD(&req->ki_wait.task_list);
+ req->ki_run_list.next = req->ki_run_list.prev = NULL;
+ req->ki_retry = NULL;
+ req->ki_retried = 0;

- switch (iocb->aio_lio_opcode) {
- case IOCB_CMD_PREAD:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_READ)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_read)
- ret = file->f_op->aio_read(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_PWRITE:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_WRITE)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_write)
- ret = file->f_op->aio_write(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_FDSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 1);
- break;
- case IOCB_CMD_FSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 0);
- break;
- default:
- dprintk("EINVAL: io_submit: no operation provided\n");
- ret = -EINVAL;
- }
+ ret = aio_setup_iocb(req);
+
+ if ((-EBADF == ret) || (-EFAULT == ret))
+ goto out_put_req;
+
+ spin_lock_irq(&ctx->ctx_lock);
+ ret = aio_run_iocb(req);
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ if (-EIOCBQUEUED == ret)
+ queue_work(aio_wq, &ctx->wq);

- if (likely(-EIOCBQUEUED == ret))
- return 0;
- aio_complete(req, ret, 0);
return 0;

out_put_req:
diff -ur linux-2.5.62/include/linux/aio.h linux-2.5.62-aio/include/linux/aio.h
--- linux-2.5.62/include/linux/aio.h Tue Feb 18 04:25:50 2003
+++ linux-2.5.62-aio/include/linux/aio.h Tue Mar 11 21:31:22 2003
@@ -54,7 +54,7 @@
struct file *ki_filp;
struct kioctx *ki_ctx; /* may be NULL for sync ops */
int (*ki_cancel)(struct kiocb *, struct io_event *);
- long (*ki_retry)(struct kiocb *);
+ ssize_t (*ki_retry)(struct kiocb *);

struct list_head ki_list; /* the aio core uses this
* for cancellation */
@@ -62,6 +62,14 @@
void *ki_user_obj; /* pointer to userland's iocb */
__u64 ki_user_data; /* user's data for completion */
loff_t ki_pos;
+
+ /* State that we remember to be able to restart/retry */
+ unsigned short ki_opcode;
+ size_t ki_nbytes; /* copy of iocb->aio_nbytes */
+ char *ki_buf; /* remaining iocb->aio_buf */
+ size_t ki_left; /* remaining bytes */
+ wait_queue_t ki_wait;
+ long ki_retried; /* just for testing */

char private[KIOCB_PRIVATE_SIZE];
};
@@ -77,6 +85,8 @@
(x)->ki_ctx = &tsk->active_mm->default_kioctx; \
(x)->ki_cancel = NULL; \
(x)->ki_user_obj = tsk; \
+ (x)->ki_user_data = 0; \
+ init_wait((&(x)->ki_wait)); \
} while (0)

#define AIO_RING_MAGIC 0xa10a10a1
diff -ur linux-2.5.62/include/linux/init_task.h linux-2.5.62-aio/include/linux/init_task.h
--- linux-2.5.62/include/linux/init_task.h Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/include/linux/init_task.h Thu Feb 27 19:01:39 2003
@@ -101,6 +101,7 @@
.alloc_lock = SPIN_LOCK_UNLOCKED, \
.switch_lock = SPIN_LOCK_UNLOCKED, \
.journal_info = NULL, \
+ .iocb = NULL, \
}


diff -ur linux-2.5.62/include/linux/sched.h linux-2.5.62-aio/include/linux/sched.h
--- linux-2.5.62/include/linux/sched.h Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/include/linux/sched.h Thu Feb 27 19:01:39 2003
@@ -418,6 +418,8 @@

unsigned long ptrace_message;
siginfo_t *last_siginfo; /* For ptrace use. */
+/* current aio handle */
+ struct kiocb *iocb;
};

extern void __put_task_struct(struct task_struct *tsk);
diff -ur linux-2.5.62/kernel/fork.c linux-2.5.62-aio/kernel/fork.c
--- linux-2.5.62/kernel/fork.c Tue Feb 18 04:25:53 2003
+++ linux-2.5.62-aio/kernel/fork.c Tue Mar 4 14:58:44 2003
@@ -834,6 +838,7 @@
p->lock_depth = -1; /* -1 = no lock */
p->start_time = get_jiffies_64();
p->security = NULL;
+ p->iocb = NULL;

retval = -ENOMEM;
if (security_task_alloc(p))