From: Alban Crequy <[email protected]>
Multicast is implemented on SOCK_DGRAM and SOCK_SEQPACKET unix sockets.
A userspace application can create a multicast group with:
struct unix_mreq mreq;
mreq.address.sun_family = AF_UNIX;
mreq.address.sun_path[0] = '\0';
strcpy(mreq.address.sun_path + 1, "socket-address");
mreq.flags = 0;
sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
ret = setsockopt(sockfd, SOL_UNIX, UNIX_CREATE_GROUP, &mreq, sizeof(mreq));
Then a multicast group can be joined and left with:
ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));
ret = setsockopt(sockfd, SOL_UNIX, UNIX_LEAVE_GROUP, &mreq, sizeof(mreq));
A socket can be a member of several multicast groups.
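As an illustration, a receiver could join the group and read messages like
this (a minimal sketch with error handling omitted; "socket-address" is just
an example name):
struct unix_mreq mreq;
char buf[4096];
ssize_t n;
memset(&mreq, 0, sizeof(mreq));
mreq.address.sun_family = AF_UNIX;
strcpy(mreq.address.sun_path + 1, "socket-address");
mreq.flags = UNIX_MREQ_LOOPBACK; /* also receive our own messages */
sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));
n = recv(sockfd, buf, sizeof(buf), 0);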
Signed-off-by: Alban Crequy <[email protected]>
Signed-off-by: Ian Molton <[email protected]>
Signed-off-by: Javier Martinez Canillas <[email protected]>
---
include/net/af_unix.h | 76 +++++++++++
net/unix/Kconfig | 9 ++
net/unix/af_unix.c | 339 ++++++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 422 insertions(+), 2 deletions(-)
diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index 5a4e29b..c543f76 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -44,6 +44,60 @@ struct unix_skb_parms {
spin_lock_nested(&unix_sk(s)->lock, \
SINGLE_DEPTH_NESTING)
+/* UNIX socket options */
+#define UNIX_CREATE_GROUP 1
+#define UNIX_JOIN_GROUP 2
+#define UNIX_LEAVE_GROUP 3
+
+/* Flags on unix_mreq */
+
+/* On UNIX_JOIN_GROUP: the socket will receive its own messages */
+#define UNIX_MREQ_LOOPBACK 0x01
+
+/* On UNIX_JOIN_GROUP: the messages will also be received by the peer */
+#define UNIX_MREQ_SEND_TO_PEER 0x02
+
+/* On UNIX_JOIN_GROUP: just drop the message instead of blocking if the
+ * receiving queue is full */
+#define UNIX_MREQ_DROP_WHEN_FULL 0x04
+
+struct unix_mreq {
+ struct sockaddr_un address;
+ unsigned int flags;
+};
+
+struct unix_mcast_group {
+ /* RCU list of (struct unix_mcast)->member_node
+ * Messages sent to the multicast group are delivered to this list of
+ * members */
+ struct hlist_head mcast_members;
+
+ /* RCU list of (struct unix_mcast)->member_dead_node
+ * When the group dies, previous members' reference counters must be
+ * decremented */
+ struct hlist_head mcast_dead_members;
+
+ /* RCU list of (struct sock_set)->list */
+ struct hlist_head mcast_members_lists;
+
+ atomic_t mcast_members_cnt;
+
+ /* The generation is incremented each time a peer joins or
+ * leaves the group. It is used to invalidate old
+ * struct sock_set lists */
+ atomic_t mcast_membership_generation;
+
+ /* Locks to guarantee causal order in deliveries */
+#define MCAST_LOCK_CLASS_COUNT 8
+ spinlock_t lock[MCAST_LOCK_CLASS_COUNT];
+
+ /* The group is referenced by:
+ * - the socket that created the multicast group
+ * - the accepted sockets (SOCK_SEQPACKET only)
+ * - the current members of the group */
+ atomic_t refcnt;
+};
+
/* The AF_UNIX socket */
struct unix_sock {
/* WARNING: sk has to be the first member */
@@ -59,9 +113,31 @@ struct unix_sock {
spinlock_t lock;
unsigned int gc_candidate : 1;
unsigned int gc_maybe_cycle : 1;
+ unsigned int mcast_send_to_peer : 1;
+ unsigned int mcast_drop_when_peer_full : 1;
unsigned char recursion_level;
+ struct unix_mcast_group *mcast_group;
+
+ /* RCU list of (struct unix_mcast)->subscription_node
+ * A socket can subscribe to several multicast groups
+ */
+ struct hlist_head mcast_subscriptions;
+
struct socket_wq peer_wq;
};
+
+struct unix_mcast {
+ struct unix_sock *member;
+ struct unix_mcast_group *group;
+ unsigned int flags;
+ struct hlist_node subscription_node;
+ /* A subscription cannot be both alive and dead but we cannot use the
+ * same field because RCU readers run lockless. member_dead_node is
+ * not read by lockless RCU readers. */
+ struct hlist_node member_node;
+ struct hlist_node member_dead_node;
+};
+
#define unix_sk(__sk) ((struct unix_sock *)__sk)
#define peer_wait peer_wq.wait
diff --git a/net/unix/Kconfig b/net/unix/Kconfig
index 8b31ab8..289d854 100644
--- a/net/unix/Kconfig
+++ b/net/unix/Kconfig
@@ -19,6 +19,15 @@ config UNIX
Say Y unless you know what you are doing.
+config UNIX_MULTICAST
+ depends on UNIX && EXPERIMENTAL
+ bool "Multicast over Unix domain sockets"
+ ---help---
+ If you say Y here, you will include support for multicasting on Unix
+ domain sockets. Support is available for SOCK_DGRAM and
+ SOCK_SEQPACKET. Certain types of delivery synchronisation are
+ provided, see Documentation/networking/multicast-unix-sockets.txt
+
config UNIX_DIAG
tristate "UNIX: socket monitoring interface"
depends on UNIX
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 3537f20..6f8fe57 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -119,6 +119,9 @@ struct hlist_head unix_socket_table[UNIX_HASH_SIZE + 1];
EXPORT_SYMBOL_GPL(unix_socket_table);
DEFINE_SPINLOCK(unix_table_lock);
EXPORT_SYMBOL_GPL(unix_table_lock);
+#ifdef CONFIG_UNIX_MULTICAST
+static DEFINE_SPINLOCK(unix_multicast_lock);
+#endif
static atomic_long_t unix_nr_socks;
#define unix_sockets_unbound (&unix_socket_table[UNIX_HASH_SIZE])
@@ -374,6 +377,28 @@ static void unix_sock_destructor(struct sock *sk)
#endif
}
+#ifdef CONFIG_UNIX_MULTICAST
+static void
+destroy_mcast_group(struct unix_mcast_group *group)
+{
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+ struct hlist_node *pos_tmp;
+
+ BUG_ON(atomic_read(&group->refcnt) != 0);
+ BUG_ON(!hlist_empty(&group->mcast_members));
+
+ hlist_for_each_entry_safe(node, pos, pos_tmp,
+ &group->mcast_dead_members,
+ member_dead_node) {
+ hlist_del_rcu(&node->member_dead_node);
+ sock_put(&node->member->sk);
+ kfree(node);
+ }
+ kfree(group);
+}
+#endif
+
static int unix_release_sock(struct sock *sk, int embrion)
{
struct unix_sock *u = unix_sk(sk);
@@ -382,6 +407,11 @@ static int unix_release_sock(struct sock *sk, int embrion)
struct sock *skpair;
struct sk_buff *skb;
int state;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+ struct hlist_node *pos_tmp;
+#endif
unix_remove_socket(sk);
@@ -395,6 +425,23 @@ static int unix_release_sock(struct sock *sk, int embrion)
u->mnt = NULL;
state = sk->sk_state;
sk->sk_state = TCP_CLOSE;
+#ifdef CONFIG_UNIX_MULTICAST
+ spin_lock(&unix_multicast_lock);
+ hlist_for_each_entry_safe(node, pos, pos_tmp, &u->mcast_subscriptions,
+ subscription_node) {
+ hlist_del_rcu(&node->member_node);
+ hlist_del_rcu(&node->subscription_node);
+ atomic_dec(&node->group->mcast_members_cnt);
+ atomic_inc(&node->group->mcast_membership_generation);
+ hlist_add_head_rcu(&node->member_dead_node,
+ &node->group->mcast_dead_members);
+ if (atomic_dec_and_test(&node->group->refcnt))
+ destroy_mcast_group(node->group);
+ }
+ if (u->mcast_group && atomic_dec_and_test(&u->mcast_group->refcnt))
+ destroy_mcast_group(u->mcast_group);
+ spin_unlock(&unix_multicast_lock);
+#endif
unix_state_unlock(sk);
wake_up_interruptible_all(&u->peer_wait);
@@ -636,6 +683,9 @@ static struct sock *unix_create1(struct net *net, struct socket *sock)
atomic_long_set(&u->inflight, 0);
INIT_LIST_HEAD(&u->link);
mutex_init(&u->readlock); /* single task reading lock */
+#ifdef CONFIG_UNIX_MULTICAST
+ INIT_HLIST_HEAD(&u->mcast_subscriptions);
+#endif
init_waitqueue_head(&u->peer_wait);
unix_insert_socket(unix_sockets_unbound, sk);
out:
@@ -1056,6 +1106,10 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
struct sock *newsk = NULL;
struct sock *other = NULL;
struct sk_buff *skb = NULL;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+#endif
unsigned hash;
int st;
int err;
@@ -1083,6 +1137,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
newsk = unix_create1(sock_net(sk), NULL);
if (newsk == NULL)
goto out;
+ newu = unix_sk(newsk);
/* Allocate skb for sending to listening sock */
skb = sock_wmalloc(newsk, 1, 0, GFP_KERNEL);
@@ -1095,6 +1150,8 @@ restart:
if (!other)
goto out;
+ otheru = unix_sk(other);
+
/* Latch state of peer */
unix_state_lock(other);
@@ -1166,6 +1223,18 @@ restart:
goto out_unlock;
}
+#ifdef CONFIG_UNIX_MULTICAST
+ /* Multicast sockets */
+ hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
+ subscription_node) {
+ if (node->group == otheru->mcast_group) {
+ atomic_inc(&otheru->mcast_group->refcnt);
+ newu->mcast_group = otheru->mcast_group;
+ break;
+ }
+ }
+#endif
+
/* The way is open! Fastly set all the necessary fields... */
sock_hold(sk);
@@ -1173,9 +1242,7 @@ restart:
newsk->sk_state = TCP_ESTABLISHED;
newsk->sk_type = sk->sk_type;
init_peercred(newsk);
- newu = unix_sk(newsk);
RCU_INIT_POINTER(newsk->sk_wq, &newu->peer_wq);
- otheru = unix_sk(other);
/* copy address information from listening to new sock*/
if (otheru->addr) {
@@ -1585,10 +1652,278 @@ out:
}
+#ifdef CONFIG_UNIX_MULTICAST
+static int unix_mc_create(struct socket *sock, struct unix_mreq *mreq)
+{
+ struct sock *other;
+ int err;
+ unsigned hash;
+ int namelen;
+ struct unix_mcast_group *mcast_group;
+ int i;
+
+ if (mreq->address.sun_family != AF_UNIX ||
+ mreq->address.sun_path[0] != '\0')
+ return -EINVAL;
+
+ err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+ if (err < 0)
+ return err;
+
+ namelen = err;
+ other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+ sock->type, hash, &err);
+ if (other) {
+ sock_put(other);
+ return -EADDRINUSE;
+ }
+
+ mcast_group = kmalloc(sizeof(struct unix_mcast_group), GFP_KERNEL);
+ if (!mcast_group)
+ return -ENOBUFS;
+
+ INIT_HLIST_HEAD(&mcast_group->mcast_members);
+ INIT_HLIST_HEAD(&mcast_group->mcast_dead_members);
+ INIT_HLIST_HEAD(&mcast_group->mcast_members_lists);
+ atomic_set(&mcast_group->mcast_members_cnt, 0);
+ atomic_set(&mcast_group->mcast_membership_generation, 1);
+ atomic_set(&mcast_group->refcnt, 1);
+ for (i = 0 ; i < MCAST_LOCK_CLASS_COUNT ; i++) {
+ spin_lock_init(&mcast_group->lock[i]);
+ lockdep_set_subclass(&mcast_group->lock[i], i);
+ }
+
+ err = sock->ops->bind(sock,
+ (struct sockaddr *)&mreq->address,
+ sizeof(struct sockaddr_un));
+ if (err < 0) {
+ kfree(mcast_group);
+ return err;
+ }
+
+ unix_state_lock(sock->sk);
+ unix_sk(sock->sk)->mcast_group = mcast_group;
+ unix_state_unlock(sock->sk);
+
+ return 0;
+}
+
+
+static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
+{
+ struct unix_sock *u = unix_sk(sock->sk);
+ struct sock *other, *peer;
+ struct unix_mcast_group *group;
+ struct unix_mcast *node;
+ int err;
+ unsigned hash;
+ int namelen;
+
+ if (mreq->address.sun_family != AF_UNIX ||
+ mreq->address.sun_path[0] != '\0')
+ return -EINVAL;
+
+ /* sockets which represent a group are not allowed to join another
+ * group */
+ if (u->mcast_group)
+ return -EINVAL;
+
+ err = unix_autobind(sock);
+ if (err < 0)
+ return err;
+
+ err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+ if (err < 0)
+ return err;
+
+ namelen = err;
+ other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+ sock->type, hash, &err);
+ if (!other)
+ return -EINVAL;
+
+ group = unix_sk(other)->mcast_group;
+
+ if (!group) {
+ err = -EADDRINUSE;
+ goto sock_put_out;
+ }
+
+ node = kmalloc(sizeof(struct unix_mcast), GFP_KERNEL);
+ if (!node) {
+ err = -ENOMEM;
+ goto sock_put_out;
+ }
+ node->member = u;
+ node->group = group;
+ node->flags = mreq->flags;
+
+ if (sock->sk->sk_type == SOCK_SEQPACKET) {
+ peer = unix_peer_get(sock->sk);
+ if (peer) {
+ atomic_inc(&group->refcnt);
+ unix_sk(peer)->mcast_group = group;
+ sock_put(peer);
+ }
+ }
+
+ unix_state_lock(sock->sk);
+ unix_sk(sock->sk)->mcast_send_to_peer =
+ !!(mreq->flags & UNIX_MREQ_SEND_TO_PEER);
+ unix_sk(sock->sk)->mcast_drop_when_peer_full =
+ !!(mreq->flags & UNIX_MREQ_DROP_WHEN_FULL);
+ unix_state_unlock(sock->sk);
+
+ /* Keep a reference */
+ sock_hold(sock->sk);
+ atomic_inc(&group->refcnt);
+
+ spin_lock(&unix_multicast_lock);
+ hlist_add_head_rcu(&node->member_node,
+ &group->mcast_members);
+ hlist_add_head_rcu(&node->subscription_node, &u->mcast_subscriptions);
+ atomic_inc(&group->mcast_members_cnt);
+ atomic_inc(&group->mcast_membership_generation);
+ spin_unlock(&unix_multicast_lock);
+
+ return 0;
+
+sock_put_out:
+ sock_put(other);
+ return err;
+}
+
+
+static int unix_mc_leave(struct socket *sock, struct unix_mreq *mreq)
+{
+ struct unix_sock *u = unix_sk(sock->sk);
+ struct sock *other;
+ struct unix_mcast_group *group;
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+ int err;
+ unsigned hash;
+ int namelen;
+
+ if (mreq->address.sun_family != AF_UNIX ||
+ mreq->address.sun_path[0] != '\0')
+ return -EINVAL;
+
+ err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+ if (err < 0)
+ return err;
+
+ namelen = err;
+ other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+ sock->type, hash, &err);
+ if (!other)
+ return -EINVAL;
+
+ group = unix_sk(other)->mcast_group;
+
+ if (!group) {
+ err = -EINVAL;
+ goto sock_put_out;
+ }
+
+ spin_lock(&unix_multicast_lock);
+
+ hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
+ subscription_node) {
+ if (node->group == group)
+ break;
+ }
+
+ if (!pos) {
+ spin_unlock(&unix_multicast_lock);
+ err = -EINVAL;
+ goto sock_put_out;
+ }
+
+ hlist_del_rcu(&node->member_node);
+ hlist_del_rcu(&node->subscription_node);
+ atomic_dec(&group->mcast_members_cnt);
+ atomic_inc(&group->mcast_membership_generation);
+ hlist_add_head_rcu(&node->member_dead_node,
+ &group->mcast_dead_members);
+ spin_unlock(&unix_multicast_lock);
+
+ if (sock->sk->sk_type == SOCK_SEQPACKET) {
+ struct sock *peer = unix_peer_get(sock->sk);
+ if (peer) {
+ unix_sk(peer)->mcast_group = NULL;
+ atomic_dec(&group->refcnt);
+ sock_put(peer);
+ }
+ }
+
+ synchronize_rcu();
+
+ if (atomic_dec_and_test(&group->refcnt)) {
+ spin_lock(&unix_multicast_lock);
+ destroy_mcast_group(group);
+ spin_unlock(&unix_multicast_lock);
+ }
+
+ err = 0;
+
+ /* If the receiving queue of that socket was full, some writers on the
+ * multicast group may be blocked */
+ wake_up_interruptible_sync_poll(&u->peer_wait,
+ POLLOUT | POLLWRNORM | POLLWRBAND);
+
+sock_put_out:
+ sock_put(other);
+ return err;
+}
+#endif
+
static int unix_setsockopt(struct socket *sock, int level, int optname,
char __user *optval, unsigned int optlen)
{
+#ifdef CONFIG_UNIX_MULTICAST
+ struct unix_mreq mreq;
+ int err = 0;
+
+ if (level != SOL_UNIX)
+ return -ENOPROTOOPT;
+
+ switch (optname) {
+ case UNIX_CREATE_GROUP:
+ case UNIX_JOIN_GROUP:
+ case UNIX_LEAVE_GROUP:
+ if (optlen < sizeof(struct unix_mreq))
+ return -EINVAL;
+ if (copy_from_user(&mreq, optval, sizeof(struct unix_mreq)))
+ return -EFAULT;
+ break;
+
+ default:
+ break;
+ }
+
+ switch (optname) {
+ case UNIX_CREATE_GROUP:
+ err = unix_mc_create(sock, &mreq);
+ break;
+
+ case UNIX_JOIN_GROUP:
+ err = unix_mc_join(sock, &mreq);
+ break;
+
+ case UNIX_LEAVE_GROUP:
+ err = unix_mc_leave(sock, &mreq);
+ break;
+
+ default:
+ err = -ENOPROTOOPT;
+ break;
+ }
+
+ return err;
+#else
return -EOPNOTSUPP;
+#endif
}
--
1.7.7.6
Multicast Unix domain sockets have a UNIX_ATTACH_FILTER setsockopt() operation
to attach a Berkeley Packet Filter to a remote peer.
This remote peer socket filter is a separate filter from the local one, so the
peer cannot replace it.
If both local and peer filters are attached, both are executed: first the peer
BPF, then the local one. Filters can have state, and that state should not be
accessible to local filters.
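As an illustration, the group owner could attach a peer filter on an accepted
SOCK_SEQPACKET socket as follows (a sketch; acceptedfd is a hypothetical name
for the socket returned by accept(), and the one-instruction BPF program
simply accepts every packet in full):
struct sock_filter insns[] = {
	BPF_STMT(BPF_RET | BPF_K, 0xffffffff), /* accept the whole packet */
};
struct sock_fprog fprog = {
	.len = 1,
	.filter = insns,
};
ret = setsockopt(acceptedfd, SOL_UNIX, UNIX_ATTACH_FILTER, &fprog, sizeof(fprog));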
Signed-off-by: Javier Martinez Canillas <[email protected]>
---
include/linux/filter.h | 4 ++
include/net/af_unix.h | 1 +
include/net/sock.h | 4 ++
net/core/filter.c | 118 ++++++++++++++++++++++++++++++++++++++++++++++++
net/unix/af_unix.c | 70 +++++++++++++++++++++++++---
5 files changed, 189 insertions(+), 8 deletions(-)
diff --git a/include/linux/filter.h b/include/linux/filter.h
index 8eeb205..1af808a 100644
--- a/include/linux/filter.h
+++ b/include/linux/filter.h
@@ -155,6 +155,10 @@ extern unsigned int sk_run_filter(const struct sk_buff *skb,
const struct sock_filter *filter);
extern int sk_attach_filter(struct sock_fprog *fprog, struct sock *sk);
extern int sk_detach_filter(struct sock *sk);
+#ifdef CONFIG_UNIX_MULTICAST
+extern int sk_attach_peer_filter(struct sock_fprog *fprog, struct sock *sk);
+extern int sk_detach_peer_filter(struct sock *sk);
+#endif
extern int sk_chk_filter(struct sock_filter *filter, unsigned int flen);
#ifdef CONFIG_BPF_JIT
diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index 80e793d..307852c 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -49,6 +49,7 @@ struct unix_skb_parms {
#define UNIX_JOIN_GROUP 2
#define UNIX_LEAVE_GROUP 3
#define UNIX_ACCEPT_GROUP 4
+#define UNIX_ATTACH_FILTER 5
/* Flags on unix_mreq */
diff --git a/include/net/sock.h b/include/net/sock.h
index 91c1c8b..e502107 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -237,6 +237,7 @@ struct cg_proto;
* @sk_sndtimeo: %SO_SNDTIMEO setting
* @sk_rxhash: flow hash received from netif layer
* @sk_filter: socket filtering instructions
+ * @sk_peer_filter: peer socket filtering instructions
* @sk_protinfo: private area, net family specific, when not using slab
* @sk_timer: sock cleanup timer
* @sk_stamp: time stamp of last packet received
@@ -303,6 +304,9 @@ struct sock {
int sk_rcvbuf;
struct sk_filter __rcu *sk_filter;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct sk_filter __rcu *sk_peer_filter;
+#endif
struct socket_wq __rcu *sk_wq;
#ifdef CONFIG_NET_DMA
diff --git a/net/core/filter.c b/net/core/filter.c
index 5dea452..5dd0890 100644
--- a/net/core/filter.c
+++ b/net/core/filter.c
@@ -587,6 +587,122 @@ void sk_filter_release_rcu(struct rcu_head *rcu)
}
EXPORT_SYMBOL(sk_filter_release_rcu);
+#ifdef CONFIG_UNIX_MULTICAST
+
+int __sk_attach_filter(struct sock_fprog *fprog, struct sock *sk, bool local)
+{
+ struct sk_filter *fp, *old_fp;
+ unsigned int fsize = sizeof(struct sock_filter) * fprog->len;
+ int err;
+
+ /* Make sure new filter is there and in the right amounts. */
+ if (fprog->filter == NULL)
+ return -EINVAL;
+
+ fp = sock_kmalloc(sk, fsize+sizeof(*fp), GFP_KERNEL);
+ if (!fp)
+ return -ENOMEM;
+ if (copy_from_user(fp->insns, fprog->filter, fsize)) {
+ sock_kfree_s(sk, fp, fsize+sizeof(*fp));
+ return -EFAULT;
+ }
+
+ atomic_set(&fp->refcnt, 1);
+ fp->len = fprog->len;
+ fp->bpf_func = sk_run_filter;
+
+ err = sk_chk_filter(fp->insns, fp->len);
+ if (err) {
+ sk_filter_uncharge(sk, fp);
+ return err;
+ }
+
+ bpf_jit_compile(fp);
+
+ if (local) {
+ old_fp = rcu_dereference_protected(sk->sk_filter,
+ sock_owned_by_user(sk));
+ rcu_assign_pointer(sk->sk_filter, fp);
+ } else {
+ old_fp = rcu_dereference_protected(sk->sk_peer_filter,
+ sock_owned_by_user(sk));
+ rcu_assign_pointer(sk->sk_peer_filter, fp);
+ }
+
+ if (old_fp)
+ sk_filter_uncharge(sk, old_fp);
+ return 0;
+}
+
+/**
+ * sk_attach_filter - attach a socket filter
+ * @fprog: the filter program
+ * @sk: the socket to use
+ *
+ * Attach the user's filter code. We first run some sanity checks on
+ * it to make sure it does not explode on us later. If an error
+ * occurs or there is insufficient memory for the filter a negative
+ * errno code is returned. On success the return is zero.
+ */
+int sk_attach_filter(struct sock_fprog *fprog, struct sock *sk)
+{
+ return __sk_attach_filter(fprog, sk, true);
+}
+EXPORT_SYMBOL_GPL(sk_attach_filter);
+
+int __sk_detach_filter(struct sock *sk, bool local)
+{
+ int ret = -ENOENT;
+ struct sk_filter *filter;
+
+ if (local)
+ filter = rcu_dereference_protected(sk->sk_filter,
+ sock_owned_by_user(sk));
+ else
+ filter = rcu_dereference_protected(sk->sk_peer_filter,
+ sock_owned_by_user(sk));
+
+ if (filter) {
+ if (local)
+ RCU_INIT_POINTER(sk->sk_filter, NULL);
+ else
+ RCU_INIT_POINTER(sk->sk_peer_filter, NULL);
+ sk_filter_uncharge(sk, filter);
+ ret = 0;
+ }
+ return ret;
+}
+
+int sk_detach_filter(struct sock *sk)
+{
+ return __sk_detach_filter(sk, true);
+}
+EXPORT_SYMBOL_GPL(sk_detach_filter);
+
+/**
+ * sk_attach_peer_filter - attach a remote socket filter
+ * @fprog: the filter program
+ * @sk: the socket to use
+ *
+ * Attach the user's filter code. We first run some sanity checks on
+ * it to make sure it does not explode on us later. If an error
+ * occurs or there is insufficient memory for the filter a negative
+ * errno code is returned. On success the return is zero.
+ */
+int sk_attach_peer_filter(struct sock_fprog *fprog, struct sock *sk)
+{
+ return __sk_attach_filter(fprog, sk, false);
+}
+EXPORT_SYMBOL_GPL(sk_attach_peer_filter);
+
+int sk_detach_peer_filter(struct sock *sk)
+{
+ return __sk_detach_filter(sk, false);
+}
+EXPORT_SYMBOL_GPL(sk_detach_peer_filter);
+
+#else
+
/**
* sk_attach_filter - attach a socket filter
* @fprog: the filter program
@@ -652,3 +768,5 @@ int sk_detach_filter(struct sock *sk)
return ret;
}
EXPORT_SYMBOL_GPL(sk_detach_filter);
+
+#endif
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 330c0a2..9439206 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -121,6 +121,7 @@ DEFINE_SPINLOCK(unix_table_lock);
EXPORT_SYMBOL_GPL(unix_table_lock);
#ifdef CONFIG_UNIX_MULTICAST
#include <linux/sort.h>
+#include <linux/filter.h>
static DEFINE_SPINLOCK(unix_multicast_lock);
#endif
@@ -1767,8 +1768,10 @@ static int unix_dgram_sendmsg_multicast(struct sock_iocb *siocb,
restart:
for (i = others_set->offset ; i < others_set->cnt ; i++) {
struct sock *cur = others_set->items[i].s;
- unsigned int pkt_len;
+ unsigned int pkt_len = 0xffffffff;
+ unsigned int pkt_peer_len = 0xffffffff;
struct sk_filter *filter;
+ struct sk_filter *peer_filter;
if (!others_set->items[i].to_deliver)
continue;
@@ -1777,14 +1780,15 @@ restart:
BUG_ON(cur == NULL);
rcu_read_lock();
- filter = rcu_dereference(cur->sk_filter);
- if (filter)
- pkt_len = sk_run_filter(skb, filter->insns);
- else
- pkt_len = 0xffffffff;
+
+ peer_filter = rcu_dereference(cur->sk_peer_filter);
+
+ if (peer_filter)
+ pkt_peer_len = sk_run_filter(skb, peer_filter->insns);
+
rcu_read_unlock();
- if (pkt_len == 0) {
+ if (!pkt_peer_len) {
others_set->items[i].to_deliver = 0;
continue;
}
@@ -1795,13 +1799,33 @@ restart:
err = -ENOMEM;
goto out_free;
}
+
+ if (peer_filter)
+ pskb_trim(others_set->items[i].skb, pkt_peer_len);
+
+ rcu_read_lock();
+
+ filter = rcu_dereference(cur->sk_filter);
+
+ if (filter)
+ pkt_len = sk_run_filter(others_set->items[i].skb,
+ filter->insns);
+ rcu_read_unlock();
+
+ if (!pkt_len) {
+ others_set->items[i].to_deliver = 0;
+ continue;
+ }
+
+ if (filter)
+ pskb_trim(others_set->items[i].skb, pkt_len);
+
skb_set_owner_w(others_set->items[i].skb, sk);
err = unix_scm_to_skb(siocb->scm, others_set->items[i].skb,
true);
if (err < 0)
goto out_free;
unix_get_secdata(siocb->scm, others_set->items[i].skb);
- pskb_trim(others_set->items[i].skb, pkt_len);
}
for (i = others_set->offset ; i < others_set->cnt ; i++) {
@@ -2443,6 +2467,7 @@ static int unix_setsockopt(struct socket *sock, int level, int optname,
#ifdef CONFIG_UNIX_MULTICAST
struct unix_mreq mreq;
int err = 0;
+ struct sock *other = NULL;
if (level != SOL_UNIX)
return -ENOPROTOOPT;
@@ -2458,6 +2483,35 @@ static int unix_setsockopt(struct socket *sock, int level, int optname,
return -EFAULT;
break;
+ case UNIX_ATTACH_FILTER:
+ err = -EINVAL;
+ if (sock->sk->sk_type != SOCK_SEQPACKET)
+ return err;
+
+ if (!unix_sk(sock->sk)->mcast_group)
+ return err;
+
+ other = unix_peer_get(sock->sk);
+ if (!other)
+ return err;
+
+ BUG_ON(unix_sk(other)->mcast_group);
+
+ if (optlen == sizeof(struct sock_fprog)) {
+ struct sock_fprog fprog;
+
+ err = -EFAULT;
+ if (copy_from_user(&fprog, optval, sizeof(fprog))) {
+ sock_put(other);
+ return err;
+ }
+
+ err = sk_attach_peer_filter(&fprog, other);
+ }
+ sock_put(other);
+ return err;
+ break;
+
default:
break;
}
--
1.7.7.6
From: Alban Crequy <[email protected]>
When a socket subscribed to a multicast group has its incoming queue full, it
can either block further sends to the multicast group or let its messages be
dropped. The latter is useful to monitor all messages without slowing down the
traffic.
It is specified with the flag UNIX_MREQ_DROP_WHEN_FULL when the multicast group
is joined.
poll(POLLOUT) is implemented by checking the receiving queues of all
subscribed sockets. If even one of them has a full receiving queue and was not
subscribed with UNIX_MREQ_DROP_WHEN_FULL, the multicast socket is not
writable.
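For example, a monitoring socket that prefers losing messages over throttling
the senders could join with (sketch):
mreq.flags = UNIX_MREQ_DROP_WHEN_FULL;
ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));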
Signed-off-by: Alban Crequy <[email protected]>
Reviewed-by: Ian Molton <[email protected]>
---
net/unix/af_unix.c | 47 +++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 47 insertions(+), 0 deletions(-)
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index a6b489c..bd9dc58 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -2974,6 +2974,11 @@ static unsigned int unix_dgram_poll(struct file *file, struct socket *sock,
{
struct sock *sk = sock->sk, *other;
unsigned int mask, writable;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct sock_set *others;
+ int err = 0;
+ int i;
+#endif
sock_poll_wait(file, sk_sleep(sk), wait);
mask = 0;
@@ -3011,6 +3016,48 @@ static unsigned int unix_dgram_poll(struct file *file, struct socket *sock,
if (unix_recvq_full(other))
writable = 0;
}
+
+#ifdef CONFIG_UNIX_MULTICAST
+ /*
+ * On multicast sockets, we need to check if the receiving
+ * queue is full on all peers who don't have
+ * UNIX_MREQ_DROP_WHEN_FULL.
+ */
+
+ /* Don't let the group die under us */
+ unix_state_lock(other);
+ if (sock_flag(other, SOCK_DEAD)
+ || !unix_sk(other)->mcast_group) {
+ unix_state_unlock(other);
+ goto skip_multicast;
+ }
+ atomic_inc(&unix_sk(other)->mcast_group->refcnt);
+ unix_state_unlock(other);
+
+ others = unix_find_multicast_recipients(sk,
+ unix_sk(other)->mcast_group, &err);
+ if (!others)
+ goto skip_multicast_peers;
+ for (i = others->offset ; i < others->cnt ; i++) {
+ if (others->items[i].flags & UNIX_MREQ_DROP_WHEN_FULL)
+ continue;
+ if (unix_peer(others->items[i].s) != sk) {
+ sock_poll_wait(file,
+ &unix_sk(others->items[i].s)
+ ->peer_wait, wait);
+ if (unix_recvq_full(others->items[i].s)) {
+ writable = 0;
+ break;
+ }
+ }
+ }
+ up_sock_set(others);
+skip_multicast_peers:
+ if (atomic_dec_and_test(&unix_sk(other)->mcast_group->refcnt))
+ destroy_mcast_group(unix_sk(other)->mcast_group);
+
+skip_multicast:
+#endif
sock_put(other);
}
--
1.7.7.6
From: Alban Crequy <[email protected]>
unix_dgram_sendmsg() implements delivery for both SOCK_DGRAM and
SOCK_SEQPACKET unix sockets.
Delivery is atomic: the message is delivered either to all recipients or to
none, even in the case of interruptions or errors.
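Userspace senders need nothing special; a sketch, where "socket-address" is
an example group name and the zero-filled padding matches the name bound by
UNIX_CREATE_GROUP:
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path + 1, "socket-address");
ret = sendto(sockfd, buf, len, 0, (struct sockaddr *)&addr, sizeof(addr));
/* On a non-blocking socket, ret < 0 with errno == EAGAIN means a recipient
 * queue was full and the message was delivered to nobody */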
Signed-off-by: Alban Crequy <[email protected]>
Signed-off-by: Ian Molton <[email protected]>
---
net/unix/af_unix.c | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 242 insertions(+), 0 deletions(-)
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index f2713d5..a6b489c 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -1722,6 +1722,210 @@ static void maybe_add_creds(struct sk_buff *skb, const struct socket *sock,
}
}
+#ifdef CONFIG_UNIX_MULTICAST
+static void kfree_skb_sock_set(struct sock_set *set)
+{
+ int i;
+ for (i = set->offset ; i < set->cnt ; i++) {
+ if (set->items[i].skb) {
+ kfree_skb(set->items[i].skb);
+ set->items[i].skb = NULL;
+ }
+ }
+}
+
+static void unix_mcast_lock(struct unix_mcast_group *group,
+ struct sock_set *set)
+{
+ int i;
+ for (i = 0 ; i < MCAST_LOCK_CLASS_COUNT ; i++) {
+ if (set->hash & (1 << i))
+ spin_lock_nested(&group->lock[i], i);
+ }
+}
+
+static void unix_mcast_unlock(struct unix_mcast_group *group,
+ struct sock_set *set)
+{
+ int i;
+ for (i = MCAST_LOCK_CLASS_COUNT - 1 ; i >= 0 ; i--) {
+ if (set->hash & (1 << i))
+ spin_unlock(&group->lock[i]);
+ }
+}
+
+
+static int unix_dgram_sendmsg_multicast(struct sock_iocb *siocb,
+ struct sock *sk,
+ struct sk_buff *skb,
+ struct unix_mcast_group *group,
+ struct sock_set *others_set,
+ size_t len,
+ int max_level,
+ long timeo)
+{
+ int err;
+ int i;
+
+ BUG_ON(!others_set);
+
+restart:
+ for (i = others_set->offset ; i < others_set->cnt ; i++) {
+ struct sock *cur = others_set->items[i].s;
+ unsigned int pkt_len;
+ struct sk_filter *filter;
+
+ if (!others_set->items[i].to_deliver)
+ continue;
+
+ BUG_ON(others_set->items[i].skb);
+ BUG_ON(cur == NULL);
+
+ rcu_read_lock();
+ filter = rcu_dereference(cur->sk_filter);
+ if (filter)
+ pkt_len = sk_run_filter(skb, filter->insns);
+ else
+ pkt_len = 0xffffffff;
+ rcu_read_unlock();
+
+ if (pkt_len == 0) {
+ others_set->items[i].to_deliver = 0;
+ continue;
+ }
+
+ others_set->items[i].skb = skb_clone(skb, GFP_KERNEL);
+ if (!others_set->items[i].skb) {
+ kfree_skb_sock_set(others_set);
+ err = -ENOMEM;
+ goto out_free;
+ }
+ skb_set_owner_w(others_set->items[i].skb, sk);
+ err = unix_scm_to_skb(siocb->scm, others_set->items[i].skb,
+ true);
+ if (err < 0)
+ goto out_free;
+ unix_get_secdata(siocb->scm, others_set->items[i].skb);
+ pskb_trim(others_set->items[i].skb, pkt_len);
+ }
+
+ for (i = others_set->offset ; i < others_set->cnt ; i++) {
+ struct sock *cur = others_set->items[i].s;
+
+ if (!others_set->items[i].to_deliver)
+ continue;
+
+ unix_state_lock(cur);
+
+ if (cur->sk_shutdown & RCV_SHUTDOWN) {
+ unix_state_unlock(cur);
+ kfree_skb(others_set->items[i].skb);
+ others_set->items[i].skb = NULL;
+ others_set->items[i].to_deliver = 0;
+ continue;
+ }
+
+ if (sk->sk_type != SOCK_SEQPACKET) {
+ err = security_unix_may_send(sk->sk_socket,
+ cur->sk_socket);
+ if (err) {
+ unix_state_unlock(cur);
+ kfree_skb(others_set->items[i].skb);
+ others_set->items[i].skb = NULL;
+ others_set->items[i].to_deliver = 0;
+ continue;
+ }
+ }
+
+ if (unix_peer(cur) != sk && unix_recvq_full(cur)) {
+ kfree_skb(others_set->items[i].skb);
+ others_set->items[i].skb = NULL;
+
+ if (others_set->items[i].flags
+ & UNIX_MREQ_DROP_WHEN_FULL) {
+ /* Drop the skbs and continue */
+ unix_state_unlock(cur);
+ others_set->items[i].to_deliver = 0;
+ continue;
+ } else {
+ if (!timeo) {
+ unix_state_unlock(cur);
+ err = -EAGAIN;
+ goto out_free;
+ }
+
+ timeo = unix_wait_for_peer(cur, timeo);
+
+ err = sock_intr_errno(timeo);
+ if (signal_pending(current))
+ goto out_free;
+
+ kfree_skb_sock_set(others_set);
+ goto restart;
+ }
+ }
+ unix_state_unlock(cur);
+ }
+
+ unix_mcast_lock(group, others_set);
+ for (i = others_set->offset ; i < others_set->cnt ; i++) {
+ struct sock *cur = others_set->items[i].s;
+
+ if (!others_set->items[i].to_deliver)
+ continue;
+
+ BUG_ON(cur == NULL);
+ BUG_ON(others_set->items[i].skb == NULL);
+
+ unix_state_lock(cur);
+
+ if (sock_flag(cur, SOCK_DEAD)) {
+ unix_state_unlock(cur);
+
+ kfree_skb(others_set->items[i].skb);
+ others_set->items[i].skb = NULL;
+ others_set->items[i].to_deliver = 0;
+ continue;
+ }
+
+ if (sock_flag(cur, SOCK_RCVTSTAMP))
+ __net_timestamp(others_set->items[i].skb);
+
+ skb_queue_tail(&cur->sk_receive_queue,
+ others_set->items[i].skb);
+ others_set->items[i].skb = NULL;
+ if (max_level > unix_sk(cur)->recursion_level)
+ unix_sk(cur)->recursion_level = max_level;
+
+ unix_state_unlock(cur);
+ }
+ unix_mcast_unlock(group, others_set);
+
+ for (i = others_set->offset ; i < others_set->cnt ; i++) {
+ struct sock *cur = others_set->items[i].s;
+
+ if (!others_set->items[i].to_deliver)
+ continue;
+
+ cur->sk_data_ready(cur, len);
+ }
+
+ kfree_skb(skb);
+ scm_destroy(siocb->scm);
+ up_sock_set(others_set);
+ return len;
+
+out_free:
+ kfree_skb(skb);
+ if (others_set) {
+ kfree_skb_sock_set(others_set);
+ up_sock_set(others_set);
+ }
+ return err;
+}
+#endif
+
+
/*
* Send AF_UNIX data.
*/
@@ -1742,6 +1946,10 @@ static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
long timeo;
struct scm_cookie tmp_scm;
int max_level;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct unix_mcast_group *group = NULL;
+ struct sock_set *others_set = NULL;
+#endif
if (NULL == siocb->scm)
siocb->scm = &tmp_scm;
@@ -1763,8 +1971,20 @@ static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
sunaddr = NULL;
err = -ENOTCONN;
other = unix_peer_get(sk);
+
if (!other)
goto out;
+
+#ifdef CONFIG_UNIX_MULTICAST
+ group = unix_sk(other)->mcast_group;
+ if (group) {
+ others_set = unix_find_multicast_recipients(sk,
+ group, &err);
+
+ if (!others_set)
+ goto out;
+ }
+#endif
}
if (test_bit(SOCK_PASSCRED, &sock->flags) && !u->addr
@@ -1802,6 +2022,28 @@ restart:
hash, &err);
if (other == NULL)
goto out_free;
+
+#ifdef CONFIG_UNIX_MULTICAST
+ group = unix_sk(other)->mcast_group;
+ if (group) {
+ others_set = unix_find_multicast_recipients(sk,
+ group, &err);
+
+ sock_put(other);
+ other = NULL;
+
+ if (!others_set)
+ goto out;
+ }
+ }
+
+ if (group) {
+ err = unix_dgram_sendmsg_multicast(siocb, sk, skb, group,
+ others_set, len, max_level, timeo);
+ if (err < 0)
+ goto out;
+ return err;
+#endif
}
if (sk_filter(other, skb) < 0) {
--
1.7.7.6
From: Alban Crequy <[email protected]>
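The UNIX_ACCEPT_GROUP setsockopt() operation is added for SOCK_SEQPACKET
sockets, and UNIX_JOIN_GROUP/UNIX_LEAVE_GROUP are restricted to SOCK_DGRAM.
The owner of a multicast group calls UNIX_ACCEPT_GROUP on the socket returned
by accept() to subscribe the connected peer to the group.
A sketch of the owner's side (listenfd and newfd are hypothetical names):
struct unix_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
mreq.address.sun_family = AF_UNIX;
mreq.flags = 0;
newfd = accept(listenfd, NULL, NULL);
ret = setsockopt(newfd, SOL_UNIX, UNIX_ACCEPT_GROUP, &mreq, sizeof(mreq));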
Signed-off-by: Alban Crequy <[email protected]>
---
include/net/af_unix.h | 2 +
net/unix/af_unix.c | 145 ++++++++++++++++++++++++++++++++++++++-----------
2 files changed, 116 insertions(+), 31 deletions(-)
diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index c543f76..80e793d 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -48,6 +48,7 @@ struct unix_skb_parms {
#define UNIX_CREATE_GROUP 1
#define UNIX_JOIN_GROUP 2
#define UNIX_LEAVE_GROUP 3
+#define UNIX_ACCEPT_GROUP 4
/* Flags on unix_mreq */
@@ -115,6 +116,7 @@ struct unix_sock {
unsigned int gc_maybe_cycle : 1;
unsigned int mcast_send_to_peer : 1;
unsigned int mcast_drop_when_peer_full : 1;
+ unsigned int mcast_multicast_delivery : 1;
unsigned char recursion_level;
struct unix_mcast_group *mcast_group;
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 07e6b05..330c0a2 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -759,6 +759,8 @@ static struct sock *unix_create1(struct net *net, struct socket *sock)
mutex_init(&u->readlock); /* single task reading lock */
#ifdef CONFIG_UNIX_MULTICAST
INIT_HLIST_HEAD(&u->mcast_subscriptions);
+ u->mcast_group = NULL;
+ u->mcast_multicast_delivery = 0;
#endif
init_waitqueue_head(&u->peer_wait);
unix_insert_socket(unix_sockets_unbound, sk);
@@ -1345,10 +1347,6 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
struct sock *newsk = NULL;
struct sock *other = NULL;
struct sk_buff *skb = NULL;
-#ifdef CONFIG_UNIX_MULTICAST
- struct unix_mcast *node;
- struct hlist_node *pos;
-#endif
unsigned hash;
int st;
int err;
@@ -1464,14 +1462,11 @@ restart:
#ifdef CONFIG_UNIX_MULTICAST
/* Multicast sockets */
- hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
- subscription_node) {
- if (node->group == otheru->mcast_group) {
- atomic_inc(&otheru->mcast_group->refcnt);
- newu->mcast_group = otheru->mcast_group;
- break;
- }
+ if (otheru->mcast_group) {
+ atomic_inc(&otheru->mcast_group->refcnt);
+ newu->mcast_group = otheru->mcast_group;
}
+ newu->mcast_multicast_delivery = 0;
#endif
/* The way is open! Fastly set all the necessary fields... */
@@ -1976,7 +1971,8 @@ static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
goto out;
#ifdef CONFIG_UNIX_MULTICAST
- group = unix_sk(other)->mcast_group;
+ group = unix_sk(other)->mcast_multicast_delivery ?
+ unix_sk(other)->mcast_group : NULL;
if (group) {
others_set = unix_find_multicast_recipients(sk,
group, &err);
@@ -2024,7 +2020,8 @@ restart:
goto out_free;
#ifdef CONFIG_UNIX_MULTICAST
- group = unix_sk(other)->mcast_group;
+ group = unix_sk(other)->mcast_multicast_delivery ?
+ unix_sk(other)->mcast_group : NULL;
if (group) {
others_set = unix_find_multicast_recipients(sk,
group, &err);
@@ -2184,6 +2181,7 @@ static int unix_mc_create(struct socket *sock, struct unix_mreq *mreq)
unix_state_lock(sock->sk);
unix_sk(sock->sk)->mcast_group = mcast_group;
+ unix_sk(sock->sk)->mcast_multicast_delivery = 1;
unix_state_unlock(sock->sk);
return 0;
@@ -2192,8 +2190,9 @@ static int unix_mc_create(struct socket *sock, struct unix_mreq *mreq)
static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
{
- struct unix_sock *u = unix_sk(sock->sk);
- struct sock *other, *peer;
+ struct unix_sock *u;
+ struct sock *other = NULL;
+ struct sock *peer = NULL;
struct unix_mcast_group *group;
struct unix_mcast *node;
int err;
@@ -2204,8 +2203,12 @@ static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
mreq->address.sun_path[0] != '\0')
return -EINVAL;
- /* sockets which represent a group are not allowed to join another
- * group */
+ if (sock->type != SOCK_DGRAM)
+ return -EINVAL;
+
+ /* sockets which represent a group are not allowed to join
+ * another group */
+ u = unix_sk(sock->sk);
if (u->mcast_group)
return -EINVAL;
@@ -2213,16 +2216,16 @@ static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
if (err < 0)
return err;
- err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+ err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un),
+ &hash);
if (err < 0)
return err;
namelen = err;
- other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
- sock->type, hash, &err);
+ other = unix_find_other(sock_net(sock->sk), &mreq->address,
+ namelen, sock->type, hash, &err);
if (!other)
return -EINVAL;
-
group = unix_sk(other)->mcast_group;
if (!group) {
@@ -2239,15 +2242,6 @@ static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
node->group = group;
node->flags = mreq->flags;
- if (sock->sk->sk_type == SOCK_SEQPACKET) {
- peer = unix_peer_get(sock->sk);
- if (peer) {
- atomic_inc(&group->refcnt);
- unix_sk(peer)->mcast_group = group;
- sock_put(peer);
- }
- }
-
unix_state_lock(sock->sk);
unix_sk(sock->sk)->mcast_send_to_peer =
!!(mreq->flags & UNIX_MREQ_SEND_TO_PEER);
@@ -2270,7 +2264,87 @@ static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
return 0;
sock_put_out:
- sock_put(other);
+ if (other)
+ sock_put(other);
+ if (peer)
+ sock_put(peer);
+ return err;
+}
+
+static int unix_mc_accept(struct socket *sock, struct unix_mreq *mreq)
+{
+ struct unix_sock *u = unix_sk(sock->sk);
+ struct unix_sock *peeru;
+ struct sock *peer = NULL;
+ struct unix_mcast_group *group;
+ struct unix_mcast *node;
+ int err;
+
+ if (mreq->address.sun_family != AF_UNIX ||
+ mreq->address.sun_path[0] != '\0')
+ return -EINVAL;
+
+ if (sock->type != SOCK_SEQPACKET)
+ return -EINVAL;
+
+ /* The reference is kept as long as peer is member of the group */
+ peer = unix_peer_get(sock->sk);
+ if (!peer)
+ return -ENOTCONN;
+ peeru = unix_sk(peer);
+
+ if (sock->sk->sk_state != TCP_ESTABLISHED)
+ return -ENOTCONN;
+
+ if (peer->sk_shutdown != 0) {
+ err = -ENOTCONN;
+ goto sock_put_out;
+ }
+
+ group = u->mcast_group;
+ if (!group) {
+ err = -EINVAL;
+ goto sock_put_out;
+ }
+
+ node = kmalloc(sizeof(struct unix_mcast), GFP_KERNEL);
+ if (!node) {
+ err = -ENOMEM;
+ goto sock_put_out;
+ }
+ node->member = peeru;
+ node->group = group;
+ node->flags = mreq->flags;
+
+ unix_state_lock(peer);
+ peeru->mcast_send_to_peer =
+ !!(mreq->flags & UNIX_MREQ_SEND_TO_PEER);
+ peeru->mcast_drop_when_peer_full =
+ !!(mreq->flags & UNIX_MREQ_DROP_WHEN_FULL);
+ unix_state_unlock(peer);
+
+ unix_state_lock(sock->sk);
+ u->mcast_multicast_delivery = 1;
+ unix_state_unlock(sock->sk);
+
+ /* Keep a reference for the socket and the peer */
+ atomic_inc(&group->refcnt);
+ atomic_inc(&group->refcnt);
+
+ spin_lock(&unix_multicast_lock);
+ hlist_add_head_rcu(&node->member_node,
+ &group->mcast_members);
+ hlist_add_head_rcu(&node->subscription_node,
+ &peeru->mcast_subscriptions);
+ atomic_inc(&group->mcast_members_cnt);
+ atomic_inc(&group->mcast_membership_generation);
+ spin_unlock(&unix_multicast_lock);
+
+ return 0;
+
+sock_put_out:
+ if (peer)
+ sock_put(peer);
return err;
}
@@ -2290,6 +2364,9 @@ static int unix_mc_leave(struct socket *sock, struct unix_mreq *mreq)
mreq->address.sun_path[0] != '\0')
return -EINVAL;
+ if (sock->type != SOCK_DGRAM)
+ return -EINVAL;
+
err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
if (err < 0)
return err;
@@ -2333,6 +2410,7 @@ static int unix_mc_leave(struct socket *sock, struct unix_mreq *mreq)
struct sock *peer = unix_peer_get(sock->sk);
if (peer) {
unix_sk(peer)->mcast_group = NULL;
+ unix_sk(peer)->mcast_multicast_delivery = 0;
atomic_dec(&group->refcnt);
sock_put(peer);
}
@@ -2373,6 +2451,7 @@ static int unix_setsockopt(struct socket *sock, int level, int optname,
case UNIX_CREATE_GROUP:
case UNIX_JOIN_GROUP:
case UNIX_LEAVE_GROUP:
+ case UNIX_ACCEPT_GROUP:
if (optlen < sizeof(struct unix_mreq))
return -EINVAL;
if (copy_from_user(&mreq, optval, sizeof(struct unix_mreq)))
@@ -2396,6 +2475,10 @@ static int unix_setsockopt(struct socket *sock, int level, int optname,
err = unix_mc_leave(sock, &mreq);
break;
+ case UNIX_ACCEPT_GROUP:
+ err = unix_mc_accept(sock, &mreq);
+ break;
+
default:
err = -ENOPROTOOPT;
break;
--
1.7.7.6
From: Alban Crequy <[email protected]>
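When a socket subscribed to a multicast group is shut down with RCV_SHUTDOWN,
it is unsubscribed from its multicast groups so that it no longer blocks
senders, and blocked writers are woken up. A subscribed receiver that is no
longer interested can thus, for instance, call (sketch):
shutdown(sockfd, SHUT_RD);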
Signed-off-by: Alban Crequy <[email protected]>
Reviewed-by: Ian Molton <[email protected]>
---
net/unix/af_unix.c | 35 +++++++++++++++++++++++++++++++++++
1 files changed, 35 insertions(+), 0 deletions(-)
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index bd9dc58..07e6b05 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -2841,6 +2841,10 @@ static int unix_shutdown(struct socket *sock, int mode)
{
struct sock *sk = sock->sk;
struct sock *other;
+#ifdef CONFIG_UNIX_MULTICAST
+ struct unix_sock *u = unix_sk(sk);
+ int unsubscribed = 0;
+#endif
mode = (mode+1)&(RCV_SHUTDOWN|SEND_SHUTDOWN);
@@ -2852,7 +2856,38 @@ static int unix_shutdown(struct socket *sock, int mode)
other = unix_peer(sk);
if (other)
sock_hold(other);
+
+#ifdef CONFIG_UNIX_MULTICAST
+ /* If the socket subscribed to a multicast group and it is shutdown
+ * with (mode&RCV_SHUTDOWN), it should be unsubscribed or at least
+ * stop blocking the peers */
+ if (mode&RCV_SHUTDOWN) {
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+ struct hlist_node *pos_tmp;
+
+ spin_lock(&unix_multicast_lock);
+ hlist_for_each_entry_safe(node, pos, pos_tmp,
+ &u->mcast_subscriptions,
+ subscription_node) {
+ hlist_del_rcu(&node->member_node);
+ hlist_del_rcu(&node->subscription_node);
+ atomic_dec(&node->group->mcast_members_cnt);
+ atomic_inc(&node->group->mcast_membership_generation);
+ hlist_add_head_rcu(&node->member_dead_node,
+ &node->group->mcast_dead_members);
+ unsubscribed = 1;
+ }
+ spin_unlock(&unix_multicast_lock);
+ }
+#endif
unix_state_unlock(sk);
+
+#ifdef CONFIG_UNIX_MULTICAST
+ if (unsubscribed)
+ wake_up_interruptible_all(&u->peer_wait);
+#endif
+
sk->sk_state_change(sk);
if (other &&
--
1.7.7.6
From: Alban Crequy <[email protected]>
unix_find_multicast_recipients() returns the list of recipients for a given
multicast group. It checks the options UNIX_MREQ_SEND_TO_PEER and
UNIX_MREQ_LOOPBACK to select the right recipients.
The list of recipients is ordered and guaranteed not to contain duplicates.
When the caller has finished with the list of recipients, it calls
up_sock_set() so that the list can be reused by another sender.
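Callers follow this pattern (a sketch of the contract, mirroring the delivery
and poll paths):
struct sock_set *others;
int err = 0;
int i;
others = unix_find_multicast_recipients(sk, group, &err);
if (!others)
	return err;
for (i = others->offset ; i < others->cnt ; i++) {
	if (!others->items[i].to_deliver)
		continue;
	/* deliver to others->items[i].s */
}
up_sock_set(others); /* hand the cached list back for reuse */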
Signed-off-by: Alban Crequy <[email protected]>
Reviewed-by: Ian Molton <[email protected]>
---
net/unix/af_unix.c | 239 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 239 insertions(+), 0 deletions(-)
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 6f8fe57..f2713d5 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -120,6 +120,8 @@ EXPORT_SYMBOL_GPL(unix_socket_table);
DEFINE_SPINLOCK(unix_table_lock);
EXPORT_SYMBOL_GPL(unix_table_lock);
#ifdef CONFIG_UNIX_MULTICAST
+#include <linux/sort.h>
+
static DEFINE_SPINLOCK(unix_multicast_lock);
#endif
static atomic_long_t unix_nr_socks;
@@ -128,6 +130,71 @@ static atomic_long_t unix_nr_socks;
#define UNIX_ABSTRACT(sk) (unix_sk(sk)->addr->hash != UNIX_HASH_SIZE)
+#ifdef CONFIG_UNIX_MULTICAST
+/* Array of sockets used in multicast deliveries */
+struct sock_item {
+ /* constant fields */
+ struct sock *s;
+ unsigned int flags;
+
+ /* fields reinitialized at every send */
+ struct sk_buff *skb;
+ unsigned int to_deliver:1;
+};
+
+struct sock_set {
+ /* struct sock_set is used by one sender at a time */
+ struct semaphore sem;
+ struct hlist_node list;
+ struct rcu_head rcu;
+ int generation;
+
+ /* the sender should consider only sockets from items[offset] to
+ * items[cnt-1] */
+ int cnt;
+ int offset;
+ /* Bitfield of (struct unix_mcast_group)->lock spinlocks to take in
+ * order to guarantee causal order of delivery */
+ u8 hash;
+ /* ordered list of sockets without duplicates. Cell zero is reserved
+ * for sending a message to the accepted socket (SOCK_SEQPACKET only).
+ */
+ struct sock_item items[0];
+};
+
+static void up_sock_set(struct sock_set *set)
+{
+ if ((set->offset == 0) && set->items[0].s) {
+ sock_put(set->items[0].s);
+ set->items[0].s = NULL;
+ set->items[0].skb = NULL;
+ }
+ up(&set->sem);
+}
+
+static void kfree_sock_set(struct sock_set *set)
+{
+ int i;
+ for (i = set->offset ; i < set->cnt ; i++) {
+ if (set->items[i].s)
+ sock_put(set->items[i].s);
+ }
+ kfree(set);
+}
+
+static int sock_item_compare(const void *_a, const void *_b)
+{
+ const struct sock_item *a = _a;
+ const struct sock_item *b = _b;
+ if (a->s > b->s)
+ return 1;
+ else if (a->s < b->s)
+ return -1;
+ else
+ return 0;
+}
+#endif
+
#ifdef CONFIG_SECURITY_NETWORK
static void unix_get_secdata(struct scm_cookie *scm, struct sk_buff *skb)
{
@@ -382,6 +449,7 @@ static void
destroy_mcast_group(struct unix_mcast_group *group)
{
struct unix_mcast *node;
+ struct sock_set *set;
struct hlist_node *pos;
struct hlist_node *pos_tmp;
@@ -395,6 +463,12 @@ destroy_mcast_group(struct unix_mcast_group *group)
sock_put(&node->member->sk);
kfree(node);
}
+ hlist_for_each_entry_safe(set, pos, pos_tmp,
+ &group->mcast_members_lists,
+ list) {
+ hlist_del_rcu(&set->list);
+ kfree_sock_set(set);
+ }
kfree(group);
}
#endif
@@ -856,6 +930,171 @@ fail:
return NULL;
}
+#ifdef CONFIG_UNIX_MULTICAST
+static int unix_find_multicast_members(struct sock_set *set,
+ int recipient_cnt,
+ struct hlist_head *list)
+{
+ struct unix_mcast *node;
+ struct hlist_node *pos;
+
+ hlist_for_each_entry_rcu(node, pos, list,
+ member_node) {
+ struct sock *s;
+
+ if (set->cnt + 1 > recipient_cnt)
+ return -ENOMEM;
+
+ s = &node->member->sk;
+ sock_hold(s);
+ set->items[set->cnt].s = s;
+ set->items[set->cnt].flags = node->flags;
+ set->cnt++;
+
+ set->hash |= 1 << ((((unsigned long)s) >> 6) & 0x07);
+ }
+
+ return 0;
+}
+
+void sock_set_reclaim(struct rcu_head *rp)
+{
+ struct sock_set *set = container_of(rp, struct sock_set, rcu);
+ kfree_sock_set(set);
+}
+
+static struct sock_set *unix_find_multicast_recipients(struct sock *sender,
+ struct unix_mcast_group *group,
+ int *err)
+{
+ struct sock_set *set = NULL; /* NULL just to silence GCC */
+ struct hlist_node *pos;
+ int recipient_cnt;
+ int generation;
+ int i;
+
+ BUG_ON(sender == NULL);
+ BUG_ON(group == NULL);
+
+ /* Find an available set if any */
+ generation = atomic_read(&group->mcast_membership_generation);
+ rcu_read_lock();
+ hlist_for_each_entry_rcu(set, pos, &group->mcast_members_lists,
+ list) {
+ if (down_trylock(&set->sem)) {
+ /* the set is being used by someone else */
+ continue;
+ }
+ if (set->generation == generation) {
+ /* the set is still valid, use it */
+ break;
+ }
+ /* The set is outdated. It will be removed from the RCU list
+ * soon but not in this lockless RCU read */
+ up(&set->sem);
+ }
+ rcu_read_unlock();
+ if (pos)
+ goto list_found;
+
+ /* We cannot allocate while holding the spin lock. First, count the recipients */
+try_again:
+ generation = atomic_read(&group->mcast_membership_generation);
+ recipient_cnt = atomic_read(&group->mcast_members_cnt);
+
+ /* Allocate for the set and hope the number of recipients does not
+ * change while the lock is released. If it changes, we have to try
+ * again... We allocate a bit more than needed, so if a _few_ members
+ * are added in a multicast group meanwhile, we don't always need to
+ * try again. */
+ recipient_cnt += 5;
+
+ set = kmalloc(sizeof(struct sock_set)
+ + sizeof(struct sock_item) * recipient_cnt,
+ GFP_KERNEL);
+ if (!set) {
+ *err = -ENOMEM;
+ return NULL;
+ }
+ sema_init(&set->sem, 0);
+ set->cnt = 1;
+ set->offset = 1;
+ set->generation = generation;
+ set->hash = 0;
+
+ rcu_read_lock();
+ if (unix_find_multicast_members(set, recipient_cnt,
+ &group->mcast_members)) {
+ rcu_read_unlock();
+ kfree_sock_set(set);
+ goto try_again;
+ }
+ rcu_read_unlock();
+
+ /* Keep the array ordered to prevent deadlocks when locking the
+ * receiving queues. The ordering is:
+ * - First, the accepted socket (SOCK_SEQPACKET only)
+ * - Then, the member sockets ordered by memory address
+ * The accepted socket cannot be member of a multicast group.
+ */
+ sort(set->items + 1, set->cnt - 1, sizeof(struct sock_item),
+ sock_item_compare, NULL);
+ /* Avoid duplicates */
+ for (i = 2 ; i < set->cnt ; i++) {
+ if (set->items[i].s == set->items[i - 1].s) {
+ sock_put(set->items[i - 1].s);
+ set->items[i - 1].s = NULL;
+ }
+ }
+
+ if (generation != atomic_read(&group->mcast_membership_generation)) {
+ kfree_sock_set(set);
+ goto try_again;
+ }
+
+ spin_lock(&unix_multicast_lock);
+ hlist_add_head_rcu(&set->list,
+ &group->mcast_members_lists);
+ spin_unlock(&unix_multicast_lock);
+
+list_found:
+ /* List found. Initialize the first item. */
+ if (sender->sk_type == SOCK_SEQPACKET
+ && unix_peer(sender)
+ && unix_sk(sender)->mcast_send_to_peer) {
+ set->offset = 0;
+ sock_hold(unix_peer(sender));
+ set->items[0].s = unix_peer(sender);
+ set->items[0].skb = NULL;
+ set->items[0].to_deliver = 1;
+ set->items[0].flags =
+ unix_sk(sender)->mcast_drop_when_peer_full
+ ? UNIX_MREQ_DROP_WHEN_FULL : 0;
+ } else {
+ set->items[0].s = NULL;
+ set->items[0].skb = NULL;
+ set->items[0].to_deliver = 0;
+ set->offset = 1;
+ }
+
+ /* Initialize the other items. */
+ for (i = 1 ; i < set->cnt ; i++) {
+ set->items[i].skb = NULL;
+ if (set->items[i].s == NULL) {
+ set->items[i].to_deliver = 0;
+ continue;
+ }
+ if (set->items[i].flags & UNIX_MREQ_LOOPBACK
+ || sender != set->items[i].s)
+ set->items[i].to_deliver = 1;
+ else
+ set->items[i].to_deliver = 0;
+ }
+
+ return set;
+}
+#endif
+
static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
{
--
1.7.7.6