2006-08-03 09:22:30

by Evgeniy Polyakov

Subject: [take3 0/4] kevent: Generic event handling mechanism.


Generic event handling mechanism.

I am sending this patchset for comments and review. It still contains the AIO and
aio_sendfile() implementation on top of the get_block() abstraction, which it was
decided to postpone for a while (it is simpler right now to generate the patchset as a whole;
when kevent is ready for merge, I will generate a patchset without the AIO stuff).
It does not contain the mapped buffer implementation, since its design is not 100%
complete; I will present that implementation in the third patchset.

Changes from 'take2' patchset:
* split kevent_finish_user() to locked and unlocked variants
* do not use KEVENT_STAT ifdefs, use inline functions instead
* use array of callbacks of each type instead of each kevent callback initialization
* changed name of ukevent guarding lock
* use only one kevent lock in kevent_user for all hash buckets instead of per-bucket locks
* do not use kevent_user_ctl structure; instead provide needed arguments as syscall parameters
* various indent cleanups
* mapped buffer (initial) implementation (no userspace yet)
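
The "array of callbacks of each type" item can be pictured roughly as follows. This is a userspace sketch, not code from the patchset: the struct layouts are simplified stand-ins, and the naio_* handlers, kevent_register_naio() and kevent_enqueue() are hypothetical names chosen for illustration. Only the kevent_registered_callbacks table and its enqueue/dequeue/callback members follow the names used in the patches.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the kernel types (assumption, not the real layout). */
enum { KEVENT_SOCKET, KEVENT_NAIO, KEVENT_MAX };

struct kevent {
	unsigned int type;
	int queued;
};

struct kevent_callbacks {
	int (*enqueue)(struct kevent *k);
	int (*dequeue)(struct kevent *k);
	int (*callback)(struct kevent *k);
};

/* One slot per event type, filled once at init time. */
static struct kevent_callbacks kevent_registered_callbacks[KEVENT_MAX];

/* Hypothetical handlers that only track whether the event is queued. */
static int naio_enqueue(struct kevent *k)  { k->queued = 1; return 0; }
static int naio_dequeue(struct kevent *k)  { k->queued = 0; return 0; }
static int naio_callback(struct kevent *k) { return k->queued; }

/* Fill the slot for one event type; all three handlers must be set. */
void kevent_register_naio(void)
{
	struct kevent_callbacks *nc = &kevent_registered_callbacks[KEVENT_NAIO];

	nc->enqueue = naio_enqueue;
	nc->dequeue = naio_dequeue;
	nc->callback = naio_callback;
}

/* Dispatch through the table; -1 if the type has no enqueue handler. */
int kevent_enqueue(struct kevent *k)
{
	assert(k->type < KEVENT_MAX);
	if (!kevent_registered_callbacks[k->type].enqueue)
		return -1;
	return kevent_registered_callbacks[k->type].enqueue(k);
}
```

With a table like this, a new kevent only needs its type set; the handlers are looked up by index instead of being copied into every kevent at creation time.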

Changes from 'take1' patchset:
- rebased against 2.6.18-git tree
- removed ioctl controlling
- added new syscall kevent_get_events(int fd, unsigned int min_nr, unsigned int max_nr,
unsigned int timeout, void __user *buf, unsigned flags)
- use old syscall kevent_ctl for creation/removal, modification and initial kevent
initialization
- use mutexes instead of semaphores
- added file descriptor check and return error if provided descriptor does not match
kevent file operations
- various indent fixes
- removed aio_sendfile() declarations.

Thank you.

Signed-off-by: Evgeniy Polyakov <[email protected]>



2006-08-03 09:22:25

by Evgeniy Polyakov

Subject: [take3 3/4] kevent: Network AIO, socket notifications.


Network AIO, socket notifications.

This patch includes socket notifications and network asynchronous IO.
Network AIO is based on kevent and works like ordinary kevent storage on top
of the inode.
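
Because the storage lives in the inode, the code has to move between a socket and its inode. The patch does this with SOCKET_I() and SOCK_INODE() on struct socket_alloc. The following userspace sketch shows that conversion; the struct bodies are reduced to single placeholder fields and container_of() is redefined locally, so treat it as an illustration only.

```c
#include <stddef.h>

/* Placeholder bodies; the real kernel structs carry many more fields. */
struct socket { int state; };
struct inode { unsigned long i_ino; };

/* Socket and inode are allocated together as one object. */
struct socket_alloc {
	struct socket socket;
	struct inode vfs_inode;
};

/* Local userspace equivalent of the kernel macro. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* inode -> enclosing socket_alloc -> socket */
struct socket *SOCKET_I(struct inode *inode)
{
	return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
}

/* socket -> enclosing socket_alloc -> inode */
struct inode *SOCK_INODE(struct socket *socket)
{
	return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
}
```

Both helpers are only valid for objects that really are embedded in a struct socket_alloc; passing an inode that does not belong to a socket yields a bogus pointer.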

Signed-off-by: Evgeniy Polyakov <[email protected]>

diff --git a/include/asm-i386/socket.h b/include/asm-i386/socket.h
index 5755d57..9300678 100644
--- a/include/asm-i386/socket.h
+++ b/include/asm-i386/socket.h
@@ -50,4 +50,6 @@ #define SO_ACCEPTCONN 30
#define SO_PEERSEC 31
#define SO_PASSSEC 34

+#define SO_ASYNC_SOCK 35
+
#endif /* _ASM_SOCKET_H */
diff --git a/include/asm-x86_64/socket.h b/include/asm-x86_64/socket.h
index b467026..fc2b49d 100644
--- a/include/asm-x86_64/socket.h
+++ b/include/asm-x86_64/socket.h
@@ -50,4 +50,6 @@ #define SO_ACCEPTCONN 30
#define SO_PEERSEC 31
#define SO_PASSSEC 34

+#define SO_ASYNC_SOCK 35
+
#endif /* _ASM_SOCKET_H */
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 4307e76..9267873 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -1283,6 +1283,8 @@ extern struct sk_buff *skb_recv_datagram
int noblock, int *err);
extern unsigned int datagram_poll(struct file *file, struct socket *sock,
struct poll_table_struct *wait);
+extern int skb_copy_datagram(const struct sk_buff *from,
+ int offset, void *dst, int size);
extern int skb_copy_datagram_iovec(const struct sk_buff *from,
int offset, struct iovec *to,
int size);
diff --git a/include/net/sock.h b/include/net/sock.h
index 324b3ea..c43a153 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -48,6 +48,7 @@ #include <linux/lockdep.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h> /* struct sk_buff */
#include <linux/security.h>
+#include <linux/kevent.h>

#include <linux/filter.h>

@@ -391,6 +392,8 @@ enum sock_flags {
SOCK_RCVTSTAMP, /* %SO_TIMESTAMP setting */
SOCK_LOCALROUTE, /* route locally only, %SO_DONTROUTE setting */
SOCK_QUEUE_SHRUNK, /* write queue has been shrunk recently */
+ SOCK_ASYNC,
+ SOCK_ASYNC_INUSE,
};

static inline void sock_copy_flags(struct sock *nsk, struct sock *osk)
@@ -450,6 +453,21 @@ static inline int sk_stream_memory_free(

extern void sk_stream_rfree(struct sk_buff *skb);

+struct socket_alloc {
+ struct socket socket;
+ struct inode vfs_inode;
+};
+
+static inline struct socket *SOCKET_I(struct inode *inode)
+{
+ return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
+}
+
+static inline struct inode *SOCK_INODE(struct socket *socket)
+{
+ return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
+}
+
static inline void sk_stream_set_owner_r(struct sk_buff *skb, struct sock *sk)
{
skb->sk = sk;
@@ -477,6 +495,7 @@ static inline void sk_add_backlog(struct
sk->sk_backlog.tail = skb;
}
skb->next = NULL;
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
}

#define sk_wait_event(__sk, __timeo, __condition) \
@@ -548,6 +567,12 @@ struct proto {

int (*backlog_rcv) (struct sock *sk,
struct sk_buff *skb);
+
+ int (*async_recv) (struct sock *sk,
+ void *dst, size_t size);
+ int (*async_send) (struct sock *sk,
+ struct page **pages, unsigned int poffset,
+ size_t size);

/* Keeping track of sk's, looking them up, and port selection methods. */
void (*hash)(struct sock *sk);
@@ -679,21 +704,6 @@ static inline struct kiocb *siocb_to_kio
return si->kiocb;
}

-struct socket_alloc {
- struct socket socket;
- struct inode vfs_inode;
-};
-
-static inline struct socket *SOCKET_I(struct inode *inode)
-{
- return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
-}
-
-static inline struct inode *SOCK_INODE(struct socket *socket)
-{
- return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
-}
-
extern void __sk_stream_mem_reclaim(struct sock *sk);
extern int sk_stream_mem_schedule(struct sock *sk, int size, int kind);

diff --git a/include/net/tcp.h b/include/net/tcp.h
index 0720bdd..5a1899b 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -364,6 +364,8 @@ extern int compat_tcp_setsockopt(struc
int level, int optname,
char __user *optval, int optlen);
extern void tcp_set_keepalive(struct sock *sk, int val);
+extern int tcp_async_recv(struct sock *sk, void *dst, size_t size);
+extern int tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t size);
extern int tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
struct msghdr *msg,
size_t len, int nonblock,
@@ -857,6 +859,7 @@ static inline int tcp_prequeue(struct so
tp->ucopy.memory = 0;
} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
wake_up_interruptible(sk->sk_sleep);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
if (!inet_csk_ack_scheduled(sk))
inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
(3 * TCP_RTO_MIN) / 4,
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
new file mode 100644
index 0000000..71eb6a5
--- /dev/null
+++ b/kernel/kevent/kevent_naio.c
@@ -0,0 +1,242 @@
+/*
+ * kevent_naio.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/pagemap.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/tcp_states.h>
+
+static int kevent_naio_enqueue(struct kevent *k);
+static int kevent_naio_dequeue(struct kevent *k);
+static int kevent_naio_callback(struct kevent *k);
+
+static int kevent_naio_setup_aio(int ctl_fd, int s, void __user *buf,
+ size_t size, u32 event)
+{
+ struct kevent_user *u;
+ struct file *file;
+ int err, fput_needed;
+ struct ukevent uk;
+
+ file = fget_light(ctl_fd, &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ u = file->private_data;
+ if (!u) {
+ err = -EINVAL;
+ goto err_out_fput;
+ }
+
+ memset(&uk, 0, sizeof(struct ukevent));
+ uk.type = KEVENT_NAIO;
+ uk.ptr = buf;
+ uk.req_flags = KEVENT_REQ_ONESHOT;
+ uk.event = event;
+ uk.id.raw[0] = s;
+ uk.id.raw[1] = size;
+
+ err = kevent_user_add_ukevent(&uk, u);
+
+err_out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+asmlinkage long sys_aio_recv(int ctl_fd, int s, void __user *buf,
+ size_t size, unsigned flags)
+{
+ return kevent_naio_setup_aio(ctl_fd, s, buf, size, KEVENT_SOCKET_RECV);
+}
+
+asmlinkage long sys_aio_send(int ctl_fd, int s, void __user *buf,
+ size_t size, unsigned flags)
+{
+ return kevent_naio_setup_aio(ctl_fd, s, buf, size, KEVENT_SOCKET_SEND);
+}
+
+static int kevent_naio_enqueue(struct kevent *k)
+{
+ int err, i;
+ struct page **page;
+ void *addr;
+ unsigned int size = k->event.id.raw[1];
+ int num = size/PAGE_SIZE;
+ struct file *file;
+ struct sock *sk = NULL;
+ int fput_needed;
+
+ file = fget_light(k->event.id.raw[0], &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ err = -EINVAL;
+ if (!file->f_dentry || !file->f_dentry->d_inode)
+ goto err_out_fput;
+
+ sk = SOCKET_I(file->f_dentry->d_inode)->sk;
+
+ err = -ESOCKTNOSUPPORT;
+ if (!sk || !sk->sk_prot->async_recv || !sk->sk_prot->async_send ||
+ !sock_flag(sk, SOCK_ASYNC))
+ goto err_out_fput;
+
+ addr = k->event.ptr;
+ if (((unsigned long)addr & PAGE_MASK) != (unsigned long)addr)
+ num++;
+ err = -ENOMEM;
+ page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
+ if (!page)
+ goto err_out_fput;
+
+ down_read(&current->mm->mmap_sem);
+ err = get_user_pages(current, current->mm, (unsigned long)addr,
+ num, 1, 0, page, NULL);
+ up_read(&current->mm->mmap_sem);
+ if (err <= 0)
+ goto err_out_free;
+ num = err;
+
+ k->event.ret_data[0] = num;
+ k->event.ret_data[1] = offset_in_page(k->event.ptr);
+ k->priv = page;
+
+ sk->sk_allocation = GFP_ATOMIC;
+
+ spin_lock_bh(&sk->sk_lock.slock);
+ err = kevent_socket_enqueue(k);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ if (err)
+ goto err_out_put_pages;
+
+ fput_light(file, fput_needed);
+
+ return err;
+
+err_out_put_pages:
+ for (i=0; i<num; ++i)
+ page_cache_release(page[i]);
+err_out_free:
+ kfree(page);
+err_out_fput:
+ fput_light(file, fput_needed);
+
+ return err;
+}
+
+static int kevent_naio_dequeue(struct kevent *k)
+{
+ int err, i, num;
+ struct page **page = k->priv;
+
+ num = k->event.ret_data[0];
+
+ err = kevent_socket_dequeue(k);
+
+ for (i=0; i<num; ++i)
+ page_cache_release(page[i]);
+
+ kfree(k->priv);
+ k->priv = NULL;
+
+ return err;
+}
+
+static int kevent_naio_callback(struct kevent *k)
+{
+ struct inode *inode = k->st->origin;
+ struct sock *sk = SOCKET_I(inode)->sk;
+ unsigned int size = k->event.id.raw[1];
+ unsigned int off = k->event.ret_data[1];
+ struct page **pages = k->priv, *page;
+ int ready = 0, num = off/PAGE_SIZE, err = 0, send = 0;
+ void *ptr, *optr;
+ unsigned int len;
+
+ if (!sock_flag(sk, SOCK_ASYNC))
+ return -1;
+
+ if (k->event.event & KEVENT_SOCKET_SEND)
+ send = 1;
+ else if (!(k->event.event & KEVENT_SOCKET_RECV))
+ return -EINVAL;
+
+ /*
+ * sk_prot->async_*() can return either number of bytes processed,
+ * or negative error value, or zero if socket is closed.
+ */
+
+ if (!send) {
+ page = pages[num];
+
+ optr = ptr = kmap_atomic(page, KM_IRQ0);
+ if (!ptr)
+ return -ENOMEM;
+
+ ptr += off % PAGE_SIZE;
+ len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
+
+ err = sk->sk_prot->async_recv(sk, ptr, len);
+
+ kunmap_atomic(optr, KM_IRQ0);
+ } else {
+ len = size;
+ err = sk->sk_prot->async_send(sk, pages, off, size);
+ }
+
+ if (err > 0) {
+ num++;
+ size -= err;
+ off += err;
+ }
+
+ k->event.ret_data[1] = off;
+ k->event.id.raw[1] = size;
+
+ if (err == 0 || (err < 0 && err != -EAGAIN))
+ ready = -1;
+
+ if (!size)
+ ready = 1;
+#if 0
+ printk("%s: sk=%p, k=%p, size=%4u, off=%4u, err=%3d, ready=%1d.\n",
+ __func__, sk, k, size, off, err, ready);
+#endif
+
+ return ready;
+}
+
+static int __init kevent_init_naio(void)
+{
+ struct kevent_callbacks *nc = &kevent_registered_callbacks[KEVENT_NAIO];
+
+ nc->enqueue = &kevent_naio_enqueue;
+ nc->dequeue = &kevent_naio_dequeue;
+ nc->callback = &kevent_naio_callback;
+ return 0;
+}
+late_initcall(kevent_init_naio);
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
new file mode 100644
index 0000000..20c9568
--- /dev/null
+++ b/kernel/kevent/kevent_socket.c
@@ -0,0 +1,128 @@
+/*
+ * kevent_socket.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/tcp.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/request_sock.h>
+#include <net/inet_connection_sock.h>
+
+static int kevent_socket_callback(struct kevent *k)
+{
+ struct inode *inode = k->st->origin;
+ struct sock *sk = SOCKET_I(inode)->sk;
+ int rmem;
+
+ if (k->event.event & KEVENT_SOCKET_RECV) {
+ int ret = 0;
+
+ if ((rmem = atomic_read(&sk->sk_rmem_alloc)) > 0 ||
+ !skb_queue_empty(&sk->sk_receive_queue))
+ ret = 1;
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ ret = 1;
+ if (ret)
+ return ret;
+ }
+ if ((k->event.event & KEVENT_SOCKET_ACCEPT) &&
+ (!reqsk_queue_empty(&inet_csk(sk)->icsk_accept_queue) ||
+ reqsk_queue_len_young(&inet_csk(sk)->icsk_accept_queue))) {
+ k->event.ret_data[1] = reqsk_queue_len(&inet_csk(sk)->icsk_accept_queue);
+ return 1;
+ }
+
+ return 0;
+}
+
+int kevent_socket_enqueue(struct kevent *k)
+{
+ struct file *file;
+ struct inode *inode;
+ int err, fput_needed;
+
+ file = fget_light(k->event.id.raw[0], &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ err = -EINVAL;
+ if (!file->f_dentry || !file->f_dentry->d_inode)
+ goto err_out_fput;
+
+ inode = igrab(file->f_dentry->d_inode);
+ if (!inode)
+ goto err_out_fput;
+
+ err = kevent_storage_enqueue(&inode->st, k);
+ if (err)
+ goto err_out_iput;
+
+ err = k->callbacks.callback(k);
+ if (err)
+ goto err_out_dequeue;
+
+ fput_light(file, fput_needed);
+ return err;
+
+err_out_dequeue:
+ kevent_storage_dequeue(k->st, k);
+err_out_iput:
+ iput(inode);
+err_out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+int kevent_socket_dequeue(struct kevent *k)
+{
+ struct inode *inode = k->st->origin;
+
+ kevent_storage_dequeue(k->st, k);
+ iput(inode);
+
+ return 0;
+}
+
+void kevent_socket_notify(struct sock *sk, u32 event)
+{
+ if (sk->sk_socket && !test_and_set_bit(SOCK_ASYNC_INUSE, &sk->sk_flags)) {
+ kevent_storage_ready(&SOCK_INODE(sk->sk_socket)->st, NULL, event);
+ sock_reset_flag(sk, SOCK_ASYNC_INUSE);
+ }
+}
+
+static int __init kevent_init_socket(void)
+{
+ struct kevent_callbacks *sc = &kevent_registered_callbacks[KEVENT_SOCKET];
+
+ sc->enqueue = &kevent_socket_enqueue;
+ sc->dequeue = &kevent_socket_dequeue;
+ sc->callback = &kevent_socket_callback;
+ return 0;
+}
+late_initcall(kevent_init_socket);
diff --git a/net/core/datagram.c b/net/core/datagram.c
index aecddcc..493245b 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -236,6 +236,60 @@ void skb_kill_datagram(struct sock *sk,
EXPORT_SYMBOL(skb_kill_datagram);

/**
+ * skb_copy_datagram - Copy a datagram.
+ * @skb: buffer to copy
+ * @offset: offset in the buffer to start copying from
+ * @to: pointer to copy to
+ * @len: amount of data to copy from buffer to @to
+ */
+int skb_copy_datagram(const struct sk_buff *skb, int offset,
+ void *to, int len)
+{
+ int i, fraglen, end = 0;
+ struct sk_buff *next = skb_shinfo(skb)->frag_list;
+
+ if (!len)
+ return 0;
+
+next_skb:
+ fraglen = skb_headlen(skb);
+ i = -1;
+
+ while (1) {
+ int start = end;
+
+ if ((end += fraglen) > offset) {
+ int copy = end - offset, o = offset - start;
+
+ if (copy > len)
+ copy = len;
+ if (i == -1)
+ memcpy(to, skb->data + o, copy);
+ else {
+ skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
+ struct page *page = frag->page;
+ void *p = kmap(page) + frag->page_offset + o;
+ memcpy(to, p, copy);
+ kunmap(page);
+ }
+ if (!(len -= copy))
+ return 0;
+ offset += copy;
+ }
+ if (++i >= skb_shinfo(skb)->nr_frags)
+ break;
+ fraglen = skb_shinfo(skb)->frags[i].size;
+ }
+ if (next) {
+ skb = next;
+ BUG_ON(skb_shinfo(skb)->frag_list);
+ next = skb->next;
+ goto next_skb;
+ }
+ return -EFAULT;
+}
+
+/**
* skb_copy_datagram_iovec - Copy a datagram to an iovec.
* @skb: buffer to copy
* @offset: offset in the buffer to start copying from
@@ -530,6 +584,7 @@ unsigned int datagram_poll(struct file *

EXPORT_SYMBOL(datagram_poll);
EXPORT_SYMBOL(skb_copy_and_csum_datagram_iovec);
+EXPORT_SYMBOL(skb_copy_datagram);
EXPORT_SYMBOL(skb_copy_datagram_iovec);
EXPORT_SYMBOL(skb_free_datagram);
EXPORT_SYMBOL(skb_recv_datagram);
diff --git a/net/core/sock.c b/net/core/sock.c
index 51fcfbc..9922373 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -617,6 +617,16 @@ #endif
spin_unlock_bh(&sk->sk_lock.slock);
ret = -ENONET;
break;
+#ifdef CONFIG_KEVENT_SOCKET
+ case SO_ASYNC_SOCK:
+ spin_lock_bh(&sk->sk_lock.slock);
+ if (valbool)
+ sock_set_flag(sk, SOCK_ASYNC);
+ else
+ sock_reset_flag(sk, SOCK_ASYNC);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ break;
+#endif

case SO_PASSSEC:
if (valbool)
@@ -1406,6 +1416,7 @@ static void sock_def_wakeup(struct sock
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible_all(sk->sk_sleep);
read_unlock(&sk->sk_callback_lock);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
}

static void sock_def_error_report(struct sock *sk)
@@ -1415,6 +1426,7 @@ static void sock_def_error_report(struct
wake_up_interruptible(sk->sk_sleep);
sk_wake_async(sk,0,POLL_ERR);
read_unlock(&sk->sk_callback_lock);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
}

static void sock_def_readable(struct sock *sk, int len)
@@ -1424,6 +1436,7 @@ static void sock_def_readable(struct soc
wake_up_interruptible(sk->sk_sleep);
sk_wake_async(sk,1,POLL_IN);
read_unlock(&sk->sk_callback_lock);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
}

static void sock_def_write_space(struct sock *sk)
@@ -1443,6 +1456,7 @@ static void sock_def_write_space(struct
}

read_unlock(&sk->sk_callback_lock);
+ kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
}

static void sock_def_destruct(struct sock *sk)
@@ -1559,8 +1573,10 @@ void fastcall release_sock(struct sock *
if (sk->sk_backlog.tail)
__release_sock(sk);
sk->sk_lock.owner = NULL;
- if (waitqueue_active(&sk->sk_lock.wq))
+ if (waitqueue_active(&sk->sk_lock.wq)) {
wake_up(&sk->sk_lock.wq);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
+ }
spin_unlock_bh(&sk->sk_lock.slock);
}
EXPORT_SYMBOL(release_sock);
diff --git a/net/core/stream.c b/net/core/stream.c
index d1d7dec..2878c2a 100644
--- a/net/core/stream.c
+++ b/net/core/stream.c
@@ -36,6 +36,7 @@ void sk_stream_write_space(struct sock *
wake_up_interruptible(sk->sk_sleep);
if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
sock_wake_async(sock, 2, POLL_OUT);
+ kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
}
}

diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index f6a2d92..e878a41 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -206,6 +206,7 @@
* lingertime == 0 (RFC 793 ABORT Call)
* Hirokazu Takahashi : Use copy_from_user() instead of
* csum_and_copy_from_user() if possible.
+ * Evgeniy Polyakov : Network asynchronous IO.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@@ -1085,6 +1086,301 @@ int tcp_read_sock(struct sock *sk, read_
}

/*
+ * Must be called with locked sock.
+ */
+int tcp_async_send(struct sock *sk, struct page **pages, unsigned int poffset, size_t len)
+{
+ struct tcp_sock *tp = tcp_sk(sk);
+ int mss_now, size_goal;
+ int err = -EAGAIN;
+ ssize_t copied;
+
+ /* Wait for a connection to finish. */
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+ goto out_err;
+
+ clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+ mss_now = tcp_current_mss(sk, 1);
+ size_goal = tp->xmit_size_goal;
+ copied = 0;
+
+ err = -EPIPE;
+ if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN) || sock_flag(sk, SOCK_DONE) ||
+ (sk->sk_state == TCP_CLOSE) || (atomic_read(&sk->sk_refcnt) == 1))
+ goto do_error;
+
+ while (len > 0) {
+ struct sk_buff *skb = sk->sk_write_queue.prev;
+ struct page *page = pages[poffset / PAGE_SIZE];
+ int copy, i, can_coalesce;
+ int offset = poffset % PAGE_SIZE;
+ int size = min_t(size_t, len, PAGE_SIZE - offset);
+
+ if (!sk->sk_send_head || (copy = size_goal - skb->len) <= 0) {
+new_segment:
+ if (!sk_stream_memory_free(sk))
+ goto wait_for_sndbuf;
+
+ skb = sk_stream_alloc_pskb(sk, 0, 0,
+ sk->sk_allocation);
+ if (!skb)
+ goto wait_for_memory;
+
+ skb_entail(sk, tp, skb);
+ copy = size_goal;
+ }
+
+ if (copy > size)
+ copy = size;
+
+ i = skb_shinfo(skb)->nr_frags;
+ can_coalesce = skb_can_coalesce(skb, i, page, offset);
+ if (!can_coalesce && i >= MAX_SKB_FRAGS) {
+ tcp_mark_push(tp, skb);
+ goto new_segment;
+ }
+ if (!sk_stream_wmem_schedule(sk, copy))
+ goto wait_for_memory;
+
+ if (can_coalesce) {
+ skb_shinfo(skb)->frags[i - 1].size += copy;
+ } else {
+ get_page(page);
+ skb_fill_page_desc(skb, i, page, offset, copy);
+ }
+
+ skb->len += copy;
+ skb->data_len += copy;
+ skb->truesize += copy;
+ sk->sk_wmem_queued += copy;
+ sk->sk_forward_alloc -= copy;
+ skb->ip_summed = CHECKSUM_HW;
+ tp->write_seq += copy;
+ TCP_SKB_CB(skb)->end_seq += copy;
+ skb_shinfo(skb)->gso_segs = 0;
+
+ if (!copied)
+ TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+
+ copied += copy;
+ poffset += copy;
+ if (!(len -= copy))
+ goto out;
+
+ if (skb->len < mss_now)
+ continue;
+
+ if (forced_push(tp)) {
+ tcp_mark_push(tp, skb);
+ __tcp_push_pending_frames(sk, tp, mss_now, TCP_NAGLE_PUSH);
+ } else if (skb == sk->sk_send_head)
+ tcp_push_one(sk, mss_now);
+ continue;
+
+wait_for_sndbuf:
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+ if (copied)
+ tcp_push(sk, tp, 0, mss_now, TCP_NAGLE_PUSH);
+
+ err = -EAGAIN;
+ goto do_error;
+ }
+
+out:
+ if (copied)
+ tcp_push(sk, tp, 0, mss_now, tp->nonagle);
+ return copied;
+
+do_error:
+ if (copied)
+ goto out;
+out_err:
+ return sk_stream_error(sk, 0, err);
+}
+
+/*
+ * Must be called with locked sock.
+ */
+int tcp_async_recv(struct sock *sk, void *dst, size_t len)
+{
+ struct tcp_sock *tp = tcp_sk(sk);
+ int copied = 0;
+ u32 *seq;
+ unsigned long used;
+ int err;
+ int target; /* Read at least this many bytes */
+ int copied_early = 0;
+
+ TCP_CHECK_TIMER(sk);
+
+ err = -ENOTCONN;
+ if (sk->sk_state == TCP_LISTEN)
+ goto out;
+
+ seq = &tp->copied_seq;
+
+ target = sock_rcvlowat(sk, 0, len);
+
+ do {
+ struct sk_buff *skb;
+ u32 offset;
+
+ /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+ if (tp->urg_data && tp->urg_seq == *seq) {
+ if (copied)
+ break;
+ }
+
+ /* Next get a buffer. */
+
+ skb = skb_peek(&sk->sk_receive_queue);
+ do {
+ if (!skb)
+ break;
+
+ /* Now that we have two receive queues this
+ * shouldn't happen.
+ */
+ if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+ printk(KERN_INFO "async_recv bug: copied %X "
+ "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+ break;
+ }
+ offset = *seq - TCP_SKB_CB(skb)->seq;
+ if (skb->h.th->syn)
+ offset--;
+ if (offset < skb->len)
+ goto found_ok_skb;
+ if (skb->h.th->fin)
+ goto found_fin_ok;
+ skb = skb->next;
+ } while (skb != (struct sk_buff *)&sk->sk_receive_queue);
+
+ if (copied)
+ break;
+
+ if (sock_flag(sk, SOCK_DONE))
+ break;
+
+ if (sk->sk_err) {
+ copied = sock_error(sk);
+ break;
+ }
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ break;
+
+ if (sk->sk_state == TCP_CLOSE) {
+ if (!sock_flag(sk, SOCK_DONE)) {
+ /* This occurs when user tries to read
+ * from never connected socket.
+ */
+ copied = -ENOTCONN;
+ break;
+ }
+ break;
+ }
+
+ copied = -EAGAIN;
+ break;
+
+ found_ok_skb:
+ /* Ok so how much can we use? */
+ used = skb->len - offset;
+ if (len < used)
+ used = len;
+
+ /* Do we have urgent data here? */
+ if (tp->urg_data) {
+ u32 urg_offset = tp->urg_seq - *seq;
+ if (urg_offset < used) {
+ if (!urg_offset) {
+ if (!sock_flag(sk, SOCK_URGINLINE)) {
+ ++*seq;
+ offset++;
+ used--;
+ if (!used)
+ goto skip_copy;
+ }
+ } else
+ used = urg_offset;
+ }
+ }
+#ifdef CONFIG_NET_DMA
+ if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
+ tp->ucopy.dma_chan = get_softnet_dma();
+
+ if (tp->ucopy.dma_chan) {
+ tp->ucopy.dma_cookie = dma_skb_copy_datagram_iovec(
+ tp->ucopy.dma_chan, skb, offset,
+ msg->msg_iov, used,
+ tp->ucopy.pinned_list);
+
+ if (tp->ucopy.dma_cookie < 0) {
+
+ printk(KERN_ALERT "dma_cookie < 0\n");
+
+ /* Exception. Bailout! */
+ if (!copied)
+ copied = -EFAULT;
+ break;
+ }
+ if ((offset + used) == skb->len)
+ copied_early = 1;
+
+ } else
+#endif
+ {
+ err = skb_copy_datagram(skb, offset, dst, used);
+ if (err) {
+ /* Exception. Bailout! */
+ if (!copied)
+ copied = -EFAULT;
+ break;
+ }
+ }
+
+ *seq += used;
+ copied += used;
+ len -= used;
+ dst += used;
+
+ tcp_rcv_space_adjust(sk);
+
+skip_copy:
+ if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
+ tp->urg_data = 0;
+ tcp_fast_path_check(sk, tp);
+ }
+ if (used + offset < skb->len)
+ continue;
+
+ if (skb->h.th->fin)
+ goto found_fin_ok;
+ sk_eat_skb(sk, skb, copied_early);
+ continue;
+
+ found_fin_ok:
+ /* Process the FIN. */
+ ++*seq;
+ sk_eat_skb(sk, skb, copied_early);
+ break;
+ } while (len > 0);
+
+ /* Clean up data we have read: This will do ACK frames. */
+ tcp_cleanup_rbuf(sk, copied);
+
+ TCP_CHECK_TIMER(sk);
+ return copied;
+
+out:
+ TCP_CHECK_TIMER(sk);
+ return err;
+}
+
+/*
* This routine copies from a sock struct into the user buffer.
*
* Technical note: in 2.3 we work on _locked_ socket, so that
@@ -2342,6 +2638,8 @@ EXPORT_SYMBOL(tcp_getsockopt);
EXPORT_SYMBOL(tcp_ioctl);
EXPORT_SYMBOL(tcp_poll);
EXPORT_SYMBOL(tcp_read_sock);
+EXPORT_SYMBOL(tcp_async_recv);
+EXPORT_SYMBOL(tcp_async_send);
EXPORT_SYMBOL(tcp_recvmsg);
EXPORT_SYMBOL(tcp_sendmsg);
EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 738dad9..f70d045 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3112,6 +3112,7 @@ static void tcp_ofo_queue(struct sock *s

__skb_unlink(skb, &tp->out_of_order_queue);
__skb_queue_tail(&sk->sk_receive_queue, skb);
+ kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
if(skb->h.th->fin)
tcp_fin(skb, sk, skb->h.th);
@@ -3955,7 +3956,8 @@ int tcp_rcv_established(struct sock *sk,
int copied_early = 0;

if (tp->copied_seq == tp->rcv_nxt &&
- len - tcp_header_len <= tp->ucopy.len) {
+ len - tcp_header_len <= tp->ucopy.len &&
+ !sock_async(sk)) {
#ifdef CONFIG_NET_DMA
if (tcp_dma_try_early_copy(sk, skb, tcp_header_len)) {
copied_early = 1;
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index f6f39e8..ae4f23c 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -61,6 +61,7 @@ #include <linux/cache.h>
#include <linux/jhash.h>
#include <linux/init.h>
#include <linux/times.h>
+#include <linux/kevent.h>

#include <net/icmp.h>
#include <net/inet_hashtables.h>
@@ -868,6 +869,7 @@ #endif
reqsk_free(req);
} else {
inet_csk_reqsk_queue_hash_add(sk, req, TCP_TIMEOUT_INIT);
+ kevent_socket_notify(sk, KEVENT_SOCKET_ACCEPT);
}
return 0;

@@ -1108,24 +1110,30 @@ process:

skb->dev = NULL;

- bh_lock_sock_nested(sk);
ret = 0;
- if (!sock_owned_by_user(sk)) {
+ if (sock_async(sk)) {
+ spin_lock_bh(&sk->sk_lock.slock);
+ ret = tcp_v4_do_rcv(sk, skb);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ } else {
+ bh_lock_sock_nested(sk);
+ if (!sock_owned_by_user(sk)) {
#ifdef CONFIG_NET_DMA
- struct tcp_sock *tp = tcp_sk(sk);
- if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
- tp->ucopy.dma_chan = get_softnet_dma();
- if (tp->ucopy.dma_chan)
- ret = tcp_v4_do_rcv(sk, skb);
- else
+ struct tcp_sock *tp = tcp_sk(sk);
+ if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
+ tp->ucopy.dma_chan = get_softnet_dma();
+ if (tp->ucopy.dma_chan)
+ ret = tcp_v4_do_rcv(sk, skb);
+ else
#endif
- {
- if (!tcp_prequeue(sk, skb))
- ret = tcp_v4_do_rcv(sk, skb);
- }
- } else
- sk_add_backlog(sk, skb);
- bh_unlock_sock(sk);
+ {
+ if (!tcp_prequeue(sk, skb))
+ ret = tcp_v4_do_rcv(sk, skb);
+ }
+ } else
+ sk_add_backlog(sk, skb);
+ bh_unlock_sock(sk);
+ }

sock_put(sk);

@@ -1849,6 +1857,8 @@ struct proto tcp_prot = {
.getsockopt = tcp_getsockopt,
.sendmsg = tcp_sendmsg,
.recvmsg = tcp_recvmsg,
+ .async_recv = tcp_async_recv,
+ .async_send = tcp_async_send,
.backlog_rcv = tcp_v4_do_rcv,
.hash = tcp_v4_hash,
.unhash = tcp_unhash,
diff --git a/net/ipv6/tcp_ipv6.c b/net/ipv6/tcp_ipv6.c
index 923989d..a5d3ac8 100644
--- a/net/ipv6/tcp_ipv6.c
+++ b/net/ipv6/tcp_ipv6.c
@@ -1230,22 +1230,28 @@ process:

skb->dev = NULL;

- bh_lock_sock(sk);
ret = 0;
- if (!sock_owned_by_user(sk)) {
+ if (sock_async(sk)) {
+ spin_lock_bh(&sk->sk_lock.slock);
+ ret = tcp_v6_do_rcv(sk, skb);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ } else {
+ bh_lock_sock(sk);
+ if (!sock_owned_by_user(sk)) {
#ifdef CONFIG_NET_DMA
- struct tcp_sock *tp = tcp_sk(sk);
- if (tp->ucopy.dma_chan)
- ret = tcp_v6_do_rcv(sk, skb);
- else
-#endif
- {
- if (!tcp_prequeue(sk, skb))
+ struct tcp_sock *tp = tcp_sk(sk);
+ if (tp->ucopy.dma_chan)
ret = tcp_v6_do_rcv(sk, skb);
- }
- } else
- sk_add_backlog(sk, skb);
- bh_unlock_sock(sk);
+ else
+#endif
+ {
+ if (!tcp_prequeue(sk, skb))
+ ret = tcp_v6_do_rcv(sk, skb);
+ }
+ } else
+ sk_add_backlog(sk, skb);
+ bh_unlock_sock(sk);
+ }

sock_put(sk);
return ret ? -1 : 0;
@@ -1596,6 +1602,8 @@ struct proto tcpv6_prot = {
.getsockopt = tcp_getsockopt,
.sendmsg = tcp_sendmsg,
.recvmsg = tcp_recvmsg,
+ .async_recv = tcp_async_recv,
+ .async_send = tcp_async_send,
.backlog_rcv = tcp_v6_do_rcv,
.hash = tcp_v6_hash,
.unhash = tcp_unhash,

2006-08-03 09:22:50

by Evgeniy Polyakov

Subject: [take3 4/4] kevent: poll/select() notifications. Timer notifications.


poll/select() notifications. Timer notifications.

This patch includes generic poll/select and timer notifications.

kevent_poll works similarly to epoll and has the same issues (the callback
is invoked not from the internal state machine of the caller, but through
process wakeup).
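
For orientation, the readiness mask that kevent_poll forwards is the same kind of mask a plain poll() call reports to userspace. The sketch below uses only POSIX calls and is not part of the patch; the helper name pipe_revents() is hypothetical.

```c
#include <poll.h>
#include <unistd.h>

/*
 * Write one byte into a fresh pipe and ask poll() about the read end.
 * Returns the revents mask, or -1 on failure.
 */
int pipe_revents(void)
{
	int fds[2];
	struct pollfd pfd;
	int n;

	if (pipe(fds) < 0)
		return -1;

	if (write(fds[1], "x", 1) != 1) {
		close(fds[0]);
		close(fds[1]);
		return -1;
	}

	pfd.fd = fds[0];
	pfd.events = POLLIN;
	pfd.revents = 0;

	n = poll(&pfd, 1, 0);	/* timeout 0: report state without blocking */

	close(fds[0]);
	close(fds[1]);

	return n == 1 ? pfd.revents : -1;
}
```

A caller would test the result against POLLIN, POLLOUT and so on, much as a kevent user would inspect the returned event mask.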

Timer notifications can be used for fine-grained per-process time
management, since interval timers are very inconvenient to use,
and they are limited.

Signed-off-by: Evgeniy Polyakov <[email protected]>

diff --git a/kernel/kevent/kevent_poll.c b/kernel/kevent/kevent_poll.c
new file mode 100644
index 0000000..46cc8f0
--- /dev/null
+++ b/kernel/kevent/kevent_poll.c
@@ -0,0 +1,217 @@
+/*
+ * kevent_poll.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/kevent.h>
+#include <linux/poll.h>
+#include <linux/fs.h>
+
+static kmem_cache_t *kevent_poll_container_cache;
+static kmem_cache_t *kevent_poll_priv_cache;
+
+struct kevent_poll_ctl
+{
+ struct poll_table_struct pt;
+ struct kevent *k;
+};
+
+struct kevent_poll_wait_container
+{
+ struct list_head container_entry;
+ wait_queue_head_t *whead;
+ wait_queue_t wait;
+ struct kevent *k;
+};
+
+struct kevent_poll_private
+{
+ struct list_head container_list;
+ spinlock_t container_lock;
+};
+
+static int kevent_poll_enqueue(struct kevent *k);
+static int kevent_poll_dequeue(struct kevent *k);
+static int kevent_poll_callback(struct kevent *k);
+
+static int kevent_poll_wait_callback(wait_queue_t *wait,
+ unsigned mode, int sync, void *key)
+{
+ struct kevent_poll_wait_container *cont =
+ container_of(wait, struct kevent_poll_wait_container, wait);
+ struct kevent *k = cont->k;
+ struct file *file = k->st->origin;
+ unsigned long flags;
+ u32 revents, event;
+
+ revents = file->f_op->poll(file, NULL);
+ spin_lock_irqsave(&k->ulock, flags);
+ event = k->event.event;
+ spin_unlock_irqrestore(&k->ulock, flags);
+
+ kevent_storage_ready(k->st, NULL, revents);
+
+ return 0;
+}
+
+static void kevent_poll_qproc(struct file *file, wait_queue_head_t *whead,
+ struct poll_table_struct *poll_table)
+{
+ struct kevent *k =
+ container_of(poll_table, struct kevent_poll_ctl, pt)->k;
+ struct kevent_poll_private *priv = k->priv;
+ struct kevent_poll_wait_container *cont;
+ unsigned long flags;
+
+ cont = kmem_cache_alloc(kevent_poll_container_cache, SLAB_KERNEL);
+ if (!cont) {
+ kevent_break(k);
+ return;
+ }
+
+ cont->k = k;
+ init_waitqueue_func_entry(&cont->wait, kevent_poll_wait_callback);
+ cont->whead = whead;
+
+ spin_lock_irqsave(&priv->container_lock, flags);
+ list_add_tail(&cont->container_entry, &priv->container_list);
+ spin_unlock_irqrestore(&priv->container_lock, flags);
+
+ add_wait_queue(whead, &cont->wait);
+}
+
+static int kevent_poll_enqueue(struct kevent *k)
+{
+ struct file *file;
+ int err, ready = 0;
+ unsigned int revents;
+ struct kevent_poll_ctl ctl;
+ struct kevent_poll_private *priv;
+
+ file = fget(k->event.id.raw[0]);
+ if (!file)
+ return -ENODEV;
+
+ err = -EINVAL;
+ if (!file->f_op || !file->f_op->poll)
+ goto err_out_fput;
+
+ err = -ENOMEM;
+ priv = kmem_cache_alloc(kevent_poll_priv_cache, SLAB_KERNEL);
+ if (!priv)
+ goto err_out_fput;
+
+ spin_lock_init(&priv->container_lock);
+ INIT_LIST_HEAD(&priv->container_list);
+
+ k->priv = priv;
+
+ ctl.k = k;
+ init_poll_funcptr(&ctl.pt, &kevent_poll_qproc);
+
+ err = kevent_storage_enqueue(&file->st, k);
+ if (err)
+ goto err_out_free;
+
+ revents = file->f_op->poll(file, &ctl.pt);
+ if (revents & k->event.event) {
+ ready = 1;
+ kevent_poll_dequeue(k);
+ }
+
+ return ready;
+
+err_out_free:
+ kmem_cache_free(kevent_poll_priv_cache, priv);
+err_out_fput:
+ fput(file);
+ return err;
+}
+
+static int kevent_poll_dequeue(struct kevent *k)
+{
+ struct file *file = k->st->origin;
+ struct kevent_poll_private *priv = k->priv;
+ struct kevent_poll_wait_container *w, *n;
+ unsigned long flags;
+
+ kevent_storage_dequeue(k->st, k);
+
+ spin_lock_irqsave(&priv->container_lock, flags);
+ list_for_each_entry_safe(w, n, &priv->container_list, container_entry) {
+ list_del(&w->container_entry);
+ remove_wait_queue(w->whead, &w->wait);
+ kmem_cache_free(kevent_poll_container_cache, w);
+ }
+ spin_unlock_irqrestore(&priv->container_lock, flags);
+
+ kmem_cache_free(kevent_poll_priv_cache, priv);
+ k->priv = NULL;
+
+ fput(file);
+
+ return 0;
+}
+
+static int kevent_poll_callback(struct kevent *k)
+{
+ struct file *file = k->st->origin;
+ unsigned int revents = file->f_op->poll(file, NULL);
+ return (revents & k->event.event);
+}
+
+static int __init kevent_poll_sys_init(void)
+{
+ struct kevent_callbacks *pc = &kevent_registered_callbacks[KEVENT_POLL];
+
+ kevent_poll_container_cache = kmem_cache_create("kevent_poll_container_cache",
+ sizeof(struct kevent_poll_wait_container), 0, 0, NULL, NULL);
+ if (!kevent_poll_container_cache) {
+ printk(KERN_ERR "Failed to create kevent poll container cache.\n");
+ return -ENOMEM;
+ }
+
+ kevent_poll_priv_cache = kmem_cache_create("kevent_poll_priv_cache",
+ sizeof(struct kevent_poll_private), 0, 0, NULL, NULL);
+ if (!kevent_poll_priv_cache) {
+ printk(KERN_ERR "Failed to create kevent poll private data cache.\n");
+ kmem_cache_destroy(kevent_poll_container_cache);
+ kevent_poll_container_cache = NULL;
+ return -ENOMEM;
+ }
+
+ pc->enqueue = &kevent_poll_enqueue;
+ pc->dequeue = &kevent_poll_dequeue;
+ pc->callback = &kevent_poll_callback;
+
+ printk(KERN_INFO "Kevent poll()/select() subsystem has been initialized.\n");
+ return 0;
+}
+
+static void __exit kevent_poll_sys_fini(void)
+{
+ kmem_cache_destroy(kevent_poll_priv_cache);
+ kmem_cache_destroy(kevent_poll_container_cache);
+}
+
+module_init(kevent_poll_sys_init);
+module_exit(kevent_poll_sys_fini);
diff --git a/kernel/kevent/kevent_timer.c b/kernel/kevent/kevent_timer.c
new file mode 100644
index 0000000..9063cec
--- /dev/null
+++ b/kernel/kevent/kevent_timer.c
@@ -0,0 +1,116 @@
+/*
+ * kevent_timer.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/jiffies.h>
+#include <linux/kevent.h>
+
+static void kevent_timer_func(unsigned long data)
+{
+ struct kevent *k = (struct kevent *)data;
+ struct timer_list *t = k->st->origin;
+
+ kevent_storage_ready(k->st, NULL, KEVENT_MASK_ALL);
+ mod_timer(t, jiffies + msecs_to_jiffies(k->event.id.raw[0]));
+}
+
+static int kevent_timer_enqueue(struct kevent *k)
+{
+ struct timer_list *t;
+ struct kevent_storage *st;
+ int err;
+
+ t = kmalloc(sizeof(struct timer_list) + sizeof(struct kevent_storage),
+ GFP_KERNEL);
+ if (!t)
+ return -ENOMEM;
+
+ init_timer(t);
+ t->function = kevent_timer_func;
+ t->expires = jiffies + msecs_to_jiffies(k->event.id.raw[0]);
+ t->data = (unsigned long)k;
+
+ st = (struct kevent_storage *)(t+1);
+ err = kevent_storage_init(t, st);
+ if (err)
+ goto err_out_free;
+
+ err = kevent_storage_enqueue(st, k);
+ if (err)
+ goto err_out_st_fini;
+
+ add_timer(t);
+
+ return 0;
+
+err_out_st_fini:
+ kevent_storage_fini(st);
+err_out_free:
+ kfree(t);
+
+ return err;
+}
+
+static int kevent_timer_dequeue(struct kevent *k)
+{
+ struct kevent_storage *st = k->st;
+ struct timer_list *t = st->origin;
+
+ if (!t)
+ return -ENODEV;
+
+ del_timer_sync(t);
+
+ kevent_storage_dequeue(st, k);
+
+ kfree(t);
+
+ return 0;
+}
+
+static int kevent_timer_callback(struct kevent *k)
+{
+ struct kevent_storage *st = k->st;
+ struct timer_list *t = st->origin;
+
+ if (!t)
+ return -ENODEV;
+
+ k->event.ret_data[0] = (__u32)jiffies;
+ return 1;
+}
+
+static int __init kevent_init_timer(void)
+{
+ struct kevent_callbacks *tc = &kevent_registered_callbacks[KEVENT_TIMER];
+
+ tc->enqueue = &kevent_timer_enqueue;
+ tc->dequeue = &kevent_timer_dequeue;
+ tc->callback = &kevent_timer_callback;
+
+ return 0;
+}
+late_initcall(kevent_init_timer);

2006-08-03 09:23:33

by Evgeniy Polyakov

Subject: [take3 1/4] kevent: Core files.


Core files.

This patch includes core kevent files:
- userspace controlling
- kernelspace interfaces
- initialization
- notification state machines

It might also include parts from other subsystems (like network-related
syscalls), so it is possible that it will not compile without the other
patches applied.
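
The notification state machine can be summarised with a small userspace
sketch that mirrors the decision made in __kevent_requeue() in kevent.c
below. It is an illustration only: the flag values are copied from
kevent.h, while struct requeue_result and requeue_decision() are
hypothetical names that do not exist in the patch.

```c
#include <stdint.h>

#define KEVENT_REQ_ONESHOT	0x1	/* as in kevent.h */
#define KEVENT_RET_BROKEN	0x1	/* as in kevent.h */
#define KEVENT_RET_DONE		0x2	/* as in kevent.h */

/* Hypothetical summary of what __kevent_requeue() decides. */
struct requeue_result {
	uint32_t ret_flags;	/* updated return flags */
	int ready;		/* move the kevent to the ready queue? */
	int remove;		/* also unlink it from the origin's queue? */
};

/*
 * cb_ret is the value returned by the ->callback() hook:
 * positive means done, negative means broken, zero means not ready yet.
 */
static struct requeue_result requeue_decision(int cb_ret, uint32_t req_flags,
					      uint32_t ret_flags)
{
	struct requeue_result r;

	if (cb_ret > 0)
		ret_flags |= KEVENT_RET_DONE;
	else if (cb_ret < 0)
		ret_flags |= KEVENT_RET_BROKEN | KEVENT_RET_DONE;

	r.ret_flags = ret_flags;
	/* A zero return still counts as ready if the flags were already set. */
	r.ready = cb_ret != 0 ||
		  (ret_flags & (KEVENT_RET_BROKEN | KEVENT_RET_DONE)) != 0;
	r.remove = r.ready && (req_flags & KEVENT_REQ_ONESHOT) != 0;
	return r;
}
```

A one-shot kevent whose callback returns a positive value is thus marked
done, queued as ready and removed from its origin, whereas a zero return
with clean flags leaves the kevent where it is.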

Signed-off-by: Evgeniy Polyakov <[email protected]>

diff --git a/arch/i386/kernel/syscall_table.S b/arch/i386/kernel/syscall_table.S
index dd63d47..0af988a 100644
--- a/arch/i386/kernel/syscall_table.S
+++ b/arch/i386/kernel/syscall_table.S
@@ -317,3 +317,7 @@ ENTRY(sys_call_table)
.long sys_tee /* 315 */
.long sys_vmsplice
.long sys_move_pages
+ .long sys_aio_recv
+ .long sys_aio_send
+ .long sys_kevent_get_events
+ .long sys_kevent_ctl
diff --git a/arch/x86_64/ia32/ia32entry.S b/arch/x86_64/ia32/ia32entry.S
index 5d4a7d1..e157ad4 100644
--- a/arch/x86_64/ia32/ia32entry.S
+++ b/arch/x86_64/ia32/ia32entry.S
@@ -713,4 +713,8 @@ #endif
.quad sys_tee
.quad compat_sys_vmsplice
.quad compat_sys_move_pages
+ .quad sys_aio_recv
+ .quad sys_aio_send
+ .quad sys_kevent_get_events
+ .quad sys_kevent_ctl
ia32_syscall_end:
diff --git a/include/asm-i386/unistd.h b/include/asm-i386/unistd.h
index fc1c8dd..a76e50d 100644
--- a/include/asm-i386/unistd.h
+++ b/include/asm-i386/unistd.h
@@ -323,10 +323,14 @@ #define __NR_sync_file_range 314
#define __NR_tee 315
#define __NR_vmsplice 316
#define __NR_move_pages 317
+#define __NR_aio_recv 318
+#define __NR_aio_send 319
+#define __NR_kevent_get_events 320
+#define __NR_kevent_ctl 321

#ifdef __KERNEL__

-#define NR_syscalls 318
+#define NR_syscalls 322

/*
* user-visible error numbers are in the range -1 - -128: see
diff --git a/include/asm-x86_64/unistd.h b/include/asm-x86_64/unistd.h
index 94387c9..9a0b581 100644
--- a/include/asm-x86_64/unistd.h
+++ b/include/asm-x86_64/unistd.h
@@ -619,10 +619,18 @@ #define __NR_vmsplice 278
__SYSCALL(__NR_vmsplice, sys_vmsplice)
#define __NR_move_pages 279
__SYSCALL(__NR_move_pages, sys_move_pages)
+#define __NR_aio_recv 280
+__SYSCALL(__NR_aio_recv, sys_aio_recv)
+#define __NR_aio_send 281
+__SYSCALL(__NR_aio_send, sys_aio_send)
+#define __NR_kevent_get_events 282
+__SYSCALL(__NR_kevent_get_events, sys_kevent_get_events)
+#define __NR_kevent_ctl 283
+__SYSCALL(__NR_kevent_ctl, sys_kevent_ctl)

#ifdef __KERNEL__

-#define __NR_syscall_max __NR_move_pages
+#define __NR_syscall_max __NR_kevent_ctl

#ifndef __NO_STUBS

diff --git a/include/linux/kevent.h b/include/linux/kevent.h
new file mode 100644
index 0000000..cb09726
--- /dev/null
+++ b/include/linux/kevent.h
@@ -0,0 +1,277 @@
+/*
+ * kevent.h
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef __KEVENT_H
+#define __KEVENT_H
+
+/*
+ * Kevent request flags.
+ */
+
+#define KEVENT_REQ_ONESHOT 0x1 /* Process this event only once and then dequeue. */
+
+/*
+ * Kevent return flags.
+ */
+#define KEVENT_RET_BROKEN 0x1 /* Kevent is broken. */
+#define KEVENT_RET_DONE 0x2 /* Kevent processing was finished successfully. */
+
+/*
+ * Kevent type set.
+ */
+#define KEVENT_SOCKET 0
+#define KEVENT_INODE 1
+#define KEVENT_TIMER 2
+#define KEVENT_POLL 3
+#define KEVENT_NAIO 4
+#define KEVENT_AIO 5
+#define KEVENT_MAX 6
+
+/*
+ * Per-type event sets.
+ * The number of per-type event sets should match the number of kevent types.
+ */
+
+/*
+ * Timer events.
+ */
+#define KEVENT_TIMER_FIRED 0x1
+
+/*
+ * Socket/network asynchronous IO events.
+ */
+#define KEVENT_SOCKET_RECV 0x1
+#define KEVENT_SOCKET_ACCEPT 0x2
+#define KEVENT_SOCKET_SEND 0x4
+
+/*
+ * Inode events.
+ */
+#define KEVENT_INODE_CREATE 0x1
+#define KEVENT_INODE_REMOVE 0x2
+
+/*
+ * Poll events.
+ */
+#define KEVENT_POLL_POLLIN 0x0001
+#define KEVENT_POLL_POLLPRI 0x0002
+#define KEVENT_POLL_POLLOUT 0x0004
+#define KEVENT_POLL_POLLERR 0x0008
+#define KEVENT_POLL_POLLHUP 0x0010
+#define KEVENT_POLL_POLLNVAL 0x0020
+
+#define KEVENT_POLL_POLLRDNORM 0x0040
+#define KEVENT_POLL_POLLRDBAND 0x0080
+#define KEVENT_POLL_POLLWRNORM 0x0100
+#define KEVENT_POLL_POLLWRBAND 0x0200
+#define KEVENT_POLL_POLLMSG 0x0400
+#define KEVENT_POLL_POLLREMOVE 0x1000
+
+/*
+ * Asynchronous IO events.
+ */
+#define KEVENT_AIO_BIO 0x1
+
+#define KEVENT_MASK_ALL 0xffffffff /* Mask of all possible event values. */
+#define KEVENT_MASK_EMPTY 0x0 /* Empty mask of ready events. */
+
+struct kevent_id
+{
+ __u32 raw[2];
+};
+
+struct ukevent
+{
+ struct kevent_id id; /* Id of this request, e.g. socket number, file descriptor and so on... */
+ __u32 type; /* Event type, e.g. KEVENT_SOCKET, KEVENT_INODE, KEVENT_TIMER and so on... */
+ __u32 event; /* Event itself, e.g. KEVENT_SOCKET_ACCEPT, KEVENT_INODE_CREATE, KEVENT_TIMER_FIRED... */
+ __u32 req_flags; /* Per-event request flags */
+ __u32 ret_flags; /* Per-event return flags */
+ __u32 ret_data[2]; /* Event return data. Event originator fills it with anything it likes. */
+ union {
+ __u32 user[2]; /* User's data. It is not used, just copied to/from user. */
+ void *ptr;
+ };
+};
+
+#define KEVENT_CTL_ADD 0
+#define KEVENT_CTL_REMOVE 1
+#define KEVENT_CTL_MODIFY 2
+#define KEVENT_CTL_INIT 3
+
+#ifdef __KERNEL__
+
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/mutex.h>
+#include <linux/wait.h>
+#include <linux/kevent_storage.h>
+
+#define KEVENT_MAX_EVENTS 4096
+#define KEVENT_MIN_BUFFS_ALLOC 3
+
+struct inode;
+struct dentry;
+struct sock;
+
+struct kevent;
+struct kevent_storage;
+typedef int (* kevent_callback_t)(struct kevent *);
+
+/* @callback is called each time new event has been caught. */
+/* @enqueue is called each time new event is queued. */
+/* @dequeue is called each time event is dequeued. */
+
+struct kevent_callbacks {
+ kevent_callback_t callback, enqueue, dequeue;
+};
+
+struct kevent
+{
+ struct ukevent event;
+ spinlock_t ulock; /* This lock protects ukevent manipulations, e.g. ret_flags changes. */
+
+ struct list_head kevent_entry; /* Entry of user's queue. */
+ struct list_head storage_entry; /* Entry of origin's queue. */
+ struct list_head ready_entry; /* Entry of user's ready. */
+
+ struct kevent_user *user; /* User who requested this kevent. */
+ struct kevent_storage *st; /* Kevent container. */
+
+ struct kevent_callbacks callbacks;
+
+ void *priv; /* Private data for different storages.
+ * poll()/select() storage keeps here a list of wait_queue_t
+ * containers, one for each poll_wait() call made from ->poll().
+ */
+};
+
+extern struct kevent_callbacks kevent_registered_callbacks[];
+
+#define KEVENT_HASH_MASK 0xff
+
+struct kevent_user
+{
+ struct list_head kevent_list[KEVENT_HASH_MASK+1];
+ spinlock_t kevent_lock;
+ unsigned int kevent_num; /* Number of queued kevents. */
+
+ struct list_head ready_list; /* List of ready kevents. */
+ unsigned int ready_num; /* Number of ready kevents. */
+ spinlock_t ready_lock; /* Protects all manipulations with ready queue. */
+
+ unsigned int max_ready_num; /* Requested number of kevents. */
+
+ struct mutex ctl_mutex; /* Protects against simultaneous kevent_user control manipulations. */
+ struct mutex wait_mutex; /* Protects against simultaneous kevent_user waits. */
+ wait_queue_head_t wait; /* Wait until some events are ready. */
+
+ atomic_t refcnt; /* Reference counter, increased for each new kevent. */
+
+ unsigned long *pring; /* Array of pages forming mapped ring buffer */
+
+#ifdef CONFIG_KEVENT_USER_STAT
+ unsigned long im_num;
+ unsigned long wait_num;
+ unsigned long total;
+#endif
+};
+
+struct kevent *kevent_alloc(gfp_t mask);
+void kevent_free(struct kevent *k);
+int kevent_enqueue(struct kevent *k);
+int kevent_dequeue(struct kevent *k);
+int kevent_init(struct kevent *k);
+void kevent_requeue(struct kevent *k);
+int kevent_break(struct kevent *k);
+
+void kevent_user_ring_add_event(struct kevent *k);
+
+void kevent_storage_ready(struct kevent_storage *st,
+ kevent_callback_t ready_callback, u32 event);
+int kevent_storage_init(void *origin, struct kevent_storage *st);
+void kevent_storage_fini(struct kevent_storage *st);
+int kevent_storage_enqueue(struct kevent_storage *st, struct kevent *k);
+void kevent_storage_dequeue(struct kevent_storage *st, struct kevent *k);
+
+int kevent_user_add_ukevent(struct ukevent *uk, struct kevent_user *u);
+
+#ifdef CONFIG_KEVENT_INODE
+void kevent_inode_notify(struct inode *inode, u32 event);
+void kevent_inode_notify_parent(struct dentry *dentry, u32 event);
+void kevent_inode_remove(struct inode *inode);
+#else
+static inline void kevent_inode_notify(struct inode *inode, u32 event)
+{
+}
+static inline void kevent_inode_notify_parent(struct dentry *dentry, u32 event)
+{
+}
+static inline void kevent_inode_remove(struct inode *inode)
+{
+}
+#endif /* CONFIG_KEVENT_INODE */
+#ifdef CONFIG_KEVENT_SOCKET
+
+void kevent_socket_notify(struct sock *sock, u32 event);
+int kevent_socket_dequeue(struct kevent *k);
+int kevent_socket_enqueue(struct kevent *k);
+#define sock_async(__sk) sock_flag(__sk, SOCK_ASYNC)
+#else
+static inline void kevent_socket_notify(struct sock *sock, u32 event)
+{
+}
+#define sock_async(__sk) ({ (void)__sk; 0; })
+#endif
+
+#ifdef CONFIG_KEVENT_USER_STAT
+static inline void kevent_user_stat_init(struct kevent_user *u)
+{
+ u->wait_num = u->im_num = u->total = 0;
+}
+static inline void kevent_user_stat_print(struct kevent_user *u)
+{
+ pr_debug("%s: u=%p, wait=%lu, immediately=%lu, total=%lu.\n",
+ __func__, u, u->wait_num, u->im_num, u->total);
+}
+static inline void kevent_user_stat_increase_im(struct kevent_user *u)
+{
+ u->im_num++;
+}
+static inline void kevent_user_stat_increase_wait(struct kevent_user *u)
+{
+ u->wait_num++;
+}
+static inline void kevent_user_stat_increase_total(struct kevent_user *u)
+{
+ u->total++;
+}
+#else
+#define kevent_user_stat_print(u) ({ (void) u;})
+#define kevent_user_stat_init(u) ({ (void) u;})
+#define kevent_user_stat_increase_im(u) ({ (void) u;})
+#define kevent_user_stat_increase_wait(u) ({ (void) u;})
+#define kevent_user_stat_increase_total(u) ({ (void) u;})
+#endif
+
+#endif /* __KERNEL__ */
+#endif /* __KEVENT_H */
diff --git a/include/linux/kevent_storage.h b/include/linux/kevent_storage.h
new file mode 100644
index 0000000..bd891f0
--- /dev/null
+++ b/include/linux/kevent_storage.h
@@ -0,0 +1,12 @@
+#ifndef __KEVENT_STORAGE_H
+#define __KEVENT_STORAGE_H
+
+struct kevent_storage
+{
+ void *origin; /* Originator's pointer, e.g. struct sock or struct file. Can be NULL. */
+ struct list_head list; /* List of queued kevents. */
+ unsigned int qlen; /* Number of queued kevents. */
+ spinlock_t lock; /* Protects users queue. */
+};
+
+#endif /* __KEVENT_STORAGE_H */
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index 008f04c..143f3b5 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -597,4 +597,9 @@ asmlinkage long sys_get_robust_list(int
asmlinkage long sys_set_robust_list(struct robust_list_head __user *head,
size_t len);

+asmlinkage long sys_aio_recv(int ctl_fd, int s, void __user *buf, size_t size, unsigned flags);
+asmlinkage long sys_aio_send(int ctl_fd, int s, void __user *buf, size_t size, unsigned flags);
+asmlinkage long sys_kevent_get_events(int ctl_fd, unsigned int min, unsigned int max,
+ unsigned int timeout, void __user *buf, unsigned flags);
+asmlinkage long sys_kevent_ctl(int ctl_fd, unsigned int cmd, unsigned int num, void __user *buf);
#endif
diff --git a/init/Kconfig b/init/Kconfig
index a099fc6..c550fcc 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -218,6 +218,8 @@ config AUDITSYSCALL
such as SELinux. To use audit's filesystem watch feature, please
ensure that INOTIFY is configured.

+source "kernel/kevent/Kconfig"
+
config IKCONFIG
bool "Kernel .config support"
---help---
diff --git a/kernel/Makefile b/kernel/Makefile
index d62ec66..2d7a6dd 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -47,6 +47,7 @@ obj-$(CONFIG_DETECT_SOFTLOCKUP) += softl
obj-$(CONFIG_GENERIC_HARDIRQS) += irq/
obj-$(CONFIG_SECCOMP) += seccomp.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
+obj-$(CONFIG_KEVENT) += kevent/
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
obj-$(CONFIG_TASKSTATS) += taskstats.o
diff --git a/kernel/kevent/Kconfig b/kernel/kevent/Kconfig
new file mode 100644
index 0000000..88b35af
--- /dev/null
+++ b/kernel/kevent/Kconfig
@@ -0,0 +1,57 @@
+config KEVENT
+ bool "Kernel event notification mechanism"
+ help
+ This option enables event queue mechanism.
+ It can be used as replacement for poll()/select(), AIO callback invocations,
+ advanced timer notifications and other kernel object status changes.
+
+config KEVENT_USER_STAT
+ bool "Kevent user statistic"
+ depends on KEVENT
+ default N
+ help
+ This option turns kevent_user statistics collection on.
+ Statistics include the total number of kevents, the number of kevents which are ready
+ immediately at insertion time and the number of kevents which were removed through
+ readiness completion. They are printed each time the control kevent descriptor
+ is closed.
+
+config KEVENT_SOCKET
+ bool "Kernel event notifications for sockets"
+ depends on NET && KEVENT
+ help
+ This option enables notifications through KEVENT subsystem of
+ sockets operations, like new packet receiving conditions, ready for accept
+ conditions and so on.
+
+config KEVENT_INODE
+ bool "Kernel event notifications for inodes"
+ depends on KEVENT
+ help
+ This option enables notifications through KEVENT subsystem of
+ inode operations, like file creation, removal and so on.
+
+config KEVENT_TIMER
+ bool "Kernel event notifications for timers"
+ depends on KEVENT
+ help
+ This option allows timers to be used through the KEVENT subsystem.
+
+config KEVENT_POLL
+ bool "Kernel event notifications for poll()/select()"
+ depends on KEVENT
+ help
+ This option allows the kevent subsystem to be used for poll()/select() notifications.
+
+config KEVENT_NAIO
+ bool "Network asynchronous IO"
+ depends on KEVENT && KEVENT_SOCKET
+ help
+ This option enables kevent based network asynchronous IO subsystem.
+
+config KEVENT_AIO
+ bool "Asynchronous IO"
+ depends on KEVENT
+ help
+ This option allows the kevent subsystem to be used for AIO operations.
+ AIO read is currently supported.
diff --git a/kernel/kevent/Makefile b/kernel/kevent/Makefile
new file mode 100644
index 0000000..d1ef9ba
--- /dev/null
+++ b/kernel/kevent/Makefile
@@ -0,0 +1,7 @@
+obj-y := kevent.o kevent_user.o
+obj-$(CONFIG_KEVENT_SOCKET) += kevent_socket.o
+obj-$(CONFIG_KEVENT_INODE) += kevent_inode.o
+obj-$(CONFIG_KEVENT_TIMER) += kevent_timer.o
+obj-$(CONFIG_KEVENT_POLL) += kevent_poll.o
+obj-$(CONFIG_KEVENT_NAIO) += kevent_naio.o
+obj-$(CONFIG_KEVENT_AIO) += kevent_aio.o
diff --git a/kernel/kevent/kevent.c b/kernel/kevent/kevent.c
new file mode 100644
index 0000000..8c71ca9
--- /dev/null
+++ b/kernel/kevent/kevent.c
@@ -0,0 +1,248 @@
+/*
+ * kevent.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/mempool.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+#include <linux/kevent.h>
+
+static kmem_cache_t *kevent_cache;
+
+/*
+ * Attempts to add an event into appropriate origin's queue.
+ * Returns positive value if this event is ready immediately,
+ * negative value in case of error and zero if event has been queued.
+ * ->enqueue() callback must increase origin's reference counter.
+ */
+int kevent_enqueue(struct kevent *k)
+{
+ if (k->event.type >= KEVENT_MAX)
+ return -E2BIG;
+
+ if (!k->callbacks.enqueue) {
+ kevent_break(k);
+ return -EINVAL;
+ }
+
+ return k->callbacks.enqueue(k);
+}
+
+/*
+ * Remove event from the appropriate queue.
+ * ->dequeue() callback must decrease origin's reference counter.
+ */
+int kevent_dequeue(struct kevent *k)
+{
+ if (k->event.type >= KEVENT_MAX)
+ return -E2BIG;
+
+ if (!k->callbacks.dequeue) {
+ kevent_break(k);
+ return -EINVAL;
+ }
+
+ return k->callbacks.dequeue(k);
+}
+
+int kevent_break(struct kevent *k)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&k->ulock, flags);
+ k->event.ret_flags |= KEVENT_RET_BROKEN;
+ spin_unlock_irqrestore(&k->ulock, flags);
+ return 0;
+}
+
+struct kevent_callbacks kevent_registered_callbacks[KEVENT_MAX];
+
+/*
+ * Must be called before event is going to be added into some origin's queue.
+ * Initializes ->enqueue(), ->dequeue() and ->callback() callbacks.
+ * If failed, kevent should not be used or kevent_enqueue() will fail to add
+ * this kevent into origin's queue with setting
+ * KEVENT_RET_BROKEN flag in kevent->event.ret_flags.
+ */
+int kevent_init(struct kevent *k)
+{
+ spin_lock_init(&k->ulock);
+ k->kevent_entry.next = LIST_POISON1;
+ k->storage_entry.next = LIST_POISON1;
+ k->ready_entry.next = LIST_POISON1;
+
+ if (k->event.type >= KEVENT_MAX)
+ return -E2BIG;
+
+ k->callbacks = kevent_registered_callbacks[k->event.type];
+ if (!k->callbacks.callback) {
+ kevent_break(k);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+/*
+ * Called from ->enqueue() callback when reference counter for given
+ * origin (socket, inode...) has been increased.
+ */
+int kevent_storage_enqueue(struct kevent_storage *st, struct kevent *k)
+{
+ unsigned long flags;
+
+ k->st = st;
+ spin_lock_irqsave(&st->lock, flags);
+ list_add_tail(&k->storage_entry, &st->list);
+ st->qlen++;
+ spin_unlock_irqrestore(&st->lock, flags);
+ return 0;
+}
+
+/*
+ * Dequeue kevent from origin's queue.
+ * It does not decrease origin's reference counter in any way
+ * and must be called before it, so storage itself must be valid.
+ * It is called from ->dequeue() callback.
+ */
+void kevent_storage_dequeue(struct kevent_storage *st, struct kevent *k)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&st->lock, flags);
+ if (k->storage_entry.next != LIST_POISON1) {
+ list_del(&k->storage_entry);
+ st->qlen--;
+ }
+ spin_unlock_irqrestore(&st->lock, flags);
+}
+
+static void __kevent_requeue(struct kevent *k, u32 event)
+{
+ int err, rem = 0;
+ unsigned long flags;
+
+ err = k->callbacks.callback(k);
+
+ spin_lock_irqsave(&k->ulock, flags);
+ if (err > 0) {
+ k->event.ret_flags |= KEVENT_RET_DONE;
+ } else if (err < 0) {
+ k->event.ret_flags |= KEVENT_RET_BROKEN;
+ k->event.ret_flags |= KEVENT_RET_DONE;
+ }
+ rem = (k->event.req_flags & KEVENT_REQ_ONESHOT);
+ if (!err)
+ err = (k->event.ret_flags & (KEVENT_RET_BROKEN|KEVENT_RET_DONE));
+ spin_unlock_irqrestore(&k->ulock, flags);
+
+ if (err) {
+ if (rem) {
+ list_del(&k->storage_entry);
+ k->st->qlen--;
+ }
+
+ spin_lock_irqsave(&k->user->ready_lock, flags);
+ if (k->ready_entry.next == LIST_POISON1) {
+ kevent_user_ring_add_event(k);
+ list_add_tail(&k->ready_entry, &k->user->ready_list);
+ k->user->ready_num++;
+ }
+ spin_unlock_irqrestore(&k->user->ready_lock, flags);
+ wake_up(&k->user->wait);
+ }
+}
+
+void kevent_requeue(struct kevent *k)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&k->st->lock, flags);
+ __kevent_requeue(k, 0);
+ spin_unlock_irqrestore(&k->st->lock, flags);
+}
+
+/*
+ * Called each time some activity in origin (socket, inode...) is noticed.
+ */
+void kevent_storage_ready(struct kevent_storage *st,
+ kevent_callback_t ready_callback, u32 event)
+{
+ struct kevent *k, *n;
+
+ spin_lock(&st->lock);
+ list_for_each_entry_safe(k, n, &st->list, storage_entry) {
+ if (ready_callback)
+ ready_callback(k);
+
+ if (event & k->event.event)
+ __kevent_requeue(k, event);
+ }
+ spin_unlock(&st->lock);
+}
+
+int kevent_storage_init(void *origin, struct kevent_storage *st)
+{
+ spin_lock_init(&st->lock);
+ st->origin = origin;
+ st->qlen = 0;
+ INIT_LIST_HEAD(&st->list);
+ return 0;
+}
+
+void kevent_storage_fini(struct kevent_storage *st)
+{
+ kevent_storage_ready(st, kevent_break, KEVENT_MASK_ALL);
+}
+
+struct kevent *kevent_alloc(gfp_t mask)
+{
+ return kmem_cache_alloc(kevent_cache, mask);
+}
+
+void kevent_free(struct kevent *k)
+{
+ kmem_cache_free(kevent_cache, k);
+}
+
+static int __init kevent_sys_init(void)
+{
+ int i;
+
+ kevent_cache = kmem_cache_create("kevent_cache",
+ sizeof(struct kevent), 0, 0, NULL, NULL);
+ if (!kevent_cache)
+ panic("kevent: Unable to create a cache.\n");
+
+ for (i=0; i<ARRAY_SIZE(kevent_registered_callbacks); ++i) {
+ struct kevent_callbacks *c = &kevent_registered_callbacks[i];
+
+ c->callback = c->enqueue = c->dequeue = NULL;
+ }
+
+ return 0;
+}
+
+late_initcall(kevent_sys_init);
diff --git a/kernel/kevent/kevent_user.c b/kernel/kevent/kevent_user.c
new file mode 100644
index 0000000..87ac367
--- /dev/null
+++ b/kernel/kevent/kevent_user.c
@@ -0,0 +1,876 @@
+/*
+ * kevent_user.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/mount.h>
+#include <linux/device.h>
+#include <linux/poll.h>
+#include <linux/kevent.h>
+#include <linux/jhash.h>
+#include <asm/io.h>
+
+static struct class *kevent_user_class;
+static char kevent_name[] = "kevent";
+static int kevent_user_major;
+
+static int kevent_user_open(struct inode *, struct file *);
+static int kevent_user_release(struct inode *, struct file *);
+static unsigned int kevent_user_poll(struct file *, struct poll_table_struct *);
+static int kevnet_user_mmap(struct file *, struct vm_area_struct *);
+
+static struct file_operations kevent_user_fops = {
+ .mmap = kevnet_user_mmap,
+ .open = kevent_user_open,
+ .release = kevent_user_release,
+ .poll = kevent_user_poll,
+ .owner = THIS_MODULE,
+};
+
+static int kevent_get_sb(struct file_system_type *fs_type,
+ int flags, const char *dev_name, void *data, struct vfsmount *mnt)
+{
+ /* So original magic... */
+ return get_sb_pseudo(fs_type, kevent_name, NULL, 0xabcdef, mnt);
+}
+
+static struct file_system_type kevent_fs_type = {
+ .name = kevent_name,
+ .get_sb = kevent_get_sb,
+ .kill_sb = kill_anon_super,
+};
+
+static struct vfsmount *kevent_mnt;
+
+static unsigned int kevent_user_poll(struct file *file, struct poll_table_struct *wait)
+{
+ struct kevent_user *u = file->private_data;
+ unsigned int mask;
+
+ poll_wait(file, &u->wait, wait);
+ mask = 0;
+
+ if (u->ready_num)
+ mask |= POLLIN | POLLRDNORM;
+
+ return mask;
+}
+
+static inline void kevent_user_ring_set(struct kevent_user *u, unsigned int num)
+{
+ unsigned int *idx;
+
+ idx = (unsigned int *)u->pring[0];
+ idx[0] = num;
+}
+
+/*
+ * Note that kevents do not exactly fill the page (each ukevent is 40 bytes),
+ * so we reuse 4 bytes at the beginning of the first page to store the index.
+ * Take that into account if you want to change size of struct ukevent.
+ */
+#define KEVENTS_ON_PAGE (PAGE_SIZE/sizeof(struct ukevent))
+
+/*
+ * Called under kevent_user->ready_lock, so updates are always protected.
+ */
+void kevent_user_ring_add_event(struct kevent *k)
+{
+ unsigned int *idx_ptr, idx, pidx, off;
+ struct ukevent *ukev;
+
+ idx_ptr = (unsigned int *)k->user->pring[0];
+ idx = idx_ptr[0];
+
+ pidx = idx/KEVENTS_ON_PAGE;
+ off = idx%KEVENTS_ON_PAGE;
+
+ if (pidx == 0)
+ ukev = (struct ukevent *)(k->user->pring[pidx] + sizeof(unsigned int));
+ else
+ ukev = (struct ukevent *)(k->user->pring[pidx]);
+
+ memcpy(&ukev[off], &k->event, sizeof(struct ukevent));
+
+ idx++;
+ if (idx >= KEVENT_MAX_EVENTS)
+ idx = 0;
+
+ idx_ptr[0] = idx;
+}
+
+static int kevent_user_ring_init(struct kevent_user *u)
+{
+ int i, pnum;
+
+ pnum = ALIGN(KEVENT_MAX_EVENTS*sizeof(struct ukevent) + sizeof(unsigned int), PAGE_SIZE)/PAGE_SIZE;
+
+ u->pring = kmalloc(pnum * sizeof(unsigned long), GFP_KERNEL);
+ if (!u->pring)
+ return -ENOMEM;
+
+ for (i=0; i<pnum; ++i) {
+ u->pring[i] = __get_free_page(GFP_KERNEL);
+ if (!u->pring[i])
+ break;
+ }
+
+ if (i != pnum) {
+ pnum = i;
+ goto err_out_free;
+ }
+
+ kevent_user_ring_set(u, 0);
+
+ return 0;
+
+err_out_free:
+ for (i=0; i<pnum; ++i)
+ free_page(u->pring[i]);
+
+ kfree(u->pring);
+
+ return -ENOMEM;
+}
+
+static void kevent_user_ring_fini(struct kevent_user *u)
+{
+ int i, pnum;
+
+ pnum = ALIGN(KEVENT_MAX_EVENTS*sizeof(struct ukevent) + sizeof(unsigned int), PAGE_SIZE)/PAGE_SIZE;
+
+ for (i=0; i<pnum; ++i)
+ free_page(u->pring[i]);
+
+ kfree(u->pring);
+}
+
+static struct kevent_user *kevent_user_alloc(void)
+{
+ struct kevent_user *u;
+ int i;
+
+ u = kzalloc(sizeof(struct kevent_user), GFP_KERNEL);
+ if (!u)
+ return NULL;
+
+ INIT_LIST_HEAD(&u->ready_list);
+ spin_lock_init(&u->ready_lock);
+ u->ready_num = 0;
+ kevent_user_stat_init(u);
+ spin_lock_init(&u->kevent_lock);
+ for (i=0; i<ARRAY_SIZE(u->kevent_list); ++i)
+ INIT_LIST_HEAD(&u->kevent_list[i]);
+ u->kevent_num = 0;
+
+ mutex_init(&u->ctl_mutex);
+ mutex_init(&u->wait_mutex);
+ init_waitqueue_head(&u->wait);
+ u->max_ready_num = 0;
+
+ atomic_set(&u->refcnt, 1);
+
+ if (kevent_user_ring_init(u)) {
+ kfree(u);
+ u = NULL;
+ }
+
+ return u;
+}
+
+static int kevent_user_open(struct inode *inode, struct file *file)
+{
+ struct kevent_user *u = kevent_user_alloc();
+
+ if (!u)
+ return -ENOMEM;
+
+ file->private_data = u;
+
+ return 0;
+}
+
+static inline void kevent_user_get(struct kevent_user *u)
+{
+ atomic_inc(&u->refcnt);
+}
+
+static inline void kevent_user_put(struct kevent_user *u)
+{
+ if (atomic_dec_and_test(&u->refcnt)) {
+ kevent_user_stat_print(u);
+ kevent_user_ring_fini(u);
+ kfree(u);
+ }
+}
+
+static int kevnet_user_mmap(struct file *file, struct vm_area_struct *vma)
+{
+ size_t size = vma->vm_end - vma->vm_start, psize;
+ int pnum = size/PAGE_SIZE, i;
+ unsigned long start = vma->vm_start;
+ struct kevent_user *u = file->private_data;
+
+ psize = ALIGN(KEVENT_MAX_EVENTS*sizeof(struct ukevent) + sizeof(unsigned int), PAGE_SIZE);
+
+ if (size + vma->vm_pgoff*PAGE_SIZE != psize)
+ return -EINVAL;
+
+ if (vma->vm_flags & VM_WRITE)
+ return -EPERM;
+
+ vma->vm_page_prot = pgprot_noncached(vma->vm_page_prot);
+
+ for (i=0; i<pnum; ++i) {
+ if (remap_pfn_range(vma, start, virt_to_phys((void *)u->pring[i+vma->vm_pgoff]) >> PAGE_SHIFT,
+ PAGE_SIZE, vma->vm_page_prot))
+ return -EAGAIN;
+ start += PAGE_SIZE;
+ }
+
+ return 0;
+}
+
+#if 0
+static inline unsigned int kevent_user_hash(struct ukevent *uk)
+{
+ unsigned int h = (uk->user[0] ^ uk->user[1]) ^ (uk->id.raw[0] ^ uk->id.raw[1]);
+
+ h = (((h >> 16) & 0xffff) ^ (h & 0xffff)) & 0xffff;
+ h = (((h >> 8) & 0xff) ^ (h & 0xff)) & KEVENT_HASH_MASK;
+
+ return h;
+}
+#else
+static inline unsigned int kevent_user_hash(struct ukevent *uk)
+{
+ return jhash_1word(uk->id.raw[0], 0) & KEVENT_HASH_MASK;
+}
+#endif
+
+static void kevent_finish_user_complete(struct kevent *k, int deq)
+{
+ struct kevent_user *u = k->user;
+ unsigned long flags;
+
+ if (deq)
+ kevent_dequeue(k);
+
+ spin_lock_irqsave(&u->ready_lock, flags);
+ if (k->ready_entry.next != LIST_POISON1) {
+ list_del(&k->ready_entry);
+ u->ready_num--;
+ }
+ spin_unlock_irqrestore(&u->ready_lock, flags);
+
+ kevent_user_put(u);
+ kevent_free(k);
+}
+
+static void __kevent_finish_user(struct kevent *k, int deq)
+{
+ struct kevent_user *u = k->user;
+
+ list_del(&k->kevent_entry);
+ u->kevent_num--;
+ kevent_finish_user_complete(k, deq);
+}
+
+/*
+ * Remove kevent from user's list of all events,
+ * dequeue it from storage and decrease user's reference counter,
+ * since this kevent does not exist anymore. That is why it is freed here.
+ */
+static void kevent_finish_user(struct kevent *k, int deq)
+{
+ struct kevent_user *u = k->user;
+ unsigned long flags;
+
+ spin_lock_irqsave(&u->kevent_lock, flags);
+ list_del(&k->kevent_entry);
+ u->kevent_num--;
+ spin_unlock_irqrestore(&u->kevent_lock, flags);
+ kevent_finish_user_complete(k, deq);
+}
+
+/*
+ * Dequeue one entry from user's ready queue.
+ */
+
+static struct kevent *kqueue_dequeue_ready(struct kevent_user *u)
+{
+ unsigned long flags;
+ struct kevent *k = NULL;
+
+ spin_lock_irqsave(&u->ready_lock, flags);
+ if (u->ready_num && !list_empty(&u->ready_list)) {
+ k = list_entry(u->ready_list.next, struct kevent, ready_entry);
+ list_del(&k->ready_entry);
+ u->ready_num--;
+ }
+ spin_unlock_irqrestore(&u->ready_lock, flags);
+
+ return k;
+}
+
+static struct kevent *__kevent_search(struct list_head *head, struct ukevent *uk,
+ struct kevent_user *u)
+{
+ struct kevent *k;
+ int found = 0;
+
+ list_for_each_entry(k, head, kevent_entry) {
+ spin_lock(&k->ulock);
+ if (k->event.user[0] == uk->user[0] && k->event.user[1] == uk->user[1] &&
+ k->event.id.raw[0] == uk->id.raw[0] &&
+ k->event.id.raw[1] == uk->id.raw[1]) {
+ found = 1;
+ spin_unlock(&k->ulock);
+ break;
+ }
+ spin_unlock(&k->ulock);
+ }
+
+ return (found)?k:NULL;
+}
+
+static int kevent_modify(struct ukevent *uk, struct kevent_user *u)
+{
+ struct kevent *k;
+ unsigned int hash = kevent_user_hash(uk);
+ int err = -ENODEV;
+ unsigned long flags;
+
+ spin_lock_irqsave(&u->kevent_lock, flags);
+ k = __kevent_search(&u->kevent_list[hash], uk, u);
+ if (k) {
+ spin_lock(&k->ulock);
+ k->event.event = uk->event;
+ k->event.req_flags = uk->req_flags;
+ k->event.ret_flags = 0;
+ spin_unlock(&k->ulock);
+ kevent_requeue(k);
+ err = 0;
+ }
+ spin_unlock_irqrestore(&u->kevent_lock, flags);
+
+ return err;
+}
+
+static int kevent_remove(struct ukevent *uk, struct kevent_user *u)
+{
+ int err = -ENODEV;
+ struct kevent *k;
+ unsigned int hash = kevent_user_hash(uk);
+ unsigned long flags;
+
+ spin_lock_irqsave(&u->kevent_lock, flags);
+ k = __kevent_search(&u->kevent_list[hash], uk, u);
+ if (k) {
+ __kevent_finish_user(k, 1);
+ err = 0;
+ }
+ spin_unlock_irqrestore(&u->kevent_lock, flags);
+
+ return err;
+}
+
+/*
+ * No new entry can be added or removed from any list at this point.
+ * It is not permitted to call ->ioctl() and ->release() in parallel.
+ */
+static int kevent_user_release(struct inode *inode, struct file *file)
+{
+ struct kevent_user *u = file->private_data;
+ struct kevent *k, *n;
+ int i;
+
+ for (i=0; i<KEVENT_HASH_MASK+1; ++i) {
+ list_for_each_entry_safe(k, n, &u->kevent_list[i], kevent_entry)
+ kevent_finish_user(k, 1);
+ }
+
+ kevent_user_put(u);
+ file->private_data = NULL;
+
+ return 0;
+}
+
+static struct ukevent *kevent_get_user(unsigned int num, void __user *arg)
+{
+ struct ukevent *ukev;
+
+ ukev = kmalloc(sizeof(struct ukevent) * num, GFP_KERNEL);
+ if (!ukev)
+ return NULL;
+
+ if (copy_from_user(ukev, arg, sizeof(struct ukevent) * num)) {
+ kfree(ukev);
+ return NULL;
+ }
+
+ return ukev;
+}
+
+static int kevent_user_ctl_modify(struct kevent_user *u, unsigned int num, void __user *arg)
+{
+ int err = 0, i;
+ struct ukevent uk;
+
+ mutex_lock(&u->ctl_mutex);
+
+ if (num > KEVENT_MIN_BUFFS_ALLOC) {
+ struct ukevent *ukev;
+
+ ukev = kevent_get_user(num, arg);
+ if (ukev) {
+ for (i=0; i<num; ++i) {
+ if (kevent_modify(&ukev[i], u))
+ ukev[i].ret_flags |= KEVENT_RET_BROKEN;
+ ukev[i].ret_flags |= KEVENT_RET_DONE;
+ }
+ if (copy_to_user(arg, ukev, num*sizeof(struct ukevent)))
+ err = -EINVAL;
+ kfree(ukev);
+ goto out;
+ }
+ }
+
+ for (i=0; i<num; ++i) {
+ if (copy_from_user(&uk, arg, sizeof(struct ukevent))) {
+ err = -EINVAL;
+ break;
+ }
+
+ if (kevent_modify(&uk, u))
+ uk.ret_flags |= KEVENT_RET_BROKEN;
+ uk.ret_flags |= KEVENT_RET_DONE;
+
+ if (copy_to_user(arg, &uk, sizeof(struct ukevent))) {
+ err = -EINVAL;
+ break;
+ }
+
+ arg += sizeof(struct ukevent);
+ }
+out:
+ mutex_unlock(&u->ctl_mutex);
+
+ return err;
+}
+
+static int kevent_user_ctl_remove(struct kevent_user *u, unsigned int num, void __user *arg)
+{
+ int err = 0, i;
+ struct ukevent uk;
+
+ mutex_lock(&u->ctl_mutex);
+
+ if (num > KEVENT_MIN_BUFFS_ALLOC) {
+ struct ukevent *ukev;
+
+ ukev = kevent_get_user(num, arg);
+ if (ukev) {
+ for (i=0; i<num; ++i) {
+ if (kevent_remove(&ukev[i], u))
+ ukev[i].ret_flags |= KEVENT_RET_BROKEN;
+ ukev[i].ret_flags |= KEVENT_RET_DONE;
+ }
+ if (copy_to_user(arg, ukev, num*sizeof(struct ukevent)))
+ err = -EINVAL;
+ kfree(ukev);
+ goto out;
+ }
+ }
+
+ for (i=0; i<num; ++i) {
+ if (copy_from_user(&uk, arg, sizeof(struct ukevent))) {
+ err = -EINVAL;
+ break;
+ }
+
+ if (kevent_remove(&uk, u))
+ uk.ret_flags |= KEVENT_RET_BROKEN;
+
+ uk.ret_flags |= KEVENT_RET_DONE;
+
+ if (copy_to_user(arg, &uk, sizeof(struct ukevent))) {
+ err = -EINVAL;
+ break;
+ }
+
+ arg += sizeof(struct ukevent);
+ }
+out:
+ mutex_unlock(&u->ctl_mutex);
+
+ return err;
+}
+
+static void kevent_user_enqueue(struct kevent_user *u, struct kevent *k)
+{
+ unsigned long flags;
+ unsigned int hash = kevent_user_hash(&k->event);
+
+ spin_lock_irqsave(&u->kevent_lock, flags);
+ list_add_tail(&k->kevent_entry, &u->kevent_list[hash]);
+ u->kevent_num++;
+ kevent_user_get(u);
+ spin_unlock_irqrestore(&u->kevent_lock, flags);
+}
+
+int kevent_user_add_ukevent(struct ukevent *uk, struct kevent_user *u)
+{
+ struct kevent *k;
+ int err;
+
+ k = kevent_alloc(GFP_KERNEL);
+ if (!k) {
+ err = -ENOMEM;
+ goto err_out_exit;
+ }
+
+ memcpy(&k->event, uk, sizeof(struct ukevent));
+
+ k->event.ret_flags = 0;
+
+ err = kevent_init(k);
+ if (err) {
+ kevent_free(k);
+ goto err_out_exit;
+ }
+ k->user = u;
+ kevent_user_stat_increase_total(u);
+ kevent_user_enqueue(u, k);
+
+ err = kevent_enqueue(k);
+ if (err) {
+ memcpy(uk, &k->event, sizeof(struct ukevent));
+ if (err < 0)
+ uk->ret_flags |= KEVENT_RET_BROKEN;
+ uk->ret_flags |= KEVENT_RET_DONE;
+ kevent_finish_user(k, 0);
+ }
+
+err_out_exit:
+ return err;
+}
+
+/*
+ * Copy all ukevents from userspace, allocate kevent for each one
+ * and add them into appropriate kevent_storages,
+ * e.g. sockets, inodes and so on...
+ * If something goes wrong, all events will be dequeued and
+ * negative error will be returned.
+ * On success the number of finished events is returned and the
+ * array of finished events (struct ukevent) is copied back to the start of
+ * the user-supplied buffer. User must run through that array and check the
+ * ret_flags field of each ukevent structure to determine if it is a fired or failed event.
+ */
+static int kevent_user_ctl_add(struct kevent_user *u, unsigned int num, void __user *arg)
+{
+ int err, cerr = 0, knum = 0, rnum = 0, i;
+ void __user *orig = arg;
+ struct ukevent uk;
+
+ mutex_lock(&u->ctl_mutex);
+
+ err = -ENFILE;
+ if (u->kevent_num + num >= KEVENT_MAX_EVENTS)
+ goto out_remove;
+
+ if (num > KEVENT_MIN_BUFFS_ALLOC) {
+ struct ukevent *ukev;
+
+ ukev = kevent_get_user(num, arg);
+ if (ukev) {
+ for (i=0; i<num; ++i) {
+ err = kevent_user_add_ukevent(&ukev[i], u);
+ if (err) {
+ kevent_user_stat_increase_im(u);
+ if (i != rnum)
+ memcpy(&ukev[rnum], &ukev[i], sizeof(struct ukevent));
+ rnum++;
+ } else
+ knum++;
+ }
+ if (copy_to_user(orig, ukev, rnum*sizeof(struct ukevent)))
+ cerr = -EINVAL;
+ kfree(ukev);
+ goto out_setup;
+ }
+ }
+
+ for (i=0; i<num; ++i) {
+ if (copy_from_user(&uk, arg, sizeof(struct ukevent))) {
+ cerr = -EINVAL;
+ break;
+ }
+ arg += sizeof(struct ukevent);
+
+ err = kevent_user_add_ukevent(&uk, u);
+ if (err) {
+ kevent_user_stat_increase_im(u);
+ if (copy_to_user(orig, &uk, sizeof(struct ukevent))) {
+ cerr = -EINVAL;
+ break;
+ }
+ orig += sizeof(struct ukevent);
+ rnum++;
+ } else
+ knum++;
+ }
+
+out_setup:
+ if (cerr < 0) {
+ err = cerr;
+ goto out_remove;
+ }
+
+ err = rnum;
+out_remove:
+ mutex_unlock(&u->ctl_mutex);
+
+ return err;
+}
+
+/*
+ * In nonblocking mode it returns as many events as possible, but not more than @max_nr.
+ * In blocking mode it waits until timeout or until at least @min_nr events are ready;
+ * if timeout is zero, then it waits no more than 1 second or until at least one event
+ * is ready.
+ */
+static int kevent_user_wait(struct file *file, struct kevent_user *u,
+ unsigned int min_nr, unsigned int max_nr, unsigned int timeout,
+ void __user *buf)
+{
+ struct kevent *k;
+ int cerr = 0, num = 0;
+
+ if (!(file->f_flags & O_NONBLOCK)) {
+ if (timeout)
+ wait_event_interruptible_timeout(u->wait,
+ u->ready_num >= min_nr, msecs_to_jiffies(timeout));
+ else
+ wait_event_interruptible_timeout(u->wait,
+ u->ready_num > 0, msecs_to_jiffies(1000));
+ }
+
+ mutex_lock(&u->ctl_mutex);
+ while (num < max_nr && ((k = kqueue_dequeue_ready(u)) != NULL)) {
+ if (copy_to_user(buf + num*sizeof(struct ukevent),
+ &k->event, sizeof(struct ukevent))) {
+ cerr = -EINVAL;
+ break;
+ }
+
+ /*
+ * If it is one-shot kevent, it has been removed already from
+ * origin's queue, so we can easily free it here.
+ */
+ if (k->event.req_flags & KEVENT_REQ_ONESHOT)
+ kevent_finish_user(k, 1);
+ ++num;
+ kevent_user_stat_increase_wait(u);
+ }
+ mutex_unlock(&u->ctl_mutex);
+
+ return (cerr)?cerr:num;
+}
+
+static int kevent_ctl_init(void)
+{
+ struct kevent_user *u;
+ struct file *file;
+ int fd, ret;
+
+ fd = get_unused_fd();
+ if (fd < 0)
+ return fd;
+
+ file = get_empty_filp();
+ if (!file) {
+ ret = -ENFILE;
+ goto out_put_fd;
+ }
+
+ u = kevent_user_alloc();
+ if (unlikely(!u)) {
+ ret = -ENOMEM;
+ goto out_put_file;
+ }
+
+ file->f_op = &kevent_user_fops;
+ file->f_vfsmnt = mntget(kevent_mnt);
+ file->f_dentry = dget(kevent_mnt->mnt_root);
+ file->f_mapping = file->f_dentry->d_inode->i_mapping;
+ file->f_mode = FMODE_READ;
+ file->f_flags = O_RDONLY;
+ file->private_data = u;
+
+ fd_install(fd, file);
+
+ return fd;
+
+out_put_file:
+ put_filp(file);
+out_put_fd:
+ put_unused_fd(fd);
+ return ret;
+}
+
+static int kevent_ctl_process(struct file *file, unsigned int cmd, unsigned int num, void __user *arg)
+{
+ int err;
+ struct kevent_user *u = file->private_data;
+
+ if (!u)
+ return -EINVAL;
+
+ switch (cmd) {
+ case KEVENT_CTL_ADD:
+ err = kevent_user_ctl_add(u, num, arg);
+ break;
+ case KEVENT_CTL_REMOVE:
+ err = kevent_user_ctl_remove(u, num, arg);
+ break;
+ case KEVENT_CTL_MODIFY:
+ err = kevent_user_ctl_modify(u, num, arg);
+ break;
+ default:
+ err = -EINVAL;
+ break;
+ }
+
+ return err;
+}
+
+asmlinkage long sys_kevent_get_events(int ctl_fd, unsigned int min_nr, unsigned int max_nr,
+ unsigned int timeout, void __user *buf, unsigned flags)
+{
+ int err = -EINVAL, fput_needed;
+ struct file *file;
+ struct kevent_user *u;
+
+ file = fget_light(ctl_fd, &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ if (file->f_op != &kevent_user_fops)
+ goto out_fput;
+ u = file->private_data;
+
+ err = kevent_user_wait(file, u, min_nr, max_nr, timeout, buf);
+out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+asmlinkage long sys_kevent_ctl(int fd, unsigned int cmd, unsigned int num, void __user *arg)
+{
+ int err = -EINVAL, fput_needed;
+ struct file *file;
+
+ if (cmd == KEVENT_CTL_INIT)
+ return kevent_ctl_init();
+
+ file = fget_light(fd, &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ if (file->f_op != &kevent_user_fops)
+ goto out_fput;
+
+ err = kevent_ctl_process(file, cmd, num, arg);
+
+out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+static int __devinit kevent_user_init(void)
+{
+ struct class_device *dev;
+ int err = 0;
+
+ err = register_filesystem(&kevent_fs_type);
+ if (err)
+ panic("%s: failed to register filesystem: err=%d.\n",
+ kevent_name, err);
+
+ kevent_mnt = kern_mount(&kevent_fs_type);
+ if (IS_ERR(kevent_mnt))
+ panic("%s: failed to mount filesystem: err=%ld.\n",
+ kevent_name, PTR_ERR(kevent_mnt));
+
+ kevent_user_major = register_chrdev(0, kevent_name, &kevent_user_fops);
+ if (kevent_user_major < 0) {
+ printk(KERN_ERR "Failed to register \"%s\" char device: err=%d.\n",
+ kevent_name, kevent_user_major);
+ return -ENODEV;
+ }
+
+ kevent_user_class = class_create(THIS_MODULE, "kevent");
+ if (IS_ERR(kevent_user_class)) {
+ printk(KERN_ERR "Failed to register \"%s\" class: err=%ld.\n",
+ kevent_name, PTR_ERR(kevent_user_class));
+ err = PTR_ERR(kevent_user_class);
+ goto err_out_unregister;
+ }
+
+ dev = class_device_create(kevent_user_class, NULL,
+ MKDEV(kevent_user_major, 0), NULL, kevent_name);
+ if (IS_ERR(dev)) {
+ printk(KERN_ERR "Failed to create %d.%d class device in \"%s\" class: err=%ld.\n",
+ kevent_user_major, 0, kevent_name, PTR_ERR(dev));
+ err = PTR_ERR(dev);
+ goto err_out_class_destroy;
+ }
+
+ printk("KEVENT subsystem: chardev helper: major=%d.\n", kevent_user_major);
+
+ return 0;
+
+err_out_class_destroy:
+ class_destroy(kevent_user_class);
+err_out_unregister:
+ unregister_chrdev(kevent_user_major, kevent_name);
+
+ return err;
+}
+
+static void __devexit kevent_user_fini(void)
+{
+ class_device_destroy(kevent_user_class, MKDEV(kevent_user_major, 0));
+ class_destroy(kevent_user_class);
+ unregister_chrdev(kevent_user_major, kevent_name);
+ mntput(kevent_mnt);
+ unregister_filesystem(&kevent_fs_type);
+}
+
+module_init(kevent_user_init);
+module_exit(kevent_user_fini);
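
A note for reviewers following the mapped ring buffer in kevent_user.c: the sketch below reproduces the index arithmetic of kevent_user_ring_add_event() in plain userspace C. The sizes are assumptions for illustration only: 4096-byte pages, the 40-byte struct ukevent quoted in the comment, and a hypothetical KEVENT_MAX_EVENTS of 4096 (the real value lives in a header that is not part of this hunk). ring_locate() and ring_pages() are made-up helper names, not functions from the patch.

```c
#include <stddef.h>

/* Assumed values for illustration; the kernel headers define the real ones. */
#define SKETCH_PAGE_SIZE 4096u
#define SKETCH_UKEVENT_SIZE 40u	/* size quoted in the patch comment */
#define SKETCH_MAX_EVENTS 4096u	/* hypothetical KEVENT_MAX_EVENTS */

/* Parenthesised so that idx / N and idx % N expand as intended. */
#define SKETCH_EVENTS_ON_PAGE (SKETCH_PAGE_SIZE / SKETCH_UKEVENT_SIZE)

struct ring_pos {
	unsigned int page;	/* index into the pring[] page array */
	unsigned int byte_off;	/* byte offset of the slot inside that page */
};

/* Hypothetical helper: where would event number idx be stored? */
static struct ring_pos ring_locate(unsigned int idx)
{
	struct ring_pos pos;
	unsigned int slot = idx % SKETCH_EVENTS_ON_PAGE;

	pos.page = idx / SKETCH_EVENTS_ON_PAGE;
	pos.byte_off = slot * SKETCH_UKEVENT_SIZE;
	/* The first page starts with the unsigned int ring index. */
	if (pos.page == 0)
		pos.byte_off += (unsigned int)sizeof(unsigned int);
	return pos;
}

/* Number of pages the ring allocates, mirroring the ALIGN() expression. */
static unsigned int ring_pages(void)
{
	size_t bytes = (size_t)SKETCH_MAX_EVENTS * SKETCH_UKEVENT_SIZE +
		       sizeof(unsigned int);

	return (unsigned int)((bytes + SKETCH_PAGE_SIZE - 1) / SKETCH_PAGE_SIZE);
}
```

With these assumed sizes, 102 events fit on a page, ring_locate(103) lands on page 1 at byte offset 40, and ring_pages() comes out as 41. Without the parentheses around the per-page count, idx/KEVENTS_ON_PAGE would expand to (idx/PAGE_SIZE)/sizeof(struct ukevent), which is why the macro definition matters here.
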
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index 6991bec..8843cca 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -122,6 +122,11 @@ cond_syscall(ppc_rtas);
cond_syscall(sys_spu_run);
cond_syscall(sys_spu_create);

+cond_syscall(sys_aio_recv);
+cond_syscall(sys_aio_send);
+cond_syscall(sys_kevent_get_events);
+cond_syscall(sys_kevent_ctl);
+
/* mmu depending weak syscall entries */
cond_syscall(sys_mprotect);
cond_syscall(sys_msync);
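
One more illustration for the kevent_user.c part of this patch: kevent_user_ctl_add() copies only the events that finished immediately (already ready, or failed) back to the start of the user buffer and returns their count. A minimal sketch of that compaction, and of the ret_flags check a caller would then do, follows. The structure is cut down to an id plus ret_flags, the flag values are placeholders rather than the ones from linux/kevent.h, and sk_add_one() is a made-up stand-in for kevent_user_add_ukevent().

```c
#include <stddef.h>

/* Placeholder flag values; the real ones come from the kevent header. */
#define SK_RET_BROKEN 0x1u
#define SK_RET_DONE 0x2u

/* Cut-down stand-in for struct ukevent. */
struct sk_ukevent {
	unsigned int id;
	unsigned int ret_flags;
};

/*
 * Stand-in for kevent_user_add_ukevent(): returns 0 if the event was queued,
 * nonzero if it finished immediately. Odd ids "fail" and ids divisible by 4
 * are "already ready", purely to have something to compact.
 */
static int sk_add_one(struct sk_ukevent *uk)
{
	if (uk->id & 1u) {
		uk->ret_flags |= SK_RET_BROKEN | SK_RET_DONE;
		return -1;
	}
	if (uk->id % 4u == 0) {
		uk->ret_flags |= SK_RET_DONE;
		return 1;
	}
	return 0;
}

/* Move finished events to the front of ev[], return how many there are. */
static size_t sk_ctl_add(struct sk_ukevent *ev, size_t num)
{
	size_t i, rnum = 0;

	for (i = 0; i < num; ++i) {
		if (sk_add_one(&ev[i])) {
			if (i != rnum)
				ev[rnum] = ev[i];
			rnum++;
		}
	}
	return rnum;
}

/* What a caller does with the result: count the failed entries. */
static size_t sk_count_broken(const struct sk_ukevent *ev, size_t rnum)
{
	size_t i, broken = 0;

	for (i = 0; i < rnum; ++i)
		if (ev[i].ret_flags & SK_RET_BROKEN)
			broken++;
	return broken;
}
```

The point of the design is that userspace only has to walk the first rnum entries of its own buffer after the call; events that were queued successfully are not reported back at all.
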

2006-08-03 09:23:01

by Evgeniy Polyakov

Subject: [take3 2/4] kevent: AIO, aio_sendfile() implementation.


AIO, aio_sendfile() implementation.

This patch includes asynchronous population of file data into the VFS
cache and an aio_sendfile() implementation.
Network aio_sendfile() works lazily: it asynchronously populates pages
into the VFS cache (which can be used for various tricks with adaptive
readahead) and then uses the usual ->sendfile() callback.

Signed-off-by: Evgeniy Polyakov <[email protected]>
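
As a rough userspace analogy for the lazy scheme described above (and not the mechanism this patch implements), an application can already ask the kernel to start pulling file pages into the page cache and then hand the same range to sendfile(2). The helper name lazy_sendfile() is made up for this note; posix_fadvise() and sendfile() are the standard Linux calls.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: hint the kernel to read [off, off + len) into the
 * page cache, then push that range to out_fd with sendfile().
 * Returns the number of bytes sent, or -1 with errno set.
 */
static ssize_t lazy_sendfile(int out_fd, int in_fd, off_t off, size_t len)
{
	size_t left = len;

	/* Readahead hint only; failure is not fatal, so the result is ignored. */
	(void)posix_fadvise(in_fd, off, (off_t)len, POSIX_FADV_WILLNEED);

	while (left > 0) {
		/* sendfile() advances off by the number of bytes it sent. */
		ssize_t n = sendfile(out_fd, in_fd, &off, left);

		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0)
			break;	/* end of file */
		left -= (size_t)n;
	}
	return (ssize_t)(len - left);
}
```

The difference from the patch is where the work happens: here the population is only a hint and the send still blocks in the caller, while the kevent version drives both steps from the kernel and reports completion as an event.
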

diff --git a/fs/bio.c b/fs/bio.c
index 6a0b9ad..a3ee530 100644
--- a/fs/bio.c
+++ b/fs/bio.c
@@ -119,7 +119,7 @@ void bio_free(struct bio *bio, struct bi
/*
* default destructor for a bio allocated with bio_alloc_bioset()
*/
-static void bio_fs_destructor(struct bio *bio)
+void bio_fs_destructor(struct bio *bio)
{
bio_free(bio, fs_bio_set);
}
diff --git a/fs/ext2/inode.c b/fs/ext2/inode.c
index fb4d322..9316551 100644
--- a/fs/ext2/inode.c
+++ b/fs/ext2/inode.c
@@ -685,6 +685,7 @@ ext2_writepages(struct address_space *ma
}

const struct address_space_operations ext2_aops = {
+ .get_block = ext2_get_block,
.readpage = ext2_readpage,
.readpages = ext2_readpages,
.writepage = ext2_writepage,
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index c5ee9f0..d9210d4 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -1699,6 +1699,7 @@ static int ext3_journalled_set_page_dirt
}

static const struct address_space_operations ext3_ordered_aops = {
+ .get_block = ext3_get_block,
.readpage = ext3_readpage,
.readpages = ext3_readpages,
.writepage = ext3_ordered_writepage,
diff --git a/fs/file_table.c b/fs/file_table.c
index 0131ba0..b649317 100644
--- a/fs/file_table.c
+++ b/fs/file_table.c
@@ -112,6 +112,9 @@ struct file *get_empty_filp(void)
if (security_file_alloc(f))
goto fail_sec;

+#ifdef CONFIG_KEVENT_POLL
+ kevent_storage_init(f, &f->st);
+#endif
tsk = current;
INIT_LIST_HEAD(&f->f_u.fu_list);
atomic_set(&f->f_count, 1);
@@ -159,6 +162,9 @@ void fastcall __fput(struct file *file)
might_sleep();

fsnotify_close(file);
+#ifdef CONFIG_KEVENT_POLL
+ kevent_storage_fini(&file->st);
+#endif
/*
* The function eventpoll_release() should be the first called
* in the file cleanup chain.
diff --git a/fs/inode.c b/fs/inode.c
index 0bf9f04..fdbd0ba 100644
--- a/fs/inode.c
+++ b/fs/inode.c
@@ -21,6 +21,7 @@ #include <linux/pagemap.h>
#include <linux/cdev.h>
#include <linux/bootmem.h>
#include <linux/inotify.h>
+#include <linux/kevent.h>
#include <linux/mount.h>

/*
@@ -165,12 +166,18 @@ #endif
}
memset(&inode->u, 0, sizeof(inode->u));
inode->i_mapping = mapping;
+#if defined CONFIG_KEVENT
+ kevent_storage_init(inode, &inode->st);
+#endif
}
return inode;
}

void destroy_inode(struct inode *inode)
{
+#if defined CONFIG_KEVENT_INODE || defined CONFIG_KEVENT_SOCKET
+ kevent_storage_fini(&inode->st);
+#endif
BUG_ON(inode_has_buffers(inode));
security_inode_free(inode);
if (inode->i_sb->s_op->destroy_inode)
diff --git a/fs/reiserfs/inode.c b/fs/reiserfs/inode.c
index 12dfdcf..f8dca72 100644
--- a/fs/reiserfs/inode.c
+++ b/fs/reiserfs/inode.c
@@ -3001,6 +3001,7 @@ int reiserfs_setattr(struct dentry *dent
}

const struct address_space_operations reiserfs_address_space_operations = {
+ .get_block = reiserfs_get_block,
.writepage = reiserfs_writepage,
.readpage = reiserfs_readpage,
.readpages = reiserfs_readpages,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 2561020..65eb438 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -240,6 +240,9 @@ #include <linux/mutex.h>
#include <asm/atomic.h>
#include <asm/semaphore.h>
#include <asm/byteorder.h>
+#ifdef CONFIG_KEVENT
+#include <linux/kevent_storage.h>
+#endif

struct hd_geometry;
struct iovec;
@@ -352,6 +355,8 @@ struct address_space;
struct writeback_control;

struct address_space_operations {
+ int (*get_block)(struct inode *inode, sector_t iblock,
+ struct buffer_head *bh_result, int create);
int (*writepage)(struct page *page, struct writeback_control *wbc);
int (*readpage)(struct file *, struct page *);
void (*sync_page)(struct page *);
@@ -546,6 +551,10 @@ #ifdef CONFIG_INOTIFY
struct mutex inotify_mutex; /* protects the watches list */
#endif

+#ifdef CONFIG_KEVENT_INODE
+ struct kevent_storage st;
+#endif
+
unsigned long i_state;
unsigned long dirtied_when; /* jiffies of first dirtying */

@@ -698,6 +707,9 @@ #ifdef CONFIG_EPOLL
struct list_head f_ep_links;
spinlock_t f_ep_lock;
#endif /* #ifdef CONFIG_EPOLL */
+#ifdef CONFIG_KEVENT_POLL
+ struct kevent_storage st;
+#endif
struct address_space *f_mapping;
};
extern spinlock_t files_lock;
diff --git a/include/linux/fsnotify.h b/include/linux/fsnotify.h
index cc5dec7..0acc8db 100644
--- a/include/linux/fsnotify.h
+++ b/include/linux/fsnotify.h
@@ -15,6 +15,7 @@ #ifdef __KERNEL__

#include <linux/dnotify.h>
#include <linux/inotify.h>
+#include <linux/kevent.h>
#include <linux/audit.h>

/*
@@ -79,6 +80,7 @@ static inline void fsnotify_nameremove(s
isdir = IN_ISDIR;
dnotify_parent(dentry, DN_DELETE);
inotify_dentry_parent_queue_event(dentry, IN_DELETE|isdir, 0, dentry->d_name.name);
+ kevent_inode_notify_parent(dentry, KEVENT_INODE_REMOVE);
}

/*
@@ -88,6 +90,7 @@ static inline void fsnotify_inoderemove(
{
inotify_inode_queue_event(inode, IN_DELETE_SELF, 0, NULL, NULL);
inotify_inode_is_dead(inode);
+ kevent_inode_remove(inode);
}

/*
@@ -96,6 +99,7 @@ static inline void fsnotify_inoderemove(
static inline void fsnotify_create(struct inode *inode, struct dentry *dentry)
{
inode_dir_notify(inode, DN_CREATE);
+ kevent_inode_notify(inode, KEVENT_INODE_CREATE);
inotify_inode_queue_event(inode, IN_CREATE, 0, dentry->d_name.name,
dentry->d_inode);
audit_inode_child(dentry->d_name.name, dentry->d_inode, inode->i_ino);
@@ -107,6 +111,7 @@ static inline void fsnotify_create(struc
static inline void fsnotify_mkdir(struct inode *inode, struct dentry *dentry)
{
inode_dir_notify(inode, DN_CREATE);
+ kevent_inode_notify(inode, KEVENT_INODE_CREATE);
inotify_inode_queue_event(inode, IN_CREATE | IN_ISDIR, 0,
dentry->d_name.name, dentry->d_inode);
audit_inode_child(dentry->d_name.name, dentry->d_inode, inode->i_ino);
diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c
new file mode 100644
index 0000000..9cbba69
--- /dev/null
+++ b/kernel/kevent/kevent_aio.c
@@ -0,0 +1,584 @@
+/*
+ * kevent_aio.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/fs.h>
+#include <linux/swap.h>
+#include <linux/pagemap.h>
+#include <linux/bio.h>
+#include <linux/buffer_head.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+
+#define KEVENT_AIO_DEBUG
+
+#ifdef KEVENT_AIO_DEBUG
+#define dprintk(f, a...) printk(f, ##a)
+#else
+#define dprintk(f, a...) do {} while (0)
+#endif
+
+struct kevent_aio_private
+{
+ int pg_num;
+ size_t size;
+ loff_t offset;
+ loff_t processed;
+ atomic_t bio_page_num;
+ struct completion bio_complete;
+ struct file *file, *sock;
+ struct work_struct work;
+};
+
+static int kevent_aio_dequeue(struct kevent *k);
+static int kevent_aio_enqueue(struct kevent *k);
+static int kevent_aio_callback(struct kevent *k);
+
+extern void bio_fs_destructor(struct bio *bio);
+
+static void kevent_aio_bio_destructor(struct bio *bio)
+{
+ struct kevent *k = bio->bi_private;
+ struct kevent_aio_private *priv = k->priv;
+
+ dprintk("%s: bio=%p, num=%u, k=%p, inode=%p.\n", __func__, bio, bio->bi_vcnt, k, k->st->origin);
+ schedule_work(&priv->work);
+ bio_fs_destructor(bio);
+}
+
+static void kevent_aio_bio_put(struct kevent *k)
+{
+ struct kevent_aio_private *priv = k->priv;
+
+ if (atomic_dec_and_test(&priv->bio_page_num))
+ complete(&priv->bio_complete);
+}
+
+static int kevent_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, int err)
+{
+ const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
+ struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1;
+ struct kevent *k = bio->bi_private;
+
+ if (bio->bi_size)
+ return 1;
+
+ do {
+ struct page *page = bvec->bv_page;
+
+ if (--bvec >= bio->bi_io_vec)
+ prefetchw(&bvec->bv_page->flags);
+
+ if (uptodate) {
+ SetPageUptodate(page);
+ } else {
+ ClearPageUptodate(page);
+ SetPageError(page);
+ }
+
+ unlock_page(page);
+ kevent_aio_bio_put(k);
+ } while (bvec >= bio->bi_io_vec);
+
+ bio_put(bio);
+ return 0;
+}
+
+static inline struct bio *kevent_mpage_bio_submit(int rw, struct bio *bio)
+{
+ if (bio) {
+ bio->bi_end_io = kevent_mpage_end_io_read;
+ dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
+ submit_bio(rw, bio);
+ }
+ return NULL;
+}
+
+static struct bio *kevent_mpage_readpage(struct kevent *k, struct bio *bio,
+ struct page *page, unsigned nr_pages, get_block_t get_block,
+ loff_t *offset, sector_t *last_block_in_bio)
+{
+ struct inode *inode = k->st->origin;
+ const unsigned blkbits = inode->i_blkbits;
+ const unsigned blocks_per_page = PAGE_CACHE_SIZE >> blkbits;
+ const unsigned blocksize = 1 << blkbits;
+ sector_t block_in_file;
+ sector_t last_block;
+ struct block_device *bdev = NULL;
+ unsigned first_hole = blocks_per_page;
+ unsigned page_block;
+ sector_t blocks[MAX_BUF_PER_PAGE];
+ struct buffer_head bh;
+ int fully_mapped = 1, length;
+
+ block_in_file = (*offset + blocksize - 1) >> blkbits;
+ last_block = (i_size_read(inode) + blocksize - 1) >> blkbits;
+
+ bh.b_page = page;
+ for (page_block = 0; page_block < blocks_per_page; page_block++, block_in_file++) {
+ bh.b_state = 0;
+ if (block_in_file < last_block) {
+ if (get_block(inode, block_in_file, &bh, 0))
+ goto confused;
+ }
+
+ if (!buffer_mapped(&bh)) {
+ fully_mapped = 0;
+ if (first_hole == blocks_per_page)
+ first_hole = page_block;
+ continue;
+ }
+
+ /* some filesystems will copy data into the page during
+ * the get_block call, in which case we don't want to
+ * read it again. map_buffer_to_page copies the data
+ * we just collected from get_block into the page's buffers
+ * so readpage doesn't have to repeat the get_block call
+ */
+ if (buffer_uptodate(&bh)) {
+ BUG();
+ //map_buffer_to_page(page, &bh, page_block);
+ goto confused;
+ }
+
+ if (first_hole != blocks_per_page)
+ goto confused; /* hole -> non-hole */
+
+ /* Contiguous blocks? */
+ if (page_block && blocks[page_block-1] != bh.b_blocknr-1)
+ goto confused;
+ blocks[page_block] = bh.b_blocknr;
+ bdev = bh.b_bdev;
+ }
+
+ if (!bdev)
+ goto confused;
+
+ if (first_hole != blocks_per_page) {
+ char *kaddr = kmap_atomic(page, KM_USER0);
+ memset(kaddr + (first_hole << blkbits), 0,
+ PAGE_CACHE_SIZE - (first_hole << blkbits));
+ flush_dcache_page(page);
+ kunmap_atomic(kaddr, KM_USER0);
+ if (first_hole == 0) {
+ SetPageUptodate(page);
+ goto out;
+ }
+ } else if (fully_mapped) {
+ SetPageMappedToDisk(page);
+ }
+
+ /*
+ * This page will go to BIO. Do we need to send this BIO off first?
+ */
+ if (bio && (*last_block_in_bio != blocks[0] - 1))
+ bio = kevent_mpage_bio_submit(READ, bio);
+
+alloc_new:
+ if (bio == NULL) {
+ nr_pages = min_t(unsigned, nr_pages, bio_get_nr_vecs(bdev));
+ bio = bio_alloc(GFP_KERNEL, nr_pages);
+ if (bio == NULL)
+ goto confused;
+
+ bio->bi_destructor = kevent_aio_bio_destructor;
+ bio->bi_bdev = bdev;
+ bio->bi_sector = blocks[0] << (blkbits - 9);
+ bio->bi_private = k;
+ }
+
+ length = first_hole << blkbits;
+ if (bio_add_page(bio, page, length, 0) < length) {
+ bio = kevent_mpage_bio_submit(READ, bio);
+ dprintk("%s: Failed to add a page: nr_pages=%d, length=%d, page=%p.\n",
+ __func__, nr_pages, length, page);
+ goto alloc_new;
+ }
+
+ dprintk("%s: bio=%p, b=%d, m=%d, u=%d, nr_pages=%d, offset=%Lu, "
+ "size=%Lu. page_block=%u, page=%p.\n",
+ __func__, bio, buffer_boundary(&bh), buffer_mapped(&bh),
+ buffer_uptodate(&bh), nr_pages, *offset, i_size_read(inode),
+ page_block, page);
+
+ *offset = *offset + length;
+
+ if (buffer_boundary(&bh) || (first_hole != blocks_per_page))
+ bio = kevent_mpage_bio_submit(READ, bio);
+ else
+ *last_block_in_bio = blocks[blocks_per_page - 1];
+
+out:
+ return bio;
+
+confused:
+ dprintk("%s: confused. bio=%p, nr_pages=%d.\n", __func__, bio, nr_pages);
+ if (bio)
+ bio = kevent_mpage_bio_submit(READ, bio);
+ kevent_aio_bio_put(k);
+ SetPageUptodate(page);
+
+ if (nr_pages == 1) {
+ struct kevent_aio_private *priv = k->priv;
+
+ wait_for_completion(&priv->bio_complete);
+ kevent_storage_ready(k->st, NULL, KEVENT_AIO_BIO);
+ init_completion(&priv->bio_complete);
+ complete(&priv->bio_complete);
+ }
+ goto out;
+}
+
+static int kevent_aio_alloc_cached_page(struct kevent *k, struct page **cached_page)
+{
+ struct kevent_aio_private *priv = k->priv;
+ struct address_space *mapping = priv->file->f_mapping;
+ struct page *page;
+ int err = 0;
+ pgoff_t index = priv->offset >> PAGE_CACHE_SHIFT;
+
+ page = page_cache_alloc_cold(mapping);
+ if (!page) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ err = add_to_page_cache_lru(page, mapping, index, GFP_KERNEL);
+ if (err) {
+ if (err == -EEXIST)
+ err = 0;
+ page_cache_release(page);
+ goto out;
+ }
+
+ dprintk("%s: page=%p, offset=%Lu, processed=%Lu, index=%lu, size=%zu.\n",
+ __func__, page, priv->offset, priv->processed, index, priv->size);
+
+ *cached_page = page;
+
+out:
+ return err;
+}
+
+static int kevent_mpage_readpages(struct kevent *k, int first,
+ int (* get_block)(struct inode *inode, sector_t iblock,
+ struct buffer_head *bh_result, int create))
+{
+ struct bio *bio = NULL;
+ struct kevent_aio_private *priv = k->priv;
+ sector_t last_block_in_bio = 0;
+ int i, err = 0;
+
+ atomic_set(&priv->bio_page_num, priv->pg_num);
+
+ for (i=first; i<priv->pg_num; ++i) {
+ struct page *page = NULL;
+
+ err = kevent_aio_alloc_cached_page(k, &page);
+ if (err)
+ break;
+
+ /*
+ * If there is no error and page is NULL, this means
+ * that someone else has already added a page into the VFS cache.
+ * We will not process this page, since whoever
+ * added the page must read its data from disk.
+ */
+ if (!page)
+ continue;
+
+ bio = kevent_mpage_readpage(k, bio, page, priv->pg_num - i,
+ get_block, &priv->offset, &last_block_in_bio);
+ }
+
+ if (bio)
+ bio = kevent_mpage_bio_submit(READ, bio);
+
+ return err;
+}
+
+static size_t kevent_aio_vfs_read_actor(struct kevent *k, struct page *kpage, size_t len)
+{
+ struct kevent_aio_private *priv = k->priv;
+ size_t ret;
+
+ ret = priv->sock->f_op->sendpage(priv->sock, kpage, 0, len, &priv->sock->f_pos, 1);
+
+ dprintk("%s: k=%p, page=%p, len=%zu, ret=%zd.\n",
+ __func__, k, kpage, len, ret);
+
+ return ret;
+}
+
+static int kevent_aio_vfs_read(struct kevent *k,
+ size_t (*actor)(struct kevent *, struct page *, size_t))
+{
+ struct kevent_aio_private *priv = k->priv;
+ struct address_space *mapping;
+ size_t isize, actor_size;
+ int i;
+
+ mapping = priv->file->f_mapping;
+ isize = i_size_read(priv->file->f_dentry->d_inode);
+
+ dprintk("%s: start: size_left=%zd, offset=%Lu, processed=%Lu, isize=%zu, pg_num=%d.\n",
+ __func__, priv->size, priv->offset, priv->processed, isize, priv->pg_num);
+
+ for (i=0; i<priv->pg_num && priv->size; ++i) {
+ struct page *page;
+ size_t nr = PAGE_CACHE_SIZE;
+
+ cond_resched();
+ page = find_get_page(mapping, priv->processed >> PAGE_CACHE_SHIFT);
+ if (unlikely(page == NULL))
+ break;
+ if (!PageUptodate(page)) {
+ dprintk("%s: %2d: page=%p, processed=%Lu, size=%zu not uptodate.\n",
+ __func__, i, page, priv->processed, priv->size);
+ page_cache_release(page);
+ break;
+ }
+
+ if (mapping_writably_mapped(mapping))
+ flush_dcache_page(page);
+
+ mark_page_accessed(page);
+
+ if (nr + priv->processed > isize)
+ nr = isize - priv->processed;
+ if (nr > priv->size)
+ nr = priv->size;
+
+ actor_size = actor(k, page, nr);
+ /* actor_size is a size_t, so test its sign explicitly */
+ if ((ssize_t)actor_size < 0) {
+ page_cache_release(page);
+ break;
+ }
+
+ page_cache_release(page);
+
+ priv->processed += actor_size;
+ priv->size -= actor_size;
+ }
+
+ if (!priv->size)
+ i = priv->pg_num;
+
+ if (i != priv->pg_num)
+ priv->offset = priv->processed;
+
+ dprintk("%s: end: next=%d, num=%d, left=%zu, offset=%Lu, processed=%Lu, ret=%d.\n",
+ __func__, i, priv->pg_num,
+ priv->size, priv->offset, priv->processed, i);
+
+ return i;
+}
+
+static int kevent_aio_callback(struct kevent *k)
+{
+ return 1;
+}
+
+static void kevent_aio_work(void *data)
+{
+ struct kevent *k = data;
+ struct kevent_aio_private *priv = k->priv;
+ struct inode *inode = k->st->origin;
+ struct address_space *mapping = priv->file->f_mapping;
+ int err, ready = 0, num;
+
+ dprintk("%s: k=%p, priv=%p, inode=%p.\n", __func__, k, priv, inode);
+
+ init_completion(&priv->bio_complete);
+
+ num = ready = kevent_aio_vfs_read(k, &kevent_aio_vfs_read_actor);
+ if (ready > 0 && ready != priv->pg_num)
+ ready = 0;
+
+ dprintk("%s: k=%p, ready=%d, size=%zd.\n", __func__, k, ready, priv->size);
+
+ if (!ready) {
+ err = kevent_mpage_readpages(k, num, mapping->a_ops->get_block);
+ if (err) {
+ dprintk("%s: kevent_mpage_readpages failed: err=%d, k=%p, size=%zd.\n",
+ __func__, err, k, priv->size);
+ kevent_break(k);
+ kevent_storage_ready(k->st, NULL, KEVENT_MASK_ALL);
+ }
+ } else {
+ dprintk("%s: next k=%p, size=%zd.\n", __func__, k, priv->size);
+
+ if (priv->size)
+ schedule_work(&priv->work);
+ else {
+ kevent_storage_ready(k->st, NULL, KEVENT_MASK_ALL);
+ }
+
+ complete(&priv->bio_complete);
+ }
+}
+
+static int kevent_aio_enqueue(struct kevent *k)
+{
+ int err;
+ struct file *file, *sock;
+ struct inode *inode;
+ struct kevent_aio_private *priv;
+ struct address_space *mapping;
+ int fd = k->event.id.raw[0];
+ int num = k->event.id.raw[1];
+ int s = k->event.ret_data[0];
+ size_t size;
+
+ err = -ENODEV;
+ file = fget(fd);
+ if (!file)
+ goto err_out_exit;
+
+ sock = fget(s);
+ if (!sock)
+ goto err_out_fput_file;
+
+ mapping = file->f_mapping;
+
+ err = -EINVAL;
+ if (!file->f_dentry || !file->f_dentry->d_inode || !mapping->a_ops->get_block)
+ goto err_out_fput;
+ if (!sock->f_dentry || !sock->f_dentry->d_inode)
+ goto err_out_fput;
+
+ inode = igrab(file->f_dentry->d_inode);
+ if (!inode)
+ goto err_out_fput;
+
+ size = i_size_read(inode);
+
+ num = (size > num << PAGE_SHIFT) ? num : (size >> PAGE_SHIFT);
+
+ err = -ENOMEM;
+ priv = kzalloc(sizeof(struct kevent_aio_private), GFP_KERNEL);
+ if (!priv)
+ goto err_out_iput;
+
+ priv->pg_num = num;
+ priv->size = size;
+ priv->offset = 0;
+ priv->file = file;
+ priv->sock = sock;
+ INIT_WORK(&priv->work, kevent_aio_work, k);
+ k->priv = priv;
+
+ dprintk("%s: read: k=%p, priv=%p, inode=%p, num=%u, size=%zu, off=%Lu.\n",
+ __func__, k, priv, inode, priv->pg_num, priv->size, priv->offset);
+
+ init_completion(&priv->bio_complete);
+ kevent_storage_enqueue(&inode->st, k);
+ schedule_work(&priv->work);
+
+ return 0;
+
+err_out_iput:
+ iput(inode);
+err_out_fput:
+ fput(sock);
+err_out_fput_file:
+ fput(file);
+err_out_exit:
+
+ return err;
+}
+
+static int kevent_aio_dequeue(struct kevent *k)
+{
+ struct kevent_aio_private *priv = k->priv;
+ struct inode *inode = k->st->origin;
+ struct file *file = priv->file;
+ struct file *sock = priv->sock;
+
+ kevent_storage_dequeue(k->st, k);
+ flush_scheduled_work();
+ wait_for_completion(&priv->bio_complete);
+
+ kfree(k->priv);
+ k->priv = NULL;
+ iput(inode);
+ fput(file);
+ fput(sock);
+
+ return 0;
+}
+
+asmlinkage long sys_aio_sendfile(int ctl_fd, int fd, int s,
+ size_t size, unsigned flags)
+{
+ struct ukevent ukread, uksend;
+ struct kevent_user *u;
+ struct file *file;
+ int err, fput_needed;
+ int num = (flags & 7)?(flags & 7):8;
+
+ memset(&ukread, 0, sizeof(struct ukevent));
+ memset(&uksend, 0, sizeof(struct ukevent));
+
+ ukread.type = KEVENT_AIO;
+ ukread.event = KEVENT_AIO_BIO;
+
+ ukread.id.raw[0] = fd;
+ ukread.id.raw[1] = num;
+ ukread.ret_data[0] = s;
+
+ dprintk("%s: fd=%d, s=%d, num=%d.\n", __func__, fd, s, num);
+
+ file = fget_light(ctl_fd, &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ u = file->private_data;
+ if (!u) {
+ err = -EINVAL;
+ goto err_out_fput;
+ }
+
+ err = kevent_user_add_ukevent(&ukread, u);
+ if (err < 0)
+ goto err_out_fput;
+
+err_out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+static int __init kevent_init_aio(void)
+{
+ struct kevent_callbacks *ac = &kevent_registered_callbacks[KEVENT_AIO];
+
+ ac->enqueue = &kevent_aio_enqueue;
+ ac->dequeue = &kevent_aio_dequeue;
+ ac->callback = &kevent_aio_callback;
+
+ return 0;
+}
+late_initcall(kevent_init_aio);
diff --git a/kernel/kevent/kevent_inode.c b/kernel/kevent/kevent_inode.c
new file mode 100644
index 0000000..1626067
--- /dev/null
+++ b/kernel/kevent/kevent_inode.c
@@ -0,0 +1,114 @@
+/*
+ * kevent_inode.c
+ *
+ * 2006 Copyright (c) Evgeniy Polyakov <[email protected]>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/kevent.h>
+#include <linux/fs.h>
+
+static int kevent_inode_enqueue(struct kevent *k)
+{
+ struct file *file;
+ struct inode *inode;
+ int err, fput_needed;
+
+ file = fget_light(k->event.id.raw[0], &fput_needed);
+ if (!file)
+ return -ENODEV;
+
+ err = -EINVAL;
+ if (!file->f_dentry || !file->f_dentry->d_inode)
+ goto err_out_fput;
+
+ inode = igrab(file->f_dentry->d_inode);
+ if (!inode)
+ goto err_out_fput;
+
+ err = kevent_storage_enqueue(&inode->st, k);
+ if (err)
+ goto err_out_iput;
+
+ fput_light(file, fput_needed);
+ return 0;
+
+err_out_iput:
+ iput(inode);
+err_out_fput:
+ fput_light(file, fput_needed);
+ return err;
+}
+
+static int kevent_inode_dequeue(struct kevent *k)
+{
+ struct inode *inode = k->st->origin;
+
+ kevent_storage_dequeue(k->st, k);
+ iput(inode);
+
+ return 0;
+}
+
+static int kevent_inode_callback(struct kevent *k)
+{
+ return 1;
+}
+
+void kevent_inode_notify_parent(struct dentry *dentry, u32 event)
+{
+ struct dentry *parent;
+ struct inode *inode;
+
+ spin_lock(&dentry->d_lock);
+ parent = dentry->d_parent;
+ inode = parent->d_inode;
+
+ dget(parent);
+ spin_unlock(&dentry->d_lock);
+ kevent_inode_notify(inode, KEVENT_INODE_REMOVE);
+ dput(parent);
+}
+
+void kevent_inode_remove(struct inode *inode)
+{
+ kevent_storage_fini(&inode->st);
+}
+
+void kevent_inode_notify(struct inode *inode, u32 event)
+{
+ kevent_storage_ready(&inode->st, NULL, event);
+}
+
+static int __init kevent_init_inode(void)
+{
+ struct kevent_callbacks *ic = &kevent_registered_callbacks[KEVENT_INODE];
+
+ ic->enqueue = &kevent_inode_enqueue;
+ ic->dequeue = &kevent_inode_dequeue;
+ ic->callback = &kevent_inode_callback;
+
+ return 0;
+}
+late_initcall(kevent_init_inode);

2006-08-03 09:41:40

by Evgeniy Polyakov

Subject: Re: [take3 0/4] kevent: Generic event handling mechanism.

On Thu, Aug 03, 2006 at 01:45:59PM +0400, Evgeniy Polyakov ([email protected]) wrote:
> Changes from 'take2' patchset:
> * split kevent_finish_user() to locked and unlocked variants
> * do not use KEVENT_STAT ifdefs, use inline functions instead
> * use array of callbacks of each type instead of each kevent callback initialization
> * changed name of ukevent guarding lock
> * use only one kevent lock in kevent_user for all hash buckets instead of per-bucket locks
> * do not use kevent_user_ctl structure instead provide needed arguments as syscall parameters
> * various indent cleanups
> * mapped buffer (initial) implementation (no userspace yet)

Also added an optimisation aimed at the case where a lot of kevents are
copied from userspace in one syscall.

--
Evgeniy Polyakov

2006-08-03 09:43:12

by Eric Dumazet

Subject: Re: [take3 4/4] kevent: poll/select() notifications. Timer notifications.

On Thursday 03 August 2006 11:46, Evgeniy Polyakov wrote:
> poll/select() notifications. Timer notifications.
>
> +++ b/kernel/kevent/kevent_poll.c

> +static int kevent_poll_wait_callback(wait_queue_t *wait,
> + unsigned mode, int sync, void *key)
> +{
> + struct kevent_poll_wait_container *cont =
> + container_of(wait, struct kevent_poll_wait_container, wait);
> + struct kevent *k = cont->k;
> + struct file *file = k->st->origin;
> + unsigned long flags;
> + u32 revents, event;
> +
> + revents = file->f_op->poll(file, NULL);
> + spin_lock_irqsave(&k->ulock, flags);
> + event = k->event.event;
> + spin_unlock_irqrestore(&k->ulock, flags);

Not sure why you take a spinlock just to read a u32

Eric

2006-08-03 09:49:09

by Evgeniy Polyakov

Subject: Re: [take3 4/4] kevent: poll/select() notifications. Timer notifications.

On Thu, Aug 03, 2006 at 11:43:02AM +0200, Eric Dumazet ([email protected]) wrote:
> On Thursday 03 August 2006 11:46, Evgeniy Polyakov wrote:
> > poll/select() notifications. Timer notifications.
> >
> > +++ b/kernel/kevent/kevent_poll.c
>
> > +static int kevent_poll_wait_callback(wait_queue_t *wait,
> > + unsigned mode, int sync, void *key)
> > +{
> > + struct kevent_poll_wait_container *cont =
> > + container_of(wait, struct kevent_poll_wait_container, wait);
> > + struct kevent *k = cont->k;
> > + struct file *file = k->st->origin;
> > + unsigned long flags;
> > + u32 revents, event;
> > +
> > + revents = file->f_op->poll(file, NULL);
> > + spin_lock_irqsave(&k->ulock, flags);
> > + event = k->event.event;
> > + spin_unlock_irqrestore(&k->ulock, flags);
>
> Not sure why you take a spinlock just to read a u32

You are right, it is not needed there.
Thank you.
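
For context, a minimal userspace sketch of why a lock is unnecessary for a
single 32-bit read. It is an analogy only, not kevent code: struct
kevent_like and read_event_mask() are hypothetical names, and a C11 relaxed
atomic load stands in for the kernel's plain aligned load. A lock would only
matter if several fields had to be read as one consistent snapshot.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for the part of struct kevent read in the callback. */
struct kevent_like {
	_Atomic uint32_t event;		/* requested event mask */
};

/*
 * Read the mask without taking any lock: a single aligned 32-bit load
 * cannot be observed half-written, so there is nothing to protect.
 */
static uint32_t read_event_mask(struct kevent_like *k)
{
	return atomic_load_explicit(&k->event, memory_order_relaxed);
}
```

The relaxed ordering is deliberate here: the reader only needs an untorn
value, not ordering against other memory accesses.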

> Eric

--
Evgeniy Polyakov

2006-08-03 09:54:31

by Eric Dumazet

Subject: Re: [take3 3/4] kevent: Network AIO, socket notifications.

On Thursday 03 August 2006 11:46, Evgeniy Polyakov wrote:
> Network AIO, socket notifications.
>
> This patchset includes socket notifications and network asynchronous IO.
> Network AIO is based on kevent and works as usual kevent storage on top
> of inode.
> (3 * TCP_RTO_MIN) / 4,
> diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
> +
> +static int kevent_naio_enqueue(struct kevent *k)
> +{
> + int err, i;
> + struct page **page;
> + void *addr;
> + unsigned int size = k->event.id.raw[1];
> + int num = size/PAGE_SIZE;
> + struct file *file;
> + struct sock *sk = NULL;
> + int fput_needed;
> +
> + file = fget_light(k->event.id.raw[0], &fput_needed);
> + if (!file)
> + return -ENODEV;
> +
> + err = -EINVAL;
> + if (!file->f_dentry || !file->f_dentry->d_inode)
> + goto err_out_fput;

How can you be 100% sure this file is actually a socket here?
(Another thread could close the fd, and this fd can now point to another file.)

You should do
	if (file->f_op != &socket_file_ops)
		goto err_out_fput;
	sk = file->private_data; /* set in sock_map_fd */

> +
> + sk = SOCKET_I(file->f_dentry->d_inode)->sk;
> +


Eric

2006-08-03 10:14:45

by Evgeniy Polyakov

Subject: Re: [take3 3/4] kevent: Network AIO, socket notifications.

On Thu, Aug 03, 2006 at 11:54:26AM +0200, Eric Dumazet ([email protected]) wrote:
> On Thursday 03 August 2006 11:46, Evgeniy Polyakov wrote:
> > Network AIO, socket notifications.
> >
> > This patchset includes socket notifications and network asynchronous IO.
> > Network AIO is based on kevent and works as usual kevent storage on top
> > of inode.

> > + file = fget_light(k->event.id.raw[0], &fput_needed);
> > + if (!file)
> > + return -ENODEV;
> > +
> > + err = -EINVAL;
> > + if (!file->f_dentry || !file->f_dentry->d_inode)
> > + goto err_out_fput;
>
> How can you be 100% sure this file is actually a socket here ?
> (Another thread could close the fd and this fd can now point to another file)
>
> You should do
> if (file->f_op != &socket_file_ops)
> goto err_out_fput;
> sk = file->private_data; /* set in sock_map_fd */

That will give a struct socket, not a struct sock, but the check is
definitely needed in both the socket and the network AIO code.
Thanks Eric.

> Eric

--
Evgeniy Polyakov

2006-08-03 14:40:45

by Eric Dumazet

Subject: Re: [take3 1/4] kevent: Core files.

On Thursday 03 August 2006 11:46, Evgeniy Polyakov wrote:
> Core files.
>
> This patch includes core kevent files:
> - userspace controlling
> - kernelspace interfaces
> - initialization
> - notification state machines
>

> +static int kevent_user_wait(struct file *file, struct kevent_user *u,
> + unsigned int min_nr, unsigned int max_nr, unsigned int timeout,
> + void __user *buf)
> +{
>

> + mutex_lock(&u->ctl_mutex);
> + while (num < max_nr && ((k = kqueue_dequeue_ready(u)) != NULL)) {
> + if (copy_to_user(buf + num*sizeof(struct ukevent),
> + &k->event, sizeof(struct ukevent))) {
> + cerr = -EINVAL;
> + break;
> + }


It seems quite wrong to hold ctl_mutex while doing a copy_to_user() (of
possibly a large amount of data): a thread can sleep on a page fault, and
then other threads cannot make progress.

Eric

2006-08-03 14:57:04

by Evgeniy Polyakov

Subject: Re: [take3 1/4] kevent: Core files.

On Thu, Aug 03, 2006 at 04:40:34PM +0200, Eric Dumazet ([email protected]) wrote:
> > + mutex_lock(&u->ctl_mutex);
> > + while (num < max_nr && ((k = kqueue_dequeue_ready(u)) != NULL)) {
> > + if (copy_to_user(buf + num*sizeof(struct ukevent),
> > + &k->event, sizeof(struct ukevent))) {
> > + cerr = -EINVAL;
> > + break;
> > + }
>
>
> It seems quite wrong to hold ctl_mutex while doing a copy_to_user() (of
> possibly a large amount of data) : A thread can sleep on a page fault and
> other threads cannot make progress.

I would not call that wrong: the mutex prevents other threads from removing
kevents that have already been counted as ready for transfer to userspace.
That is, after the dequeuing thread has been woken up and has seen some
ready events, another thread could remove part of them before they are
actually dequeued, so I prevent this.

> Eric

--
Evgeniy Polyakov

2006-08-03 15:12:05

by Eric Dumazet

Subject: Re: [take3 1/4] kevent: Core files.

On Thursday 03 August 2006 16:55, Evgeniy Polyakov wrote:
> On Thu, Aug 03, 2006 at 04:40:34PM +0200, Eric Dumazet ([email protected]) wrote:
> > > + mutex_lock(&u->ctl_mutex);
> > > + while (num < max_nr && ((k = kqueue_dequeue_ready(u)) != NULL)) {
> > > + if (copy_to_user(buf + num*sizeof(struct ukevent),
> > > + &k->event, sizeof(struct ukevent))) {
> > > + cerr = -EINVAL;
> > > + break;
> > > + }
> >
> > It seems quite wrong to hold ctl_mutex while doing a copy_to_user() (of
> > possibly a large amount of data) : A thread can sleep on a page fault and
> > other threads cannot make progress.
>
> I would not call that wrong - system prevents some threads from removing
> kevents which are counted to be transfered to the userspace, i.e. when
> dequeuing was awakened and it had seen some events it is possible, that
> when it will dequeue them part will be removed by other thread, so I
> prevent this.

Hum, "wrong" was maybe not the right word... but kqueue_dequeue_ready() uses
a spinlock (ready_lock) to protect ready_list. Any particular struct kevent
is given to only one thread at a time.

If you look at fs/eventpoll.c, you can see how careful ep_send_events() is,
so that multiple threads can transfer different items to user memory at the
same time.

In a model where several threads service events collected by a single
point (epoll or kevent), it is important not to block all threads because
a single thread is waiting for a swap-in (triggered by copy_to_user()).
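
The pattern argued for here can be sketched in userspace. This is a minimal
illustration under stated assumptions, not kevent or epoll code: struct
ready_queue, queue_push(), queue_pop() and queue_drain() are hypothetical
names, a pthread mutex stands in for ready_lock, and memcpy() stands in for
copy_to_user(). The lock covers only the list manipulation; the potentially
slow copy runs with no lock held.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define QUEUE_CAP 64

struct event { int id; };		/* stand-in for struct ukevent */

struct ready_queue {
	pthread_mutex_t lock;		/* protects items[], head and count */
	struct event items[QUEUE_CAP];
	size_t head, count;
};

/* Append one event; returns 0 if the queue is full. */
static int queue_push(struct ready_queue *q, struct event ev)
{
	int ok = 0;

	pthread_mutex_lock(&q->lock);
	if (q->count < QUEUE_CAP) {
		q->items[(q->head + q->count) % QUEUE_CAP] = ev;
		q->count++;
		ok = 1;
	}
	pthread_mutex_unlock(&q->lock);
	return ok;
}

/* Take the oldest event; the lock is held only while touching the queue. */
static int queue_pop(struct ready_queue *q, struct event *out)
{
	int ok = 0;

	pthread_mutex_lock(&q->lock);
	if (q->count > 0) {
		*out = q->items[q->head];
		q->head = (q->head + 1) % QUEUE_CAP;
		q->count--;
		ok = 1;
	}
	pthread_mutex_unlock(&q->lock);
	return ok;
}

/* Copy up to max_nr events to dst; each copy happens outside the lock. */
static size_t queue_drain(struct ready_queue *q, struct event *dst,
			  size_t max_nr)
{
	struct event ev;
	size_t num = 0;

	while (num < max_nr && queue_pop(q, &ev)) {
		memcpy(&dst[num], &ev, sizeof(ev));	/* "copy_to_user()" */
		num++;
	}
	return num;
}
```

The cost of this design is that a caller may get fewer events than it saw
as ready when it was woken up, and an event whose copy fails has already
left the queue and would need to be requeued or reported.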

Eric

2006-08-03 15:22:37

by Evgeniy Polyakov

Subject: Re: [take3 1/4] kevent: Core files.

On Thu, Aug 03, 2006 at 05:11:58PM +0200, Eric Dumazet ([email protected]) wrote:
> On Thursday 03 August 2006 16:55, Evgeniy Polyakov wrote:
> > On Thu, Aug 03, 2006 at 04:40:34PM +0200, Eric Dumazet ([email protected]) wrote:
> > > > + mutex_lock(&u->ctl_mutex);
> > > > + while (num < max_nr && ((k = kqueue_dequeue_ready(u)) != NULL)) {
> > > > + if (copy_to_user(buf + num*sizeof(struct ukevent),
> > > > + &k->event, sizeof(struct ukevent))) {
> > > > + cerr = -EINVAL;
> > > > + break;
> > > > + }
> > >
> > > It seems quite wrong to hold ctl_mutex while doing a copy_to_user() (of
> > > possibly a large amount of data) : A thread can sleep on a page fault and
> > > other threads cannot make progress.
> >
> > I would not call that wrong - system prevents some threads from removing
> > kevents which are counted to be transfered to the userspace, i.e. when
> > dequeuing was awakened and it had seen some events it is possible, that
> > when it will dequeue them part will be removed by other thread, so I
> > prevent this.
>
> Hum, "wrong" was maybe not the good word.... but kqueue_dequeue_ready() uses a
> spinlock (ready_lock) to protect ready_list. One particular struct kevent is
> given to one thread, one at a time.

I mean that the wait_event logic will see that the requested number of
events is available, but by the time it starts to fetch them it is possible
that there are no events left at all.

> If you look at fs/eventpoll.c, you can see how carefull is ep_send_events() so
> that multiple threads can in the same time transfer different items to user
> memory.

It follows the same logic there: it is done under the ep->sem semaphore,
which is held for both del and read operations.
Or do you mean to use an rw semaphore instead of a mutex here?

> In a model where several threads are servicing events collected by a single
> point (epoll, or kevent), this is important to not block all threads because
> of a single thread waiting a swapin (trigered by copy_to_user() )

> Eric

--
Evgeniy Polyakov

2006-08-03 17:04:45

by Badari Pulavarty

Subject: Re: [take3 2/4] kevent: AIO, aio_sendfile() implementation.

Evgeniy Polyakov wrote:
> AIO, aio_sendfile() implementation.
>
> This patch includes asynchronous propagation of file's data into VFS
> cache and aio_sendfile() implementation.
> Network aio_sendfile() works lazily - it asynchronously populates pages
> into the VFS cache (which can be used for various tricks with adaptive
> readahead) and then uses usual ->sendfile() callback.
>
> ...
> --- /dev/null
> +++ b/kernel/kevent/kevent_aio.c
> @@ -0,0 +1,584 @@
> +/*
> + * kevent_aio.c
> + *
>
Since this is *almost* the same as the mpage.c code, I am wondering if it's
possible to make common generic/helper routines in mpage.c and use them here?

Thanks,
Badari

2006-08-03 17:14:44

by Evgeniy Polyakov

Subject: Re: [take3 2/4] kevent: AIO, aio_sendfile() implementation.

On Thu, Aug 03, 2006 at 10:04:36AM -0700, Badari Pulavarty ([email protected]) wrote:
> Evgeniy Polyakov wrote:
> >AIO, aio_sendfile() implementation.
> >
> >This patch includes asynchronous propagation of file's data into VFS
> >cache and aio_sendfile() implementation.
> >Network aio_sendfile() works lazily - it asynchronously populates pages
> >into the VFS cache (which can be used for various tricks with adaptive
> >readahead) and then uses usual ->sendfile() callback.
> >
> >...
> >--- /dev/null
> >+++ b/kernel/kevent/kevent_aio.c
> >@@ -0,0 +1,584 @@
> >+/*
> >+ * kevent_aio.c
> >+ *
> >
> Since this is *almost* same as mpage.c code, wondering if its possible
> to make common
> generic/helper routines in mpage.c and use it here ?

Yes, as I mentioned in my mail to Christoph, I did it just to keep kevent
as separate as possible (which is why I introduced the ->get_block() based
approach). It can safely be moved into the mpage code and used from a
cleaner callback like ->readpage().
Since it was decided to postpone this AIO code for a while, I am not
updating it (I just make sure that it compiles with the new changes),
because the overall design of the AIO changes (if any) is not 100% complete.

> Thanks,
> Badari

--
Evgeniy Polyakov

2006-08-03 21:38:12

by David Miller

Subject: Re: [take3 1/4] kevent: Core files.

From: Evgeniy Polyakov <[email protected]>
Date: Thu, 3 Aug 2006 18:55:57 +0400

> I would not call that wrong - system prevents some threads from removing
> kevents which are counted to be transfered to the userspace, i.e. when
> dequeuing was awakened and it had seen some events it is possible, that
> when it will dequeue them part will be removed by other thread, so I
> prevent this.

The queue is all that needs to be synchronized, so it seems
better to have a mutex on the queue rather than a global
one. That way, a user can only hurt himself.