2002-11-30 17:23:50

by Krzysztof Benedyczak

[permalink] [raw]
Subject: [PATCH] POSIX message queues, 2.5.50

Hello,

After some (rather long) work we have finished and tested
(also on SMP machine) new implementation of POSIX message queues.

As it was suggested it is made as a filesystem.

Recently there were Peter Waechtler's patch with the same functionality.
As it is concurrent to us I will point out some key differences (and
advantages of our implementation as I think :-)

- In order to work mqueue fs must be mounted (under /dev/mqueue if we
one want to use our lib)
- system read from queue file will return some info about it
also main vfs functions are supported
- we don't create any new system call (but if it is now preferred to
don't use ioctls we can change it in one day)
- this implementation doesn't change anything in the rest of kernel

and two most important ones:

- our implementation does support priority scheduling which is omitted in
Peter's version (meaning that if many processes wait e.g. for a message
_random_ one will get it). It is important because developers could rely
on this feature - and it is as I think the most difficult part of
implementation
- our version was quit well tested - with Peter's patch (at least this
for 2.5.46) I've had many problems.


Finally one note about library - it is possible to wget it from
http://www.mat.uni.torun.pl/~golbi/mqueuelib-3.0beta.tar.gz
but it is beta version (mainly because docs weren't updated)
We plan to finish it (and update our page) on next Tuesday.


Is this version acceptable for applying now?

Regards
Krzysiek Benedyczak


-----------mqueue-3.0 patch for 2.5.50----------------
diff -urN linux-org/CREDITS linux-patched/CREDITS
--- linux-org/CREDITS Wed Nov 27 23:36:15 2002
+++ linux-patched/CREDITS Sat Nov 30 12:45:32 2002
@@ -279,6 +279,15 @@
S: Greenbelt, Maryland 20771
S: USA

+N: Krzysztof Benedyczak
+E: [email protected]
+W: http://www.mat.uni.torun.pl/~golbi
+D: POSIX message queues fs (with M. Wronski)
+S: ul. Podmiejska 52
+S: Radunica
+S: 83-000 Pruszcz Gdanski
+S: Poland
+
N: Randolph Bentson
E: [email protected]
W: http://www.aa.net/~bentson/
@@ -3380,6 +3389,14 @@
S: Portland, OR 97204
S: USA

+N: Michal Wronski
+E: [email protected]
+W: http://www.mat.uni.torun.pl/~wrona
+D: POSIX message queues fs (with K. Benedyczak)
+S: ul. Teczowa 23/12
+S: 80-680 Gdansk-Sobieszewo
+S: Poland
+
N: Frank Xia
E: [email protected]
D: Xiafs filesystem [defunct]
diff -urN linux-org/Documentation/ioctl-number.txt linux-patched/Documentation/ioctl-number.txt
--- linux-org/Documentation/ioctl-number.txt Wed Nov 27 23:36:20 2002
+++ linux-patched/Documentation/ioctl-number.txt Fri Nov 29 21:03:35 2002
@@ -186,5 +186,6 @@
0xB0 all RATIO devices in development:
<mailto:[email protected]>
0xB1 00-1F PPPoX <mailto:[email protected]>
+0xB2 00-04 linux/mqueue.h <http://mat.uni.torun.pl/~wrona/posix_ipc>
0xCB 00-1F CBM serial IEC bus in development:
<mailto:[email protected]>
diff -urN linux-org/fs/Kconfig linux-patched/fs/Kconfig
--- linux-org/fs/Kconfig Wed Nov 27 23:36:03 2002
+++ linux-patched/fs/Kconfig Sat Nov 30 12:43:14 2002
@@ -600,6 +600,23 @@

See <file:Documentation/filesystems/tmpfs.txt> for details.

+config POSIX_MQUEUE
+ bool "POSIX Message Queues"
+ ---help---
+ POSIX variant of message queues is a part of IPC. In POSIX message
+ queues every message has a priority which decides about succession
+ of receiving it by a process. If you want to compile and run
+ programs written e.g. for Solaris with use of its POSIX message
+ queues (functions mq_*) say Y here. To use this feature you will
+ also need mqueue library, available from
+ <http://www.mat.uni.torun.pl/~wrona/posix_ipc/>
+
+ POSIX message queues are visible as a filesystem called 'mqueue'
+ and should be mounted in /dev/mqueue in order to work with standard
+ library.
+
+ If unsure, say N.
+
config RAMFS
bool
default y
diff -urN linux-org/include/linux/mqueue.h linux-patched/include/linux/mqueue.h
--- linux-org/include/linux/mqueue.h Thu Jan 1 01:00:00 1970
+++ linux-patched/include/linux/mqueue.h Fri Nov 29 21:23:12 2002
@@ -0,0 +1,43 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#include <linux/init.h>
+
+#define MQ_MAX 64 /* max number of message queues */
+
+#define MQ_MAXMSG 40 /* max number of messages in each queue */
+#define MQ_MSGSIZE 16384 /* max message size */
+#define MQ_MAXSYSSIZE 1048576 /* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 32768 /* max priority */
+
+typedef int mqd_t;
+
+
+struct mq_attr {
+ long mq_flags; /* message queue flags */
+ long mq_maxmsg; /* maximum number of messages */
+ long mq_msgsize; /* maximum message size */
+ long mq_curmsgs; /* number of messages currently queued */
+};
+
+/*
+* struct for passing data via ioctls calls
+*/
+
+/* the same for send&receive */
+struct ioctl_mq_sndrcv {
+ const char *msg_ptr;
+ int msg_len;
+ unsigned int msg_prio;
+ struct timespec *timeout;
+};
+
+
+#define MQ_CREATE _IOW(0xB2, 0, struct mq_attr)
+#define MQ_GETATTR _IOR(0xB2, 1, struct mq_attr)
+#define MQ_SEND _IOW(0xB2, 2, struct ioctl_mq_sndrcv)
+#define MQ_RECEIVE _IOR(0xB2, 3, struct ioctl_mq_sndrcv)
+#define MQ_NOTIFY _IOW(0xB2, 4, struct sigevent)
+
+
+#endif
diff -urN linux-org/ipc/Makefile linux-patched/ipc/Makefile
--- linux-org/ipc/Makefile Wed Nov 27 23:35:50 2002
+++ linux-patched/ipc/Makefile Wed Nov 27 21:13:35 2002
@@ -5,5 +5,6 @@
obj-y := util.o

obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
+obj-$(CONFIG_POSIX_MQUEUE) += mqueue.o

include $(TOPDIR)/Rules.make
diff -urN linux-org/ipc/mqueue.c linux-patched/ipc/mqueue.c
--- linux-org/ipc/mqueue.c Thu Jan 1 01:00:00 1970
+++ linux-patched/ipc/mqueue.c Sat Nov 30 16:54:00 2002
@@ -0,0 +1,1055 @@
+#include <linux/mqueue.h>
+#include <linux/slab.h>
+#include <linux/list.h>
+#include <linux/module.h>
+#include <linux/smp_lock.h>
+#include <linux/poll.h>
+#include <linux/sched.h>
+#include <linux/mount.h>
+
+#include <asm/current.h>
+#include <asm/siginfo.h>
+#include <asm/uaccess.h>
+
+
+#define MQUEUE_MAGIC 0x19800202
+#define DIRENT_SIZE 20
+#define FILENT_SIZE (sizeof(long)+sizeof(int)*2+sizeof(pid_t))
+
+
+struct msg { /* this represent particular message */
+ int msg_len; /* in the queue */
+ unsigned int msg_prio;
+ char *mtext;
+};
+
+struct ext_wait_queue { /* extra data (priority) for sleeping process */
+ int prio; /* priority of the process(!) */
+ pid_t associated; /* and it's pid */
+ struct ext_wait_queue *next;
+ struct ext_wait_queue *prev;
+};
+
+
+/* this stores extra data for inode - queue specific data */
+struct mqueue_inode_info {
+ struct mq_attr attr;
+ /* table of sorted pointers to messages */
+ struct msg *messages[MQ_MAXMSG];
+ pid_t notify_pid; /* who we have to notify (or 0) */
+ struct sigevent notify; /* notification */
+ /* for processes waiting for free space or message (respectively) */
+ wait_queue_head_t wait_q[2];
+ /* avoids extra invocations of wake_up*/
+ wait_queue_head_t wait_q2[2];
+ struct ext_wait_queue *e_wait_q[2]; /* 0=free space 1=message */
+ struct semaphore mq_sem;
+ /* size of queue in memory (msgs&struct) */
+ long qsize;
+
+ /* VFS stuff */
+ struct inode vfs_inode;
+};
+
+
+static long msgs_size; /* sum of sizes of all msgs in all queues */
+static int queues_count; /* number of existing queues */
+static struct semaphore mq_sem; /* main queues semaphore */
+
+/* fs stuff */
+static inline struct mqueue_inode_info *MQUEUE_I(struct inode *ino)
+{
+ return list_entry(ino, struct mqueue_inode_info, vfs_inode);
+}
+
+static kmem_cache_t *mqueue_inode_cachep;
+
+static struct vfsmount *mounted_mq;
+
+static int mqueue_ioctl_file (struct inode * inode, struct file * filp,
+ unsigned int cmd, unsigned long arg);
+static unsigned int mqueue_poll_file (struct file *, struct poll_table_struct *);
+static struct super_block *mqueue_get_sb (struct file_system_type *fs_type,
+ int flags, char *dev_name, void *data);
+static int mqueue_create (struct inode *dir, struct dentry *dent, int mode);
+static struct inode *mqueue_alloc_inode(struct super_block *sb);
+static void mqueue_destroy_inode(struct inode *inode);
+static int mqueue_release_file(struct inode *ino, struct file * f);
+static void mqueue_delete_inode (struct inode *ino);
+static int mqueue_unlink (struct inode *dir, struct dentry *dent);
+static ssize_t mqueue_read_file (struct file *, char *, size_t, loff_t *);
+
+static struct inode *mqueue_get_inode(struct super_block *sb, int mode);
+
+static struct inode_operations mqueue_dir_inode_operations = {
+ .lookup = simple_lookup,
+ .create = mqueue_create,
+ .unlink = mqueue_unlink,
+};
+
+static struct inode_operations mqueue_file_inode_operations = {};
+
+static struct file_operations mqueue_file_operations = {
+ .ioctl = mqueue_ioctl_file,
+ .release = mqueue_release_file,
+ .poll = mqueue_poll_file,
+ .read = mqueue_read_file,
+};
+
+static struct super_operations mqueue_super_ops = {
+ .alloc_inode = mqueue_alloc_inode,
+ .destroy_inode = mqueue_destroy_inode,
+ .statfs = simple_statfs,
+ .delete_inode = mqueue_delete_inode,
+ .drop_inode = generic_delete_inode,
+};
+
+static struct file_system_type mqueue_fs_type = {
+ .owner = THIS_MODULE,
+ .name = "mqueue",
+ .get_sb = mqueue_get_sb,
+ .kill_sb = kill_litter_super,
+};
+
+
+/*
+* GENERAL FUNCTIONS FOR FS CREATION
+*/
+
+/*
+* auxiliary function - produce a new inode
+*/
+static struct inode *mqueue_get_inode(struct super_block *sb, int mode)
+{
+struct inode *inode;
+struct mqueue_inode_info *ino_extra;
+
+ inode = new_inode(sb);
+ if (inode) {
+ inode->i_mode = mode;
+ inode->i_uid = current->fsuid;
+ inode->i_gid = current->fsgid;
+ inode->i_blksize = PAGE_SIZE;
+ inode->i_blocks = 0;
+ inode->i_rdev = NODEV;
+ inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+ if ( (mode & S_IFMT) == S_IFREG) {
+ inode->i_op = &mqueue_file_inode_operations;
+ inode->i_fop = &mqueue_file_operations;
+ inode->i_size = FILENT_SIZE;
+ /* mqueue specific info */
+ ino_extra = MQUEUE_I(inode);
+ init_MUTEX(&(ino_extra->mq_sem));
+ init_waitqueue_head((&(ino_extra->wait_q[0])));
+ init_waitqueue_head((&(ino_extra->wait_q[1])));
+ init_waitqueue_head((&(ino_extra->wait_q2[0])));
+ init_waitqueue_head((&(ino_extra->wait_q2[1])));
+ ino_extra->e_wait_q[0] = NULL;
+ ino_extra->e_wait_q[1] = NULL;
+ ino_extra->notify_pid = 0;
+ ino_extra->notify.sigev_signo = 0;
+ ino_extra->notify.sigev_notify = 0;
+ ino_extra->qsize = sizeof(struct mqueue_inode_info);
+ ino_extra->attr.mq_curmsgs = 0;
+ /* fill up with defaults
+ * (mq_open will set it up via next ioctl call) */
+ ino_extra->attr.mq_maxmsg = 0;
+ ino_extra->attr.mq_msgsize = 0;
+ }else if((mode & S_IFMT) == S_IFDIR) {
+ inode->i_nlink++;
+ /* Some things misbehave if size == 0 on a directory */
+ inode->i_size = 2 * DIRENT_SIZE;
+ inode->i_op = &mqueue_dir_inode_operations;
+ inode->i_fop = &simple_dir_operations;
+ }
+ }
+ return inode;
+}
+
+
+static int mqueue_parse_options(char *options,int *mode,uid_t *uid,gid_t *gid, int silent)
+{
+ char *this_char, *value, *rest;
+
+ while ((this_char = strsep(&options, ",")) != NULL) {
+ if (!*this_char)
+ continue;
+ if ((value = strchr(this_char,'=')) != NULL) {
+ *value++ = 0;
+ } else {
+ if(!silent)
+ printk(KERN_ERR
+ "mqueuefs: No value for mount option '%s'\n",
+ this_char);
+ return 1;
+ }
+
+ if (!strcmp(this_char,"mode")) {
+ if (!mode)
+ continue;
+ *mode = simple_strtoul(value,&rest,8);
+ if (*rest)
+ goto bad_val;
+ } else if (!strcmp(this_char,"uid")) {
+ if (!uid)
+ continue;
+ *uid = simple_strtoul(value,&rest,0);
+ if (*rest)
+ goto bad_val;
+ } else if (!strcmp(this_char,"gid")) {
+ if (!gid)
+ continue;
+ *gid = simple_strtoul(value,&rest,0);
+ if (*rest)
+ goto bad_val;
+ } else {
+ if(!silent)
+ printk(KERN_ERR "mqueuefs: Bad mount option %s\n",
+ this_char);
+ return 1;
+ }
+ }
+ return 0;
+
+bad_val:
+ if(!silent)
+ printk(KERN_ERR "mqueuefs: Bad value '%s' for mount option '%s'\n",
+ value, this_char);
+ return 1;
+
+}
+
+
+/* function for get_sb_nodev. Fill up our data in super block */
+static int mqueue_fill_super(struct super_block * sb, void * data, int silent)
+{
+ struct inode * inode;
+ uid_t uid = current->fsuid;
+ gid_t gid = current->fsgid;
+ int mode = S_IRWXUGO;
+
+ if (mqueue_parse_options (data, &mode, &uid, &gid, silent))
+ return -EINVAL;
+ sb->s_blocksize = PAGE_SIZE;
+ sb->s_blocksize_bits = PAGE_SHIFT;
+ sb->s_magic = MQUEUE_MAGIC;
+ sb->s_op = &mqueue_super_ops;
+
+ inode = mqueue_get_inode(sb, S_IFDIR | mode);
+
+ if (!inode)
+ return -ENOMEM;
+ inode->i_uid = uid;
+ inode->i_gid = gid;
+
+ sb->s_root = d_alloc_root(inode);
+
+ if (!sb->s_root) {
+ iput(inode);
+ return -ENOMEM;
+ }
+ return 0;
+}
+
+/* called when mounting */
+static struct super_block *mqueue_get_sb(struct file_system_type *fs_type,
+ int flags, char *dev_name, void *data)
+{
+ return get_sb_nodev(fs_type,flags,data,mqueue_fill_super);
+}
+
+
+static struct inode *mqueue_alloc_inode(struct super_block *sb)
+{
+ struct mqueue_inode_info *ei;
+ ei = (struct mqueue_inode_info *)kmem_cache_alloc(mqueue_inode_cachep, SLAB_KERNEL);
+ if (!ei)
+ return NULL;
+ return &ei->vfs_inode;
+}
+
+static void mqueue_destroy_inode(struct inode *inode)
+{
+ kmem_cache_free(mqueue_inode_cachep, MQUEUE_I(inode));
+}
+
+static void init_once(void *foo, kmem_cache_t *cachep, unsigned long flags)
+{
+ struct mqueue_inode_info *p = (struct mqueue_inode_info *) foo;
+
+ if ((flags & (SLAB_CTOR_VERIFY|SLAB_CTOR_CONSTRUCTOR)) ==
+ SLAB_CTOR_CONSTRUCTOR) {
+ inode_init_once(&p->vfs_inode);
+ }
+}
+
+
+static int init_inode_cache(void)
+{
+ mqueue_inode_cachep = kmem_cache_create("mqueue_inode_cache",
+ sizeof(struct mqueue_inode_info),
+ 0, SLAB_HWCACHE_ALIGN,
+ init_once, NULL);
+ if (mqueue_inode_cachep == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
+static void destroy_inode_cache(void)
+{
+ if (kmem_cache_destroy(mqueue_inode_cachep))
+ printk(KERN_INFO "mqueue_inode_cache: not all structures were freed\n");
+}
+
+
+/*
+* init function
+*/
+static int __init init_mqueue_fs(void)
+{
+ int error;
+ struct vfsmount * res;
+ error = init_inode_cache();
+ if(error) {
+ printk (KERN_ERR "Could not init inode cache for mqueue filesystem\n");
+ return error;
+ }
+
+ error = register_filesystem(&mqueue_fs_type);
+ if (error) {
+ printk (KERN_ERR "Could not register mqueue filesystem\n");
+ goto out_inodecache;
+ }
+
+ res = kern_mount(&mqueue_fs_type);
+ if (IS_ERR (res)) {
+ error = PTR_ERR(res);
+ printk (KERN_ERR "Could not kern_mount mqueue filesystem\n");
+ goto out_unregister;
+ }
+ mounted_mq = res;
+
+ /* internal initialization - not common for vfs*/
+ msgs_size = 0;
+ queues_count = 0;
+ init_MUTEX(&mq_sem);
+
+ return 0;
+
+out_unregister:
+ unregister_filesystem(&mqueue_fs_type);
+out_inodecache:
+ destroy_inode_cache();
+ return error;
+}
+
+static void __exit exit_mqueue_fs(void)
+{
+ unregister_filesystem(&mqueue_fs_type);
+ mntput(mounted_mq);
+}
+
+module_init(init_mqueue_fs)
+module_exit(exit_mqueue_fs)
+
+MODULE_LICENSE("GPL");
+
+
+static void mqueue_delete_inode (struct inode *ino)
+{
+ int i;
+ struct mqueue_inode_info *info;
+
+ if ( (ino->i_mode & S_IFMT) == S_IFDIR) {
+ clear_inode(ino);
+ return;
+ }
+ info = MQUEUE_I(ino);
+ i = 0;
+ down(&info->mq_sem);
+ while (info->attr.mq_curmsgs > 0) {
+ kfree(info->messages[i]->mtext);
+ msgs_size -= info->messages[i]->msg_len;
+ kfree(info->messages[i]);
+ info->messages[i] = NULL;
+ i++;
+ info->attr.mq_curmsgs--;
+ }
+ up(&info->mq_sem);
+ clear_inode(ino);
+ down(&mq_sem);
+ queues_count--;
+ up(&mq_sem);
+}
+
+static int mqueue_unlink (struct inode *dir, struct dentry *dent)
+{
+ struct inode *inode = dent->d_inode;
+
+ dir->i_size -= DIRENT_SIZE;
+ inode->i_nlink--;
+ dput(dent);
+ return 0;
+}
+
+
+static int mqueue_create(struct inode *dir, struct dentry *dent, int mode)
+{
+struct inode * ino;
+
+ down(&mq_sem);
+ if(queues_count>=MQ_MAX) {
+ up(&mq_sem);
+ return -ENOSPC;
+ }
+ ino = mqueue_get_inode(dir->i_sb,mode);
+ if(!ino) {
+ up(&mq_sem);
+ return -ENOMEM;
+ }
+
+ queues_count++;
+ up(&mq_sem);
+
+ dir->i_size += DIRENT_SIZE;
+ dir->i_ctime = dir->i_mtime = CURRENT_TIME;
+
+ d_instantiate(dent,ino);
+ dget(dent);
+ return 0;
+}
+
+/*
+* This is routine for system read from queue file.
+* To avoid mess with doing some
+* sort of mq_receive here we allow to read only: queue size &
+* notification info (the only values that are interesting from user
+* point of view and aren't accessible through std. routines
+*/
+static ssize_t mqueue_read_file(struct file *file, char *data, size_t size, loff_t *off)
+{
+ char * buffer;
+ struct mqueue_inode_info *info = MQUEUE_I(file->f_dentry->d_inode);
+ int retval=0;
+
+ if (*off>=FILENT_SIZE)
+ return 0;
+ buffer = kmalloc(FILENT_SIZE,GFP_KERNEL);
+ if (buffer==NULL)
+ return -ENOMEM;
+
+ *((long *)buffer) = info->qsize;
+ *((pid_t *)(buffer+sizeof(long))) = info->notify_pid;
+ *((int *)(buffer+sizeof(long)+sizeof(pid_t))) =
+ info->notify.sigev_signo;
+ *((int *)(buffer+sizeof(long)+sizeof(pid_t)+sizeof(int))) =
+ info->notify.sigev_notify;
+ retval = FILENT_SIZE - *off;
+ if (copy_to_user(data,buffer+*off,retval))
+ return -EFAULT;
+ *off += retval;
+ return retval;
+}
+
+
+static int mqueue_release_file(struct inode *ino, struct file * f)
+{
+ struct mqueue_inode_info *info = MQUEUE_I(ino);
+
+ down(&info->mq_sem);
+ if (info->notify_pid == current->pid) {
+ info->notify_pid = 0;
+ info->notify.sigev_signo = 0;
+ info->notify.sigev_notify = 0;
+ }
+ up(&info->mq_sem);
+ return 0;
+}
+
+
+static unsigned int mqueue_poll_file(struct file *file,
+ struct poll_table_struct *poll_tab)
+{
+ struct mqueue_inode_info *info = MQUEUE_I(file->f_dentry->d_inode);
+ int retval = 0;
+
+ poll_wait(file, &info->wait_q[0], poll_tab);
+ poll_wait(file, &info->wait_q[1], poll_tab);
+
+ down(&info->mq_sem);
+ if (info->attr.mq_curmsgs)
+ retval = POLLIN | POLLRDNORM;
+
+ if (info->attr.mq_curmsgs < info->attr.mq_maxmsg)
+ retval |= POLLOUT | POLLWRNORM;
+ up(&info->mq_sem);
+
+ return retval;
+}
+
+
+/*
+* CORE MQUEUE FUNCTIONS
+*/
+
+
+/* the same as in sched.c but added up and down of out sem*/
+/* FIXME: why there was a bug on SMP with std. function used ????? */
+void wq_sleep_on(wait_queue_head_t *q,struct mqueue_inode_info *info)
+{
+ unsigned long flags;
+ wait_queue_t wait;
+ init_waitqueue_entry(&wait, current);
+
+ current->state = TASK_UNINTERRUPTIBLE;
+ spin_lock_irqsave(&q->lock,flags);
+ __add_wait_queue(q, &wait);
+ spin_unlock(&q->lock);
+
+ up(&info->mq_sem);
+ schedule();
+ down(&info->mq_sem);
+
+ spin_lock_irq(&q->lock);
+ __remove_wait_queue(q, &wait);
+ spin_unlock_irqrestore(&q->lock, flags);
+}
+
+
+static void wq_remove(struct mqueue_inode_info *info, int sr, pid_t p)
+{
+ struct ext_wait_queue *ptr;
+
+ if (info->e_wait_q[sr] != NULL) {
+ ptr = info->e_wait_q[sr];
+ do {
+ if (ptr->associated == p) {
+ if (ptr == info->e_wait_q[sr]) {
+ if (ptr->next == ptr)
+ info->e_wait_q[sr] =
+ NULL;
+ else
+ info->e_wait_q[sr] =
+ ptr->next;
+ }
+ ptr->next->prev = ptr->prev;
+ ptr->prev->next = ptr->next;
+ kfree(ptr);
+ break;
+ }
+ ptr = ptr->next;
+ } while (ptr != info->e_wait_q[sr]);
+ }
+}
+
+/* adds current to info->e_wait_q[sr] */
+static inline int wq_add(struct mqueue_inode_info *info, int sr)
+{
+ struct ext_wait_queue *tmp, *ptr;
+
+ ptr = info->e_wait_q[sr];
+ tmp = kmalloc(sizeof(struct ext_wait_queue), GFP_KERNEL);
+ if (tmp == NULL)
+ return -2;
+ tmp->associated = current->pid;
+ tmp->prio = current->static_prio;
+
+ if (ptr == NULL) { /* empty queue */
+ tmp->next = tmp;
+ tmp->prev = tmp;
+ info->e_wait_q[sr] = tmp;
+ } else {
+/* if (ptr->prio >= current->static_prio)
+ ptr = NULL;*/
+ if (ptr->prio < current->static_prio)
+ ptr = NULL;
+ else {
+ do { /* looking up for a place */
+ ptr = ptr->next;
+/* if (ptr->prio >= current->static_prio)*/
+ if (ptr->prio < current->static_prio)
+ break;
+ } while (ptr != info->e_wait_q[sr]);
+ }
+
+ if (ptr == NULL) { /* it should be placed first */
+ tmp->next = info->e_wait_q[sr];
+ tmp->prev = info->e_wait_q[sr]->prev;
+ info->e_wait_q[sr]->prev = tmp;
+ tmp->prev->next = tmp;
+ info->e_wait_q[sr] = tmp;
+ } else { /* it should be placed before ptr element */
+ tmp->next = ptr;
+ tmp->prev = ptr->prev;
+ ptr->prev = tmp;
+ tmp->prev->next = tmp;
+ }
+ }
+ return 0;
+}
+
+/* removes from info->e_wait_q[sr] current process.
+ * Only for wq_sleep(): as we are here current must be one
+ * before-first (meaning first in order) */
+static inline void wq_delete(struct mqueue_inode_info *info, int sr)
+{
+ struct ext_wait_queue *tmp;
+
+ tmp = info->e_wait_q[sr]->prev;
+ if (tmp->next == tmp) {
+ kfree(info->e_wait_q[sr]);
+ info->e_wait_q[sr] = NULL;
+ } else {
+ info->e_wait_q[sr]->prev = tmp->prev;
+ tmp->prev->next = info->e_wait_q[sr];
+ kfree(tmp);
+ }
+}
+
+/* adds current process
+ * sr: 0-send 1-receive
+ * Returns: 0=ok -1=signal -2=memory allocation error -3=timeout passed*/
+static int wq_sleep(struct mqueue_inode_info *info, int sr, signed long timeout)
+{
+ wait_queue_t __wait;
+ long retval;
+
+ if(wq_add(info,sr)<0)
+ return -2;
+
+ init_waitqueue_entry(&__wait, current);
+ add_wait_queue(&(info->wait_q[sr]), &__wait);
+
+ for (;;) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ if ((current->pid ==
+ info->e_wait_q[sr]->prev->associated)
+ && ((info->attr.mq_curmsgs > 0 && sr == 1)
+ || (info->attr.mq_curmsgs <
+ info->attr.mq_maxmsg && sr == 0)))
+ break;
+ if (!signal_pending(current)) {
+ up(&info->mq_sem);
+ retval = schedule_timeout(timeout);
+ down(&info->mq_sem);
+ if ((!retval) && (!signal_pending(current))) {
+ remove_wait_queue(&(info->wait_q[sr]),
+ &__wait);
+ wq_remove(info, sr, current->pid);
+ return -3;
+ }
+ continue;
+ } else {
+ current->state = TASK_RUNNING;
+ remove_wait_queue(&(info->wait_q[sr]),
+ &__wait);
+ wq_remove(info, sr, current->pid);
+ return -1;
+ }
+ }
+ current->state = TASK_RUNNING;
+ remove_wait_queue(&(info->wait_q[sr]), &__wait);
+ wq_delete(info,sr);
+
+ return 0;
+}
+
+/* wakes up all sleeping processes in queue */
+static void wq_wakeup(struct mqueue_inode_info *info, int sr)
+{
+ if (sr == 0) {
+ /* We can't invoke wake_up for processes waiting for free space
+ * if there is less then MAXMSG-1 messages - then wake_up was
+ * invoked previously (and finished) but mq_sleep() of proper
+ * (only one) process didn't start to continue running yet,
+ * thus we must wait until this process receives IT'S message
+ */
+ if ((info->attr.mq_curmsgs <
+ info->attr.mq_maxmsg - 1)
+ && (info->e_wait_q[sr] != NULL)) {
+ wq_sleep_on(&(info->wait_q2[sr]),info);
+ }
+ } else {
+ /* As above but for processes waiting for new message */
+ if ((info->attr.mq_curmsgs > 1)
+ && (info->e_wait_q[sr] != NULL)) {
+ wq_sleep_on(&(info->wait_q2[sr]),info);
+ }
+ }
+ /* We can wake up now - either all are sleeping or
+ * queue is empty */
+ wake_up_all((&(info->wait_q[sr])));
+}
+
+/*
+ * Invoked via ioctl to do the rest of work when creating new queue:
+ * set limits
+ */
+static int mq_create_ioctl(struct inode *ino, struct mq_attr *u_attr)
+{
+ struct mq_attr attr;
+ struct mqueue_inode_info *info = MQUEUE_I(ino);
+
+ if (u_attr != NULL) {
+ if (copy_from_user(&attr, u_attr, sizeof(struct mq_attr)))
+ return -EFAULT;
+ if (attr.mq_maxmsg <= 0
+ || attr.mq_msgsize <= 0
+ || attr.mq_maxmsg > MQ_MAXMSG
+ || attr.mq_msgsize > MQ_MSGSIZE)
+ return -EINVAL;
+ down(&info->mq_sem);
+ info->attr.mq_maxmsg = attr.mq_maxmsg;
+ info->attr.mq_msgsize = attr.mq_msgsize;
+ }else{
+ down(&info->mq_sem);
+ info->attr.mq_maxmsg = MQ_MAXMSG;
+ info->attr.mq_msgsize = MQ_MSGSIZE;
+ }
+ up(&info->mq_sem);
+ return 0;
+}
+
+
+int mq_send_ioctl(struct inode * ino, int oflag, const char *msg_ptr, int msg_len,
+ unsigned int msg_prio, struct timespec *ts_ptr)
+{
+ struct siginfo sig_i;
+ struct msg *tmp_ptr1;
+ char *tmp_ptr2;
+ int sleep_ret, i;
+ struct mqueue_inode_info *info = MQUEUE_I(ino);
+ struct timespec ts;
+ long timeout;
+ down(&info->mq_sem);
+ /* checks if O_NONBLOCK is set and queue is full */
+ if ((oflag & O_NONBLOCK) != 0)
+ if (info->attr.mq_curmsgs ==
+ info->attr.mq_maxmsg) {
+ up(&info->mq_sem);
+ return -EAGAIN;
+ }
+ /* checks msg_prio boundary */
+ if ((unsigned int) msg_prio >= (unsigned int) MQ_PRIO_MAX) {
+ up(&info->mq_sem);
+ return -EINVAL;
+ }
+ /* checks if message isn't too large */
+ if (msg_len > info->attr.mq_msgsize) {
+ up(&info->mq_sem);
+ return -EMSGSIZE;
+ }
+ /* get & validate timeout */
+ if(ts_ptr)
+ {
+ if (copy_from_user(&ts,ts_ptr,sizeof(struct timespec)))
+ {
+ up(&info->mq_sem);
+ return -EFAULT;
+ }
+ if (ts.tv_nsec<0 || ts.tv_sec<0 || ts.tv_nsec>=1000000000L)
+ {
+ up(&info->mq_sem);
+ return -EINVAL;
+ }
+ /* it schould be enough */
+ timeout = timespec_to_jiffies(&ts);
+ }else
+ timeout = MAX_SCHEDULE_TIMEOUT;
+
+ /* checks if queue is full -> I'm waitng as O_NONBLOCK isn't */
+ /* set then. mq_receive wakes up only 1 process */
+
+ if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
+ sleep_ret = wq_sleep(info, 0, timeout);
+ if (sleep_ret == -1) {
+ up(&info->mq_sem);
+ return -EINTR;
+ } else if (sleep_ret == -2) {
+ up(&info->mq_sem);
+ return -ENOMEM;
+ } else if (sleep_ret == -3) {
+ up(&info->mq_sem);
+ return -ETIMEDOUT;
+ }
+ }
+ down(&mq_sem);
+ /* check if this message will exceed overall limit for mesages */
+ if (msgs_size + msg_len > MQ_MAXSYSSIZE) {
+ up(&info->mq_sem);
+ up(&mq_sem);
+ return -ENOMEM;
+ }
+
+ /* first try to allocate memory, before doing anything with
+ * existing queues */
+ tmp_ptr1 = kmalloc(sizeof(struct msg), GFP_KERNEL);
+ if (!tmp_ptr1) {
+ up(&info->mq_sem);
+ up(&mq_sem);
+ return -ENOMEM;
+ }
+ tmp_ptr2 = kmalloc(msg_len, GFP_KERNEL);
+ if (!tmp_ptr2) {
+ up(&info->mq_sem);
+ up(&mq_sem);
+ kfree(tmp_ptr1);
+ return -ENOMEM;
+ }
+ /* adds message to the queue */
+ i = info->attr.mq_curmsgs - 1;
+ while (i >= 0 && info->messages[i]->msg_prio < msg_prio) {
+ info->messages[i + 1] = info->messages[i];
+ i--;
+ }
+
+ i++; /* i == position */
+ info->messages[i] = tmp_ptr1;
+ info->messages[i]->msg_len = msg_len;
+ info->messages[i]->msg_prio = msg_prio;
+ info->messages[i]->mtext = tmp_ptr2;
+ if (copy_from_user(info->messages[i]->mtext, msg_ptr, msg_len))
+ {
+ up(&info->mq_sem);
+ up(&mq_sem);
+ kfree(tmp_ptr1);
+ kfree(tmp_ptr2);
+ printk(KERN_ERR " coping data from user failed\n");
+ return -EFAULT;
+ }
+ info->attr.mq_curmsgs++;
+ msgs_size += msg_len;
+ up(&mq_sem);
+ info->qsize += msg_len;
+
+ /* notification
+ * invoked when there is registered process and there isn't process
+ * waiting synchronously for message AND state of queue changed from
+ * empty to not empty*/
+ if (info->notify_pid != 0
+ && info->e_wait_q[1] == NULL
+ && info->attr.mq_curmsgs == 1) {
+ /* TODO:
+ * Add support for sigev_notify==SIGEV_THREAD
+ */
+ /* sends signal */
+ if (info->notify.sigev_notify == SIGEV_SIGNAL) {
+ sig_i.si_signo = info->notify.sigev_signo;
+ sig_i.si_errno = 0;
+ sig_i.si_code = SI_MESGQ;
+ sig_i.si_pid = current->pid;
+ sig_i.si_uid = current->uid;
+ kill_proc_info(info->notify.sigev_signo,
+ &sig_i, info->notify_pid);
+ }
+ /* after notification unregisters process */
+ info->notify_pid = 0;
+ }
+
+ /* after sending message we must wake up (ONLY 1 no matter which) */
+ /* process sleeping in wq_wakeup() */
+ wake_up(&(info->wait_q2[0]));
+
+ /* wakes up processes waiting for message */
+ wq_wakeup(info, 1);
+ up(&info->mq_sem);
+ return 0;
+}
+
+
+size_t mq_receive_ioctl(struct inode *ino, long oflag, char *msg_ptr, int msg_len,
+ unsigned int *msg_prio, struct timespec *ts_ptr)
+{
+ int i, retval, sleep_ret;
+ struct mqueue_inode_info *info = MQUEUE_I(ino);
+ struct timespec ts;
+ long timeout;
+ down(&info->mq_sem);
+
+ /* checks if O_NONBLOCK is set and queue is empty */
+ if ((oflag & O_NONBLOCK) != 0)
+ if (info->attr.mq_curmsgs == 0) {
+ up(&info->mq_sem);
+ return -EAGAIN;
+ }
+
+ /* get & validate timeout */
+ if(ts_ptr)
+ {
+ if (copy_from_user(&ts,ts_ptr,sizeof(struct timespec)))
+ {
+ up(&info->mq_sem);
+ return -EFAULT;
+ }
+ if (ts.tv_nsec<0 || ts.tv_sec<0 || ts.tv_nsec>=1000000000L)
+ {
+ up(&info->mq_sem);
+ return -EINVAL;
+ }
+ /* it schould be enough */
+ timeout = timespec_to_jiffies(&ts);
+ }else
+ timeout = MAX_SCHEDULE_TIMEOUT;
+
+
+
+ /* checks if queue is empty -> as O_NONBLOCK isn't set then
+ * we must wait */
+ if (info->attr.mq_curmsgs == 0) {
+ sleep_ret = wq_sleep(info, 1, timeout);
+ if (sleep_ret == -1) {
+ up(&info->mq_sem);
+ return -EINTR;
+ } else if (sleep_ret == -2) {
+ up(&info->mq_sem);
+ return -ENOMEM;
+ } else if (sleep_ret == -3) {
+ up(&info->mq_sem);
+ return -ETIMEDOUT;
+ }
+ }
+ /* checks if buffer is big enough */
+ if (msg_len < info->messages[0]->msg_len) {
+ up(&info->mq_sem);
+ return -EMSGSIZE;
+ }
+
+ retval = info->messages[0]->msg_len;
+ /* gets message */
+ if (msg_prio != NULL)
+ if (put_user(info->messages[0]->msg_prio, msg_prio)){
+ up(&info->mq_sem);
+ return -EFAULT;
+ }
+ if (copy_to_user(msg_ptr, info->messages[0]->mtext,
+ info->messages[0]->msg_len))
+ {
+ up(&info->mq_sem);
+ return -EFAULT;
+ }
+ /* and deletes it */
+ kfree(info->messages[0]->mtext);
+ kfree(info->messages[0]);
+ for (i = 1; i < info->attr.mq_curmsgs; i++)
+ info->messages[i - 1] =
+ (void *) (info->messages[i]);
+ info->attr.mq_curmsgs--;
+ info->messages[info->attr.mq_curmsgs] = NULL;
+ /*decrease total space used by messages */
+ down(&mq_sem);
+ msgs_size -= retval;
+ up(&mq_sem);
+ info->qsize -= retval;
+
+ /* after receive we can wakeup 1 process waiting in wq_wakeup */
+ wake_up(&(info->wait_q2[1]));
+ /* wakes up processes waiting for sending message */
+ wq_wakeup(info, 0);
+ up(&info->mq_sem);
+ return retval;
+}
+
+
+int mq_notify_ioctl(struct inode *ino, const struct sigevent *u_notification)
+{
+ struct sigevent notification;
+ struct mqueue_inode_info *info=MQUEUE_I(ino);
+
+
+ if (u_notification != NULL)
+ if (copy_from_user(&notification, u_notification,
+ sizeof(struct sigevent)))
+ return -EFAULT;
+
+ down(&info->mq_sem);
+
+ if (info->notify_pid == current->pid
+ && (u_notification == NULL ||
+ notification.sigev_notify == SIGEV_NONE)) {
+ info->notify_pid = 0; /* remove notification */
+ info->notify.sigev_signo = 0;
+ info->notify.sigev_notify = 0;
+ } else if (info->notify_pid > 0) {
+ up(&info->mq_sem);
+ return -EBUSY;
+ } else if (u_notification != NULL &&
+ notification.sigev_notify != SIGEV_NONE) {
+ /* add notification */
+ info->notify_pid = current->pid;
+ info->notify.sigev_signo = notification.sigev_signo;
+ info->notify.sigev_notify = notification.sigev_notify;
+ }
+ up(&info->mq_sem);
+ return 0;
+}
+
+int mq_getattr_ioctl(struct inode *ino, int oflag, struct mq_attr *u_mqstat)
+{
+ struct mq_attr attr;
+ struct mqueue_inode_info *info = MQUEUE_I(ino);
+ int error = 0;
+
+ down(&info->mq_sem);
+
+ attr = info->attr;
+ attr.mq_flags = oflag;
+
+ if (u_mqstat != NULL)
+ if (copy_to_user(u_mqstat, &attr, sizeof(struct mq_attr)))
+ error = -EFAULT;
+ up(&info->mq_sem);
+ return error;
+}
+
+/*
+* IOCTL FUNCTION - demultiplexer for various mqueues operations
+*/
+
+static int mqueue_ioctl_file (struct inode * inode, struct file * filp,
+ unsigned int cmd, unsigned long arg)
+{
+ int ret=1;
+ struct ioctl_mq_sndrcv sndrcv_arg;
+ unlock_kernel();
+
+ switch(cmd)
+ {
+ case MQ_CREATE:
+ ret = mq_create_ioctl(inode,(struct mq_attr *)arg);
+ break;
+ case MQ_SEND:
+ if ((filp->f_flags & O_ACCMODE) == O_RDONLY)
+ return -EBADF;
+ if(copy_from_user(&sndrcv_arg,(void *)arg,sizeof(sndrcv_arg)))
+ {
+ printk(KERN_ERR " mqueue fs: can't copy data from user space");
+ break;
+ }
+ ret = mq_send_ioctl(inode,filp->f_flags,sndrcv_arg.msg_ptr,
+ sndrcv_arg.msg_len,sndrcv_arg.msg_prio,
+ sndrcv_arg.timeout);
+ break;
+ case MQ_RECEIVE:
+ if ((filp->f_flags & O_ACCMODE) == O_WRONLY)
+ return -EBADF;
+ if(copy_from_user(&sndrcv_arg,(void *)arg,sizeof(sndrcv_arg)))
+ {
+ printk(KERN_ERR " mqueue fs: can't copy data from user space");
+ break;
+ }
+ ret = mq_receive_ioctl(inode,filp->f_flags,(char *)sndrcv_arg.msg_ptr,
+ sndrcv_arg.msg_len,(unsigned *)sndrcv_arg.msg_prio,
+ sndrcv_arg.timeout);
+ break;
+ case MQ_NOTIFY:
+ ret = mq_notify_ioctl(inode, (struct sigevent *)arg);
+ break;
+ case MQ_GETATTR:
+ ret = mq_getattr_ioctl(inode, filp->f_flags,
+ (struct mq_attr *)arg);
+ break;
+ }
+
+ lock_kernel();
+ return ret;
+}


2002-12-01 10:25:57

by Manfred Spraul

[permalink] [raw]
Subject: Re: [PATCH] POSIX message queues, 2.5.50

Some notes:
- coding style: linux functions usually have only one return at the end
of the function, and goto internally. mqueue_parse_options() does that,
mqueue_create contains multiple returns.
- why do you allocate the ext_wait_queue structure dynamically? Put it
on the stack, that avoids error handling for failed allocations.
- reusing kernel functions is not a disadvantage - load_msg() and
store_msg() automagically split the kmalloc allocations into page sized
chunks.
- why do you use __add_wait_queue in wq_sleep_on()? It seems you have
copied that code from kernel/sched.c - it's not needed. It was needed for

cli()
if(condition_var==0)
sleep_on(&my_queue);

--
Manfred



2002-12-01 11:21:15

by Russell King

[permalink] [raw]
Subject: Re: [PATCH] POSIX message queues, 2.5.50

On Sun, Dec 01, 2002 at 11:33:11AM +0100, Manfred Spraul wrote:
> - why do you use __add_wait_queue in wq_sleep_on()? It seems you have
> copied that code from kernel/sched.c - it's not needed. It was needed for
>
> cli()
> if(condition_var==0)
> sleep_on(&my_queue);

Do we have to encourage this abonimation? We do have wait_event().

--
Russell King ([email protected]) The developer of ARM Linux
http://www.arm.linux.org.uk/personal/aboutme.html

2002-12-01 14:27:17

by Krzysztof Benedyczak

[permalink] [raw]
Subject: Re: [PATCH] POSIX message queues, 2.5.50

On Sun, 1 Dec 2002, Manfred Spraul wrote:

> Some notes:
> - coding style: linux functions usually have only one return at the end
> of the function, and goto internally. mqueue_parse_options() does that,
> mqueue_create contains multiple returns.
Ok, I've fixed it.
I just didn't know that this also belongs to coding style (I don't
like goto's much and it isn't in CodingStyle ;-).

> - why do you allocate the ext_wait_queue structure dynamically? Put it
> on the stack, that avoids error handling for failed allocations.
Hmh, you remind me about one thing that I'm constantly forgetting.
In fact allocation will stay (it is dynamic queue and it must be that way)
but I should use there list.h stuff. My fault.

> - reusing kernel functions is not a disadvantage - load_msg() and
> store_msg() automagically split the kmalloc allocations into page sized
> chunks.
Of course. I just wanted to be fair. There were pointed out main
differences (in first place) and some advantages.

> - why do you use __add_wait_queue in wq_sleep_on()? It seems you have
> copied that code from kernel/sched.c - it's not needed. It was needed for
>
> cli()
> if(condition_var==0)
> sleep_on(&my_queue);
>
And from Russell King:
> Do we have to encourage this abonimation? We do have
> wait_event().

wait_event is rather not good as I don't have condition to check - in that
case I just place process in a queue and wait for wake_up.
But I agree that it is ugly - I used it only as a quick fix for one bug.
Now I think I have good solution but I have to test it first.


Thanks for advices - new patch will be placed on
http://www.mat.uni.torun.pl/~wrona/posix_ipc
on Tuesday along with new library.

Krzysiek Benedyczak

2002-12-01 15:09:22

by Manfred Spraul

[permalink] [raw]
Subject: Re: [PATCH] POSIX message queues, 2.5.50

Krzysztof Benedyczak wrote:

>wait_event is rather not good as I don't have condition to check - in that
>case I just place process in a queue and wait for wake_up.
>But I agree that it is ugly - I used it only as a quick fix for one bug.
>Now I think I have good solution but I have to test it first.
>
>
The bad thing is that you use __add_wait_queue() and call the wait queue
locking functions yourself. This is not needed. The only function that
was permitted to do that is sleep_on(), and that will die soon. I've
quoted sleep_on as a reminder to kill that code in kernel/sched.c.
Just use add_wait_queue() instead of the internal functions. Or
prepare_to_wait/finish_wait, that has a slighly lower locking overhead.

Btw, could you explain how your message priority implementation works?

If I understand it correctly, wq_add maintains a priority sorted linked
list. wq_sleep() waits until the process becomes the first entry in the
priority queue.
- You use the pid value as the thread identifier - why? Usually the task
struct pointer is used within the kernel.
- Is it correct that wq_wakeup wakes up all processes that sleep in
wq_send, and then the highest priority process continues? What about
waking up just the highest priority process? Look at the wakeup code in
ipc/msg.c - it implement message types that way. The sender looks
through the list of waiting receivers, and directly sends the message to
the right receiver [pipelined_send()]

--
Manfred




2002-12-02 20:20:17

by Krzysztof Benedyczak

[permalink] [raw]
Subject: Re: [PATCH] POSIX message queues, 2.5.50

On Sun, 1 Dec 2002, Manfred Spraul wrote:

> The bad thing is that you use __add_wait_queue() and call the wait queue
> locking functions yourself. This is not needed. The only function that
> was permitted to do that is sleep_on(), and that will die soon. I've
> quoted sleep_on as a reminder to kill that code in kernel/sched.c.
> Just use add_wait_queue() instead of the internal functions. Or
> prepare_to_wait/finish_wait, that has a slighly lower locking overhead.

Some more explanation. Originally I used sleep_on(). But when it occurred
to not work well on SMP I just make my own version of it with queue-wide
semaphore protecting whole code. Then it started to work, but it was ugly,
again I agree. Now I've made my own function taking wait_event() as a
template: I've removed checking for condition and add process to wait
queue with EXCLUSIVE flag. On uniprocessor it works well - but on SMP I
just a moment before had the same race - (wq were corrupted). When I added
sem IN this function it started to work ok. Why (I mean why it didn't
worked previously - in such a case I can imagine that wait_event would
also produce some races) ?

Now it looks like this:

+void inline wait_exclusive(wait_queue_head_t *wq)
+{
+ wait_queue_t wait;
+ init_waitqueue_entry(&wait, current);
+
+ add_wait_queue_exclusive(wq, &wait);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+
++ up(&queue_sem)
+ schedule();
++ down(&queue_sem);
+
+ current->state = TASK_RUNNING;
+ remove_wait_queue(wq, &wait);
+}

I hope it is acceptable (?)


>
> Btw, could you explain how your message priority implementation works?
>
> If I understand it correctly, wq_add maintains a priority sorted linked
> list. wq_sleep() waits until the process becomes the first entry in the
> priority queue.
> - You use the pid value as the thread identifier - why? Usually the task
> struct pointer is used within the kernel.

Ups :-). Yes it will let me remove one extra field from struct.

> - Is it correct that wq_wakeup wakes up all processes that sleep in
> wq_send, and then the highest priority process continues? What about
> waking up just the highest priority process? Look at the wakeup code in
> ipc/msg.c - it implement message types that way. The sender looks
> through the list of waiting receivers, and directly sends the message to
> the right receiver [pipelined_send()]

Yes it was correct, but useless. Now I wakeup only one process. But note
that we use different algorithm then in msg.c: we haven't "pipelined" s&r.

I have applied previous suggestions & made some improvements - it works
well.

If you have some further notices please send (especially about first
case).


Thanks again

Krzysiek Benedyczak