2017-03-21 04:11:22

by Jason Wang

Subject: [PATCH net-next 0/8] vhost-net rx batching

Hi all:

This series tries to implement rx batching for vhost-net. This is done
by batching the dequeuing from the skb_array exported by the underlying
socket and passing the skb back through msg_control to finish the
userspace copy.

Tests show up to a 19% improvement in rx pps.

Please review.

Thanks

Jason Wang (8):
ptr_ring: introduce batch dequeuing
skb_array: introduce batch dequeuing
tun: export skb_array
tap: export skb_array
tun: support receiving skb through msg_control
tap: support receiving skb from msg_control
vhost_net: try batch dequing from skb array
vhost_net: use lockless peeking for skb array during busy polling

drivers/net/tap.c | 25 ++++++++++++++---
drivers/net/tun.c | 31 +++++++++++++++------
drivers/vhost/net.c | 71 +++++++++++++++++++++++++++++++++++++++++++----
include/linux/if_tap.h | 5 ++++
include/linux/if_tun.h | 5 ++++
include/linux/ptr_ring.h | 65 +++++++++++++++++++++++++++++++++++++++++++
include/linux/skb_array.h | 25 +++++++++++++++++
7 files changed, 209 insertions(+), 18 deletions(-)

--
2.7.4


2017-03-21 04:11:24

by Jason Wang

Subject: [PATCH net-next 6/8] tap: support receiving skb from msg_control

This patch lets tap_recvmsg() receive an skb from its caller through
msg_control. vhost_net will be the first user.

Signed-off-by: Jason Wang <[email protected]>
---
drivers/net/tap.c | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index abdaf86..07d9174 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -824,15 +824,17 @@ static ssize_t tap_put_user(struct tap_queue *q,

static ssize_t tap_do_read(struct tap_queue *q,
struct iov_iter *to,
- int noblock)
+ int noblock, struct sk_buff *skb)
{
DEFINE_WAIT(wait);
- struct sk_buff *skb;
ssize_t ret = 0;

if (!iov_iter_count(to))
return 0;

+ if (skb)
+ goto done;
+
while (1) {
if (!noblock)
prepare_to_wait(sk_sleep(&q->sk), &wait,
@@ -856,6 +858,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
if (!noblock)
finish_wait(sk_sleep(&q->sk), &wait);

+done:
if (skb) {
ret = tap_put_user(q, skb, to);
if (unlikely(ret < 0))
@@ -872,7 +875,7 @@ static ssize_t tap_read_iter(struct kiocb *iocb, struct iov_iter *to)
struct tap_queue *q = file->private_data;
ssize_t len = iov_iter_count(to), ret;

- ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK);
+ ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK, NULL);
ret = min_t(ssize_t, ret, len);
if (ret > 0)
iocb->ki_pos = ret;
@@ -1155,7 +1158,8 @@ static int tap_recvmsg(struct socket *sock, struct msghdr *m,
int ret;
if (flags & ~(MSG_DONTWAIT|MSG_TRUNC))
return -EINVAL;
- ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT);
+ ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT,
+ m->msg_control);
if (ret > total_len) {
m->msg_flags |= MSG_TRUNC;
ret = flags & MSG_TRUNC ? ret : total_len;
--
2.7.4
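
The control flow this patch adds to tap_do_read() (use the skb the caller hands in, otherwise fall back to dequeuing one) can be modelled in plain userspace C. This is only a sketch under stated assumptions: toy_pkt, toy_queue, toy_dequeue and toy_do_read are hypothetical names, the queue holds at most one packet, and an empty queue returns 0 instead of blocking or returning an error as the kernel code does.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for sk_buff and the tap queue. */
struct toy_pkt {
	const char *data;
	size_t len;
};

struct toy_queue {
	struct toy_pkt *pending;	/* at most one queued packet in this toy */
};

/* Removes and returns the queued packet, or NULL if there is none. */
static struct toy_pkt *toy_dequeue(struct toy_queue *q)
{
	struct toy_pkt *pkt = q->pending;

	q->pending = NULL;
	return pkt;
}

/* If the caller already holds a packet (pkt != NULL), copy that one and
 * leave the queue untouched. Otherwise dequeue one, as before the patch.
 * Returns the number of bytes copied, or 0 when nothing is available. */
static size_t toy_do_read(struct toy_queue *q, char *buf, size_t buflen,
			  struct toy_pkt *pkt)
{
	size_t n;

	assert(buf != NULL);
	if (!pkt) {
		pkt = toy_dequeue(q);
		if (!pkt)
			return 0;
	}
	n = pkt->len < buflen ? pkt->len : buflen;
	memcpy(buf, pkt->data, n);
	return n;
}
```

Passing NULL keeps the old behaviour, which is why the read_iter path in the patch only needs an extra NULL argument.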

2017-03-21 04:11:21

by Jason Wang

Subject: [PATCH net-next 4/8] tap: export skb_array

This patch exports skb_array through tap_get_skb_array(). Callers can
then manipulate the skb array directly.

Signed-off-by: Jason Wang <[email protected]>
---
drivers/net/tap.c | 13 +++++++++++++
include/linux/if_tap.h | 5 +++++
2 files changed, 18 insertions(+)

diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 4d4173d..abdaf86 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1193,6 +1193,19 @@ struct socket *tap_get_socket(struct file *file)
}
EXPORT_SYMBOL_GPL(tap_get_socket);

+struct skb_array *tap_get_skb_array(struct file *file)
+{
+ struct tap_queue *q;
+
+ if (file->f_op != &tap_fops)
+ return ERR_PTR(-EINVAL);
+ q = file->private_data;
+ if (!q)
+ return ERR_PTR(-EBADFD);
+ return &q->skb_array;
+}
+EXPORT_SYMBOL_GPL(tap_get_skb_array);
+
int tap_queue_resize(struct tap_dev *tap)
{
struct net_device *dev = tap->dev;
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 3482c3c..4837157 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -3,6 +3,7 @@

#if IS_ENABLED(CONFIG_TAP)
struct socket *tap_get_socket(struct file *);
+struct skb_array *tap_get_skb_array(struct file *file);
#else
#include <linux/err.h>
#include <linux/errno.h>
@@ -12,6 +13,10 @@ static inline struct socket *tap_get_socket(struct file *f)
{
return ERR_PTR(-EINVAL);
}
+static inline struct skb_array *tap_get_skb_array(struct file *f)
+{
+ return ERR_PTR(-EINVAL);
+}
#endif /* CONFIG_TAP */

#include <net/sock.h>
--
2.7.4

2017-03-21 04:12:54

by Jason Wang

Subject: [PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling

For a socket that exports its skb array, we can use lockless peeking
to avoid taking the spinlock during busy polling.

Signed-off-by: Jason Wang <[email protected]>
---
drivers/vhost/net.c | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 53f09f2..41153a3 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
return len;
}

-static int sk_has_rx_data(struct sock *sk)
+static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
{
struct socket *sock = sk->sk_socket;

+ if (rvq->rx_array)
+ return !__skb_array_empty(rvq->rx_array);
+
if (sock->ops->peek_len)
return sock->ops->peek_len(sock);

@@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
endtime = busy_clock() + vq->busyloop_timeout;

while (vhost_can_busy_poll(&net->dev, endtime) &&
- !sk_has_rx_data(sk) &&
+ !sk_has_rx_data(rvq, sk) &&
vhost_vq_avail_empty(&net->dev, vq))
cpu_relax();

--
2.7.4

2017-03-21 04:15:55

by Jason Wang

Subject: [PATCH net-next 1/8] ptr_ring: introduce batch dequeuing

Signed-off-by: Jason Wang <[email protected]>
---
include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 65 insertions(+)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 6c70444..4771ded 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
return ptr;
}

+static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
+ void **array, int n)
+{
+ void *ptr;
+ int i = 0;
+
+ while (i < n) {
+ ptr = __ptr_ring_consume(r);
+ if (!ptr)
+ break;
+ array[i++] = ptr;
+ }
+
+ return i;
+}
+
/*
* Note: resize (below) nests producer lock within consumer lock, so if you
* call this in interrupt or BH context, you must disable interrupts/BH when
@@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
return ptr;
}

+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+ void **array, int n)
+{
+ int ret;
+
+ spin_lock(&r->consumer_lock);
+ ret = __ptr_ring_consume_batched(r, array, n);
+ spin_unlock(&r->consumer_lock);
+
+ return ret;
+}
+
+static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
+ void **array, int n)
+{
+ int ret;
+
+ spin_lock_irq(&r->consumer_lock);
+ ret = __ptr_ring_consume_batched(r, array, n);
+ spin_unlock_irq(&r->consumer_lock);
+
+ return ret;
+}
+
+static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
+ void **array, int n)
+{
+ unsigned long flags;
+ int ret;
+
+ spin_lock_irqsave(&r->consumer_lock, flags);
+ ret = __ptr_ring_consume_batched(r, array, n);
+ spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+ return ret;
+}
+
+static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
+ void **array, int n)
+{
+ int ret;
+
+ spin_lock_bh(&r->consumer_lock);
+ ret = __ptr_ring_consume_batched(r, array, n);
+ spin_unlock_bh(&r->consumer_lock);
+
+ return ret;
+}
+
/* Cast to structure type and call a function without discarding from FIFO.
* Function must return a value.
* Callers must take consumer_lock.
--
2.7.4
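
To make the benefit of the locked batch helpers concrete, here is a minimal single-threaded userspace model. It is a sketch, not the kernel API: toy_ring and the toy_* functions are hypothetical, and the "lock" is only a counter so that the number of acquisitions can be observed. It shows that a batched consume takes the consumer lock once for the whole batch, where consuming one entry at a time takes it once per call.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_RING_SIZE 8

/* Toy ring: a NULL slot means "empty". lock_count stands in for the
 * consumer spinlock and only counts acquisitions. */
struct toy_ring {
	void *queue[TOY_RING_SIZE];
	int producer;
	int consumer;
	int lock_count;
};

static void toy_lock(struct toy_ring *r)   { r->lock_count++; }
static void toy_unlock(struct toy_ring *r) { (void)r; }

/* Returns 0 on success, -1 if the ring is full. */
static int toy_produce(struct toy_ring *r, void *ptr)
{
	assert(ptr != NULL);	/* NULL is reserved to mark an empty slot */
	if (r->queue[r->producer])
		return -1;
	r->queue[r->producer] = ptr;
	r->producer = (r->producer + 1) % TOY_RING_SIZE;
	return 0;
}

/* Removes and returns the oldest entry, or NULL if the ring is empty. */
static void *toy_consume_unlocked(struct toy_ring *r)
{
	void *ptr = r->queue[r->consumer];

	if (ptr) {
		r->queue[r->consumer] = NULL;
		r->consumer = (r->consumer + 1) % TOY_RING_SIZE;
	}
	return ptr;
}

/* One lock round trip per entry. */
static void *toy_consume(struct toy_ring *r)
{
	void *ptr;

	toy_lock(r);
	ptr = toy_consume_unlocked(r);
	toy_unlock(r);
	return ptr;
}

/* One lock round trip for up to n entries; returns how many were taken. */
static int toy_consume_batched(struct toy_ring *r, void **array, int n)
{
	void *ptr;
	int i = 0;

	toy_lock(r);
	while (i < n) {
		ptr = toy_consume_unlocked(r);
		if (!ptr)
			break;
		array[i++] = ptr;
	}
	toy_unlock(r);
	return i;
}
```

The model ignores concurrency and memory ordering entirely; it only illustrates how the locking cost is amortized over a batch.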

2017-03-21 04:16:03

by Jason Wang

Subject: [PATCH net-next 2/8] skb_array: introduce batch dequeuing

Signed-off-by: Jason Wang <[email protected]>
---
include/linux/skb_array.h | 25 +++++++++++++++++++++++++
1 file changed, 25 insertions(+)

diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
index f4dfade..90e44b9 100644
--- a/include/linux/skb_array.h
+++ b/include/linux/skb_array.h
@@ -97,21 +97,46 @@ static inline struct sk_buff *skb_array_consume(struct skb_array *a)
return ptr_ring_consume(&a->ring);
}

+static inline int skb_array_consume_batched(struct skb_array *a,
+ void **array, int n)
+{
+ return ptr_ring_consume_batched(&a->ring, array, n);
+}
+
static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
{
return ptr_ring_consume_irq(&a->ring);
}

+static inline int skb_array_consume_batched_irq(struct skb_array *a,
+ void **array, int n)
+{
+ return ptr_ring_consume_batched_irq(&a->ring, array, n);
+}
+
static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
{
return ptr_ring_consume_any(&a->ring);
}

+static inline int skb_array_consume_batched_any(struct skb_array *a,
+ void **array, int n)
+{
+ return ptr_ring_consume_batched_any(&a->ring, array, n);
+}
+
+
static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
{
return ptr_ring_consume_bh(&a->ring);
}

+static inline int skb_array_consume_batched_bh(struct skb_array *a,
+ void **array, int n)
+{
+ return ptr_ring_consume_batched_bh(&a->ring, array, n);
+}
+
static inline int __skb_array_len_with_tag(struct sk_buff *skb)
{
if (likely(skb)) {
--
2.7.4

2017-03-21 04:16:15

by Jason Wang

Subject: [PATCH net-next 5/8] tun: support receiving skb through msg_control

This patch lets tun_recvmsg() receive an skb from its caller through
msg_control. vhost_net will be the first user.

Signed-off-by: Jason Wang <[email protected]>
---
drivers/net/tun.c | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 70dd9ec..a82bced 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -1498,9 +1498,8 @@ static struct sk_buff *tun_ring_recv(struct tun_file *tfile, int noblock,

static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
struct iov_iter *to,
- int noblock)
+ int noblock, struct sk_buff *skb)
{
- struct sk_buff *skb;
ssize_t ret;
int err;

@@ -1509,10 +1508,12 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
if (!iov_iter_count(to))
return 0;

- /* Read frames from ring */
- skb = tun_ring_recv(tfile, noblock, &err);
- if (!skb)
- return err;
+ if (!skb) {
+ /* Read frames from ring */
+ skb = tun_ring_recv(tfile, noblock, &err);
+ if (!skb)
+ return err;
+ }

ret = tun_put_user(tun, tfile, skb, to);
if (unlikely(ret < 0))
@@ -1532,7 +1533,7 @@ static ssize_t tun_chr_read_iter(struct kiocb *iocb, struct iov_iter *to)

if (!tun)
return -EBADFD;
- ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK);
+ ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK, NULL);
ret = min_t(ssize_t, ret, len);
if (ret > 0)
iocb->ki_pos = ret;
@@ -1634,7 +1635,8 @@ static int tun_recvmsg(struct socket *sock, struct msghdr *m, size_t total_len,
SOL_PACKET, TUN_TX_TIMESTAMP);
goto out;
}
- ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT);
+ ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT,
+ m->msg_control);
if (ret > (ssize_t)total_len) {
m->msg_flags |= MSG_TRUNC;
ret = flags & MSG_TRUNC ? ret : total_len;
--
2.7.4

2017-03-21 04:16:24

by Jason Wang

Subject: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array

We used to dequeue one skb at a time from skb_array during recvmsg().
This can be inefficient because of poor cache utilization and because
the spinlock is taken for each packet. This patch batches the
dequeuing by calling the batch dequeuing helpers explicitly on the
exported skb array, then passes each skb back through msg_control so
that the underlying socket can finish the userspace copy.

Tests were done with XDP1:
- small buffer:
Before: 1.88Mpps
After : 2.25Mpps (+19.6%)
- mergeable buffer:
Before: 1.83Mpps
After : 2.10Mpps (+14.7%)

Signed-off-by: Jason Wang <[email protected]>
---
drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 60 insertions(+), 4 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 9b51989..53f09f2 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -28,6 +28,8 @@
#include <linux/if_macvlan.h>
#include <linux/if_tap.h>
#include <linux/if_vlan.h>
+#include <linux/skb_array.h>
+#include <linux/skbuff.h>

#include <net/sock.h>

@@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
struct vhost_virtqueue *vq;
};

+#define VHOST_RX_BATCH 64
struct vhost_net_virtqueue {
struct vhost_virtqueue vq;
size_t vhost_hlen;
@@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
/* Reference counting for outstanding ubufs.
* Protected by vq mutex. Writers must also take device mutex. */
struct vhost_net_ubuf_ref *ubufs;
+ struct skb_array *rx_array;
+ void *rxq[VHOST_RX_BATCH];
+ int rt;
+ int rh;
};

struct vhost_net {
@@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
n->vqs[i].ubufs = NULL;
n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0;
+ n->vqs[i].rt = 0;
+ n->vqs[i].rh = 0;
}

}
@@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
mutex_unlock(&vq->mutex);
}

-static int peek_head_len(struct sock *sk)
+static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)
+{
+ if (rvq->rh != rvq->rt)
+ goto out;
+
+ rvq->rh = rvq->rt = 0;
+ rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
+ VHOST_RX_BATCH);
+ if (!rvq->rt)
+ return 0;
+out:
+ return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
+}
+
+static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
{
struct socket *sock = sk->sk_socket;
struct sk_buff *head;
int len = 0;
unsigned long flags;

+ if (rvq->rx_array)
+ return peek_head_len_batched(rvq);
+
if (sock->ops->peek_len)
return sock->ops->peek_len(sock);

@@ -535,12 +561,14 @@ static int sk_has_rx_data(struct sock *sk)
return skb_queue_empty(&sk->sk_receive_queue);
}

-static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
+static int vhost_net_rx_peek_head_len(struct vhost_net *net,
+ struct sock *sk)
{
+ struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
struct vhost_virtqueue *vq = &nvq->vq;
unsigned long uninitialized_var(endtime);
- int len = peek_head_len(sk);
+ int len = peek_head_len(rvq, sk);

if (!len && vq->busyloop_timeout) {
/* Both tx vq and rx socket were polled here */
@@ -561,7 +589,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
vhost_poll_queue(&vq->poll);
mutex_unlock(&vq->mutex);

- len = peek_head_len(sk);
+ len = peek_head_len(rvq, sk);
}

return len;
@@ -699,6 +727,8 @@ static void handle_rx(struct vhost_net *net)
/* On error, stop handling until the next kick. */
if (unlikely(headcount < 0))
goto out;
+ if (nvq->rx_array)
+ msg.msg_control = nvq->rxq[nvq->rh++];
/* On overrun, truncate and discard */
if (unlikely(headcount > UIO_MAXIOV)) {
iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
@@ -841,6 +871,8 @@ static int vhost_net_open(struct inode *inode, struct file *f)
n->vqs[i].done_idx = 0;
n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0;
+ n->vqs[i].rt = 0;
+ n->vqs[i].rh = 0;
}
vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);

@@ -856,11 +888,15 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n,
struct vhost_virtqueue *vq)
{
struct socket *sock;
+ struct vhost_net_virtqueue *nvq =
+ container_of(vq, struct vhost_net_virtqueue, vq);

mutex_lock(&vq->mutex);
sock = vq->private_data;
vhost_net_disable_vq(n, vq);
vq->private_data = NULL;
+ while (nvq->rh != nvq->rt)
+ kfree_skb(nvq->rxq[nvq->rh++]);
mutex_unlock(&vq->mutex);
return sock;
}
@@ -953,6 +989,25 @@ static struct socket *get_raw_socket(int fd)
return ERR_PTR(r);
}

+static struct skb_array *get_tap_skb_array(int fd)
+{
+ struct skb_array *array;
+ struct file *file = fget(fd);
+
+ if (!file)
+ return NULL;
+ array = tun_get_skb_array(file);
+ if (!IS_ERR(array))
+ goto out;
+ array = tap_get_skb_array(file);
+ if (!IS_ERR(array))
+ goto out;
+ array = NULL;
+out:
+ fput(file);
+ return array;
+}
+
static struct socket *get_tap_socket(int fd)
{
struct file *file = fget(fd);
@@ -1029,6 +1084,7 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)

vhost_net_disable_vq(n, vq);
vq->private_data = sock;
+ nvq->rx_array = get_tap_skb_array(fd);
r = vhost_vq_init_access(vq);
if (r)
goto err_used;
--
2.7.4
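
The rxq/rh/rt bookkeeping in this patch can be modelled in userspace as a small cache in front of the backing queue. The sketch below is an illustration under assumptions: toy_pkt, toy_queue, toy_rvq and the toy_* functions are hypothetical stand-ins for sk_buff, skb_array, vhost_net_virtqueue and the helpers in the diff, the backing queue is a NULL-terminated list, and the batch size is 4 rather than 64.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_RX_BATCH 4	/* the patch uses VHOST_RX_BATCH = 64 */

/* Hypothetical packet and backing queue. */
struct toy_pkt {
	int len;
};

struct toy_queue {
	struct toy_pkt **pkts;	/* NULL-terminated list of pending packets */
};

struct toy_rvq {
	struct toy_queue *rx_array;
	void *rxq[TOY_RX_BATCH];	/* locally cached batch */
	int rt;				/* tail: number of valid entries in rxq */
	int rh;				/* head: next entry to hand out */
};

/* Stand-in for the batched consume helper: move up to n packets out. */
static int toy_fetch(struct toy_queue *q, void **array, int n)
{
	int i;

	for (i = 0; i < n && *q->pkts; i++)
		array[i] = *q->pkts++;
	return i;
}

/* Mirrors peek_head_len_batched(): refill only when the cache is drained,
 * then report the length of the head packet (0 if nothing is queued). */
static int toy_peek_head_len(struct toy_rvq *rvq)
{
	if (rvq->rh == rvq->rt) {
		rvq->rh = 0;
		rvq->rt = toy_fetch(rvq->rx_array, rvq->rxq, TOY_RX_BATCH);
		if (!rvq->rt)
			return 0;
	}
	return ((struct toy_pkt *)rvq->rxq[rvq->rh])->len;
}

/* Mirrors the handle_rx() hunk: hand out the head packet and advance rh. */
static struct toy_pkt *toy_take_head(struct toy_rvq *rvq)
{
	assert(rvq->rh < rvq->rt);	/* caller must have seen a non-zero peek */
	return rvq->rxq[rvq->rh++];
}
```

Because packets sit in the local cache after being removed from the backing queue, whoever tears the queue down has to release any entries between rh and rt, which is what the kfree_skb() loop in vhost_net_stop_vq() does.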

2017-03-21 04:17:04

by Jason Wang

Subject: [PATCH net-next 3/8] tun: export skb_array

This patch exports skb_array through tun_get_skb_array(). Callers can
then manipulate the skb array directly.

Signed-off-by: Jason Wang <[email protected]>
---
drivers/net/tun.c | 13 +++++++++++++
include/linux/if_tun.h | 5 +++++
2 files changed, 18 insertions(+)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index c418f0a..70dd9ec 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -2613,6 +2613,19 @@ struct socket *tun_get_socket(struct file *file)
}
EXPORT_SYMBOL_GPL(tun_get_socket);

+struct skb_array *tun_get_skb_array(struct file *file)
+{
+ struct tun_file *tfile;
+
+ if (file->f_op != &tun_fops)
+ return ERR_PTR(-EINVAL);
+ tfile = file->private_data;
+ if (!tfile)
+ return ERR_PTR(-EBADFD);
+ return &tfile->tx_array;
+}
+EXPORT_SYMBOL_GPL(tun_get_skb_array);
+
module_init(tun_init);
module_exit(tun_cleanup);
MODULE_DESCRIPTION(DRV_DESCRIPTION);
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index ed6da2e..bf9bdf4 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -19,6 +19,7 @@

#if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
struct socket *tun_get_socket(struct file *);
+struct skb_array *tun_get_skb_array(struct file *file);
#else
#include <linux/err.h>
#include <linux/errno.h>
@@ -28,5 +29,9 @@ static inline struct socket *tun_get_socket(struct file *f)
{
return ERR_PTR(-EINVAL);
}
+static inline struct skb_array *tun_get_skb_array(struct file *f)
+{
+ return ERR_PTR(-EINVAL);
+}
#endif /* CONFIG_TUN */
#endif /* __IF_TUN_H */
--
2.7.4

2017-03-21 10:25:18

by Sergei Shtylyov

Subject: Re: [PATCH net-next 1/8] ptr_ring: introduce batch dequeuing

Hello!

On 3/21/2017 7:04 AM, Jason Wang wrote:

> Signed-off-by: Jason Wang <[email protected]>
> ---
> include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 65 insertions(+)
>
> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> index 6c70444..4771ded 100644
> --- a/include/linux/ptr_ring.h
> +++ b/include/linux/ptr_ring.h
> @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
> return ptr;
> }
>
> +static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
> + void **array, int n)
> +{
> + void *ptr;
> + int i = 0;
> +
> + while (i < n) {

Hm, why not *for*?

> + ptr = __ptr_ring_consume(r);
> + if (!ptr)
> + break;
> + array[i++] = ptr;
> + }
> +
> + return i;
> +}
> +
> /*
> * Note: resize (below) nests producer lock within consumer lock, so if you
> * call this in interrupt or BH context, you must disable interrupts/BH when
> @@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
[...]

MBR, Sergei

2017-03-22 03:16:16

by Jason Wang

Subject: Re: [PATCH net-next 1/8] ptr_ring: introduce batch dequeuing



On 2017-03-21 18:25, Sergei Shtylyov wrote:
> Hello!
>
> On 3/21/2017 7:04 AM, Jason Wang wrote:
>
>> Signed-off-by: Jason Wang <[email protected]>
>> ---
>> include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
>> 1 file changed, 65 insertions(+)
>>
>> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
>> index 6c70444..4771ded 100644
>> --- a/include/linux/ptr_ring.h
>> +++ b/include/linux/ptr_ring.h
>> @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
>> return ptr;
>> }
>>
>> +static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + void *ptr;
>> + int i = 0;
>> +
>> + while (i < n) {
>
> Hm, why not *for*?

Yes, it may be better. If there are other comments on the series, I
will change it in the next version.

Thanks
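
For reference, the for-loop form suggested in the review could look roughly like the following userspace sketch. The names toy_source and toy_consume are hypothetical stand-ins for the ring and for __ptr_ring_consume(); this is not the code from a later version of the series.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the ring: hands out entries from a
 * NULL-terminated list and returns NULL once the list is exhausted. */
struct toy_source {
	void **next;
};

static void *toy_consume(struct toy_source *s)
{
	void *ptr = *s->next;

	if (ptr)
		s->next++;
	return ptr;
}

/* Batch helper written with a for loop: take up to n entries, stop early
 * when the source runs dry, and return how many were stored in array. */
static int toy_consume_batched(struct toy_source *s, void **array, int n)
{
	void *ptr;
	int i;

	assert(n >= 0);
	for (i = 0; i < n; i++) {
		ptr = toy_consume(s);
		if (!ptr)
			break;
		array[i] = ptr;
	}
	return i;
}
```

The behaviour is the same as the while version; the loop counter and its bound simply move into the loop header.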

2017-03-22 13:43:44

by Michael S. Tsirkin

Subject: Re: [PATCH net-next 1/8] ptr_ring: introduce batch dequeuing

On Tue, Mar 21, 2017 at 12:04:40PM +0800, Jason Wang wrote:
> Signed-off-by: Jason Wang <[email protected]>
> ---
> include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 65 insertions(+)
>
> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> index 6c70444..4771ded 100644
> --- a/include/linux/ptr_ring.h
> +++ b/include/linux/ptr_ring.h
> @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
> return ptr;
> }
>
> +static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
> + void **array, int n)
> +{
> + void *ptr;
> + int i = 0;
> +
> + while (i < n) {
> + ptr = __ptr_ring_consume(r);
> + if (!ptr)
> + break;
> + array[i++] = ptr;
> + }
> +
> + return i;
> +}
> +
> /*
> * Note: resize (below) nests producer lock within consumer lock, so if you
> * call this in interrupt or BH context, you must disable interrupts/BH when


This ignores the comment above that function:

/* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax().
*/

Also - it looks like it shouldn't matter if reads are reordered but I wonder.
Thoughts? Including some reasoning about it in commit log would be nice.

> @@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
> return ptr;
> }
>
> +static inline int ptr_ring_consume_batched(struct ptr_ring *r,
> + void **array, int n)
> +{
> + int ret;
> +
> + spin_lock(&r->consumer_lock);
> + ret = __ptr_ring_consume_batched(r, array, n);
> + spin_unlock(&r->consumer_lock);
> +
> + return ret;
> +}
> +
> +static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
> + void **array, int n)
> +{
> + int ret;
> +
> + spin_lock_irq(&r->consumer_lock);
> + ret = __ptr_ring_consume_batched(r, array, n);
> + spin_unlock_irq(&r->consumer_lock);
> +
> + return ret;
> +}
> +
> +static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
> + void **array, int n)
> +{
> + unsigned long flags;
> + int ret;
> +
> + spin_lock_irqsave(&r->consumer_lock, flags);
> + ret = __ptr_ring_consume_batched(r, array, n);
> + spin_unlock_irqrestore(&r->consumer_lock, flags);
> +
> + return ret;
> +}
> +
> +static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
> + void **array, int n)
> +{
> + int ret;
> +
> + spin_lock_bh(&r->consumer_lock);
> + ret = __ptr_ring_consume_batched(r, array, n);
> + spin_unlock_bh(&r->consumer_lock);
> +
> + return ret;
> +}
> +
> /* Cast to structure type and call a function without discarding from FIFO.
> * Function must return a value.
> * Callers must take consumer_lock.
> --
> 2.7.4

2017-03-22 14:16:33

by Michael S. Tsirkin

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array

On Tue, Mar 21, 2017 at 12:04:46PM +0800, Jason Wang wrote:
> We used to dequeue one skb during recvmsg() from skb_array, this could
> be inefficient because of the bad cache utilization and spinlock
> touching for each packet. This patch tries to batch them by calling
> batch dequeuing helpers explicitly on the exported skb array and pass
> the skb back through msg_control for underlayer socket to finish the
> userspace copying.
>
> Tests were done by XDP1:
> - small buffer:
> Before: 1.88Mpps
> After : 2.25Mpps (+19.6%)
> - mergeable buffer:
> Before: 1.83Mpps
> After : 2.10Mpps (+14.7%)
>
> Signed-off-by: Jason Wang <[email protected]>
> ---
> drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
> 1 file changed, 60 insertions(+), 4 deletions(-)
>
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 9b51989..53f09f2 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -28,6 +28,8 @@
> #include <linux/if_macvlan.h>
> #include <linux/if_tap.h>
> #include <linux/if_vlan.h>
> +#include <linux/skb_array.h>
> +#include <linux/skbuff.h>
>
> #include <net/sock.h>
>
> @@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
> struct vhost_virtqueue *vq;
> };
>
> +#define VHOST_RX_BATCH 64
> struct vhost_net_virtqueue {
> struct vhost_virtqueue vq;
> size_t vhost_hlen;
> @@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
> /* Reference counting for outstanding ubufs.
> * Protected by vq mutex. Writers must also take device mutex. */
> struct vhost_net_ubuf_ref *ubufs;
> + struct skb_array *rx_array;
> + void *rxq[VHOST_RX_BATCH];
> + int rt;
> + int rh;
> };
>
> struct vhost_net {
> @@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
> n->vqs[i].ubufs = NULL;
> n->vqs[i].vhost_hlen = 0;
> n->vqs[i].sock_hlen = 0;
> + n->vqs[i].rt = 0;
> + n->vqs[i].rh = 0;
> }
>
> }
> @@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
> mutex_unlock(&vq->mutex);
> }
>
> -static int peek_head_len(struct sock *sk)
> +static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)

Pls rename to say what it actually does: fetch skbs

> +{
> + if (rvq->rh != rvq->rt)
> + goto out;
> +
> + rvq->rh = rvq->rt = 0;
> + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> + VHOST_RX_BATCH);

A comment explaining why it is -bh would be helpful.

> + if (!rvq->rt)
> + return 0;
> +out:
> + return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
> +}
> +
> +static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
> {
> struct socket *sock = sk->sk_socket;
> struct sk_buff *head;
> int len = 0;
> unsigned long flags;
>
> + if (rvq->rx_array)
> + return peek_head_len_batched(rvq);
> +
> if (sock->ops->peek_len)
> return sock->ops->peek_len(sock);
>
> @@ -535,12 +561,14 @@ static int sk_has_rx_data(struct sock *sk)
> return skb_queue_empty(&sk->sk_receive_queue);
> }
>
> -static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> +static int vhost_net_rx_peek_head_len(struct vhost_net *net,
> + struct sock *sk)
> {
> + struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> struct vhost_virtqueue *vq = &nvq->vq;
> unsigned long uninitialized_var(endtime);
> - int len = peek_head_len(sk);
> + int len = peek_head_len(rvq, sk);
>
> if (!len && vq->busyloop_timeout) {
> /* Both tx vq and rx socket were polled here */
> @@ -561,7 +589,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> vhost_poll_queue(&vq->poll);
> mutex_unlock(&vq->mutex);
>
> - len = peek_head_len(sk);
> + len = peek_head_len(rvq, sk);
> }
>
> return len;
> @@ -699,6 +727,8 @@ static void handle_rx(struct vhost_net *net)
> /* On error, stop handling until the next kick. */
> if (unlikely(headcount < 0))
> goto out;
> + if (nvq->rx_array)
> + msg.msg_control = nvq->rxq[nvq->rh++];
> /* On overrun, truncate and discard */
> if (unlikely(headcount > UIO_MAXIOV)) {
> iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
> @@ -841,6 +871,8 @@ static int vhost_net_open(struct inode *inode, struct file *f)
> n->vqs[i].done_idx = 0;
> n->vqs[i].vhost_hlen = 0;
> n->vqs[i].sock_hlen = 0;
> + n->vqs[i].rt = 0;
> + n->vqs[i].rh = 0;
> }
> vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
>
> @@ -856,11 +888,15 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n,
> struct vhost_virtqueue *vq)
> {
> struct socket *sock;
> + struct vhost_net_virtqueue *nvq =
> + container_of(vq, struct vhost_net_virtqueue, vq);
>
> mutex_lock(&vq->mutex);
> sock = vq->private_data;
> vhost_net_disable_vq(n, vq);
> vq->private_data = NULL;
> + while (nvq->rh != nvq->rt)
> + kfree_skb(nvq->rxq[nvq->rh++]);
> mutex_unlock(&vq->mutex);
> return sock;
> }
> @@ -953,6 +989,25 @@ static struct socket *get_raw_socket(int fd)
> return ERR_PTR(r);
> }
>
> +static struct skb_array *get_tap_skb_array(int fd)
> +{
> + struct skb_array *array;
> + struct file *file = fget(fd);
> +
> + if (!file)
> + return NULL;
> + array = tun_get_skb_array(file);
> + if (!IS_ERR(array))
> + goto out;
> + array = tap_get_skb_array(file);
> + if (!IS_ERR(array))
> + goto out;
> + array = NULL;
> +out:
> + fput(file);
> + return array;
> +}
> +
> static struct socket *get_tap_socket(int fd)
> {
> struct file *file = fget(fd);
> @@ -1029,6 +1084,7 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>
> vhost_net_disable_vq(n, vq);
> vq->private_data = sock;
> + nvq->rx_array = get_tap_skb_array(fd);
> r = vhost_vq_init_access(vq);
> if (r)
> goto err_used;
> --
> 2.7.4

2017-03-23 08:01:25

by Jason Wang

Subject: Re: [PATCH net-next 1/8] ptr_ring: introduce batch dequeuing



On 2017-03-22 21:43, Michael S. Tsirkin wrote:
> On Tue, Mar 21, 2017 at 12:04:40PM +0800, Jason Wang wrote:
>> Signed-off-by: Jason Wang <[email protected]>
>> ---
>> include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
>> 1 file changed, 65 insertions(+)
>>
>> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
>> index 6c70444..4771ded 100644
>> --- a/include/linux/ptr_ring.h
>> +++ b/include/linux/ptr_ring.h
>> @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
>> return ptr;
>> }
>>
>> +static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + void *ptr;
>> + int i = 0;
>> +
>> + while (i < n) {
>> + ptr = __ptr_ring_consume(r);
>> + if (!ptr)
>> + break;
>> + array[i++] = ptr;
>> + }
>> +
>> + return i;
>> +}
>> +
>> /*
>> * Note: resize (below) nests producer lock within consumer lock, so if you
>> * call this in interrupt or BH context, you must disable interrupts/BH when
>
> This ignores the comment above that function:
>
> /* Note: callers invoking this in a loop must use a compiler barrier,
> * for example cpu_relax().
> */

Yes, __ptr_ring_swap_queue() ignores this too.

>
> Also - it looks like it shouldn't matter if reads are reordered but I wonder.
> Thoughts? Including some reasoning about it in commit log would be nice.

Yes, I think it doesn't matter in this case; it matters only for batched
producing.

Thanks

>
>> @@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
>> return ptr;
>> }
>>
>> +static inline int ptr_ring_consume_batched(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + int ret;
>> +
>> + spin_lock(&r->consumer_lock);
>> + ret = __ptr_ring_consume_batched(r, array, n);
>> + spin_unlock(&r->consumer_lock);
>> +
>> + return ret;
>> +}
>> +
>> +static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + int ret;
>> +
>> + spin_lock_irq(&r->consumer_lock);
>> + ret = __ptr_ring_consume_batched(r, array, n);
>> + spin_unlock_irq(&r->consumer_lock);
>> +
>> + return ret;
>> +}
>> +
>> +static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + unsigned long flags;
>> + int ret;
>> +
>> + spin_lock_irqsave(&r->consumer_lock, flags);
>> + ret = __ptr_ring_consume_batched(r, array, n);
>> + spin_unlock_irqrestore(&r->consumer_lock, flags);
>> +
>> + return ret;
>> +}
>> +
>> +static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
>> + void **array, int n)
>> +{
>> + int ret;
>> +
>> + spin_lock_bh(&r->consumer_lock);
>> + ret = __ptr_ring_consume_batched(r, array, n);
>> + spin_unlock_bh(&r->consumer_lock);
>> +
>> + return ret;
>> +}
>> +
>> /* Cast to structure type and call a function without discarding from FIFO.
>> * Function must return a value.
>> * Callers must take consumer_lock.
>> --
>> 2.7.4

2017-03-23 09:10:09

by Jason Wang

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array



On 2017-03-22 22:16, Michael S. Tsirkin wrote:
> On Tue, Mar 21, 2017 at 12:04:46PM +0800, Jason Wang wrote:
>> We used to dequeue one skb during recvmsg() from skb_array, this could
>> be inefficient because of the bad cache utilization and spinlock
>> touching for each packet. This patch tries to batch them by calling
>> batch dequeuing helpers explicitly on the exported skb array and pass
>> the skb back through msg_control for underlayer socket to finish the
>> userspace copying.
>>
>> Tests were done by XDP1:
>> - small buffer:
>> Before: 1.88Mpps
>> After : 2.25Mpps (+19.6%)
>> - mergeable buffer:
>> Before: 1.83Mpps
>> After : 2.10Mpps (+14.7%)
>>
>> Signed-off-by: Jason Wang <[email protected]>
>> ---
>> drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
>> 1 file changed, 60 insertions(+), 4 deletions(-)
>>
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index 9b51989..53f09f2 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -28,6 +28,8 @@
>> #include <linux/if_macvlan.h>
>> #include <linux/if_tap.h>
>> #include <linux/if_vlan.h>
>> +#include <linux/skb_array.h>
>> +#include <linux/skbuff.h>
>>
>> #include <net/sock.h>
>>
>> @@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
>> struct vhost_virtqueue *vq;
>> };
>>
>> +#define VHOST_RX_BATCH 64
>> struct vhost_net_virtqueue {
>> struct vhost_virtqueue vq;
>> size_t vhost_hlen;
>> @@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
>> /* Reference counting for outstanding ubufs.
>> * Protected by vq mutex. Writers must also take device mutex. */
>> struct vhost_net_ubuf_ref *ubufs;
>> + struct skb_array *rx_array;
>> + void *rxq[VHOST_RX_BATCH];
>> + int rt;
>> + int rh;
>> };
>>
>> struct vhost_net {
>> @@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
>> n->vqs[i].ubufs = NULL;
>> n->vqs[i].vhost_hlen = 0;
>> n->vqs[i].sock_hlen = 0;
>> + n->vqs[i].rt = 0;
>> + n->vqs[i].rh = 0;
>> }
>>
>> }
>> @@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
>> mutex_unlock(&vq->mutex);
>> }
>>
>> -static int peek_head_len(struct sock *sk)
>> +static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)
> Pls rename to say what it actually does: fetch skbs

Ok.

>
>> +{
>> + if (rvq->rh != rvq->rt)
>> + goto out;
>> +
>> + rvq->rh = rvq->rt = 0;
>> + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>> + VHOST_RX_BATCH);
> A comment explaining why is is -bh would be helpful.

Ok.

Thanks
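
The rh/rt bookkeeping discussed above can be illustrated with a small user-space sketch. It is not the patch itself: `rx_cache`, `fetch_fn` and `int_source` are hypothetical names, and the batch size is shrunk from VHOST_RX_BATCH (64) to 4 to keep the example easy to trace. The cache is refilled from the batch source only when head and tail meet, so the source is touched once per batch rather than once per packet.

```c
#include <assert.h>
#include <stddef.h>

#define RX_BATCH 4	/* the patch uses VHOST_RX_BATCH = 64 */

/* Hypothetical batch source: fills array with up to n items, returns count. */
typedef int (*fetch_fn)(void *src, void **array, int n);

struct rx_cache {
	void *rxq[RX_BATCH];
	int rh;		/* head: next cached entry to hand out */
	int rt;		/* tail: one past the last cached entry */
};

/*
 * Return the next entry without consuming it, or NULL if none is available.
 * The source is asked for a new batch only when the cache is drained.
 */
static void *rx_cache_peek(struct rx_cache *c, fetch_fn fetch, void *src)
{
	if (c->rh == c->rt) {
		c->rh = 0;
		c->rt = fetch(src, c->rxq, RX_BATCH);
		if (c->rt == 0)
			return NULL;
	}
	return c->rxq[c->rh];
}

/* Consume the entry most recently returned by rx_cache_peek(). */
static void *rx_cache_pop(struct rx_cache *c)
{
	assert(c->rh != c->rt);	/* peek must have succeeded first */
	return c->rxq[c->rh++];
}

/* Demo source: hands out the elements of an int array in order. */
struct int_source {
	int *items;
	int count;
	int next;
	int calls;	/* how many times the source was touched */
};

static int int_source_fetch(void *src, void **array, int n)
{
	struct int_source *s = src;
	int i;

	s->calls++;
	for (i = 0; i < n && s->next < s->count; i++)
		array[i] = &s->items[s->next++];
	return i;
}
```

With six items and a batch size of four, six peek/pop pairs reach the source only twice, which is the saving the batching aims for.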

2017-03-29 09:58:42

by Jason Wang

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array



On 2017-03-23 13:34, Jason Wang wrote:
>
>
>>
>>> +{
>>> + if (rvq->rh != rvq->rt)
>>> + goto out;
>>> +
>>> + rvq->rh = rvq->rt = 0;
>>> + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>>> + VHOST_RX_BATCH);
>> A comment explaining why is is -bh would be helpful.
>
> Ok.
>
> Thanks

Thinking about this again, it looks like -bh is not needed in this case,
since no consumer runs in bh.

Thanks

2017-03-29 10:46:54

by Pankaj Gupta

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array

Hi Jason,

>
> On 2017年03月23日 13:34, Jason Wang wrote:
> >
> >
> >>
> >>> +{
> >>> + if (rvq->rh != rvq->rt)
> >>> + goto out;
> >>> +
> >>> + rvq->rh = rvq->rt = 0;
> >>> + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> >>> + VHOST_RX_BATCH);
> >> A comment explaining why is is -bh would be helpful.
> >
> > Ok.
> >
> > Thanks
>
> Rethink about this. It looks like -bh is not needed in this case since
> no consumer run in bh.

In that case, do we need the other variants of the "ptr_ring_consume_batched_*()" functions?
Are we planning to use them in the future?

>
> Thanks
>

2017-03-29 10:54:11

by Jason Wang

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array



On 2017-03-29 18:46, Pankaj Gupta wrote:
> Hi Jason,
>
>> On 2017年03月23日 13:34, Jason Wang wrote:
>>>
>>>>> +{
>>>>> + if (rvq->rh != rvq->rt)
>>>>> + goto out;
>>>>> +
>>>>> + rvq->rh = rvq->rt = 0;
>>>>> + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>>>>> + VHOST_RX_BATCH);
>>>> A comment explaining why is is -bh would be helpful.
>>> Ok.
>>>
>>> Thanks
>> Rethink about this. It looks like -bh is not needed in this case since
>> no consumer run in bh.
> In that case do we need other variants of "ptr_ring_consume_batched_*()" functions.
> Are we planning to use them in future?

I think we'd better keep them, since they serve as helpers. You can see
that not all the helpers in ptr_ring have real users, but they were
prepared for future use.

Thanks

>
>> Thanks
>>

2017-03-29 12:08:41

by Michael S. Tsirkin

Subject: Re: [PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling

On Tue, Mar 21, 2017 at 12:04:47PM +0800, Jason Wang wrote:
> For the socket that exports its skb array, we can use lockless polling
> to avoid touching spinlock during busy polling.
>
> Signed-off-by: Jason Wang <[email protected]>
> ---
> drivers/vhost/net.c | 7 +++++--
> 1 file changed, 5 insertions(+), 2 deletions(-)
>
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 53f09f2..41153a3 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
> return len;
> }
>
> -static int sk_has_rx_data(struct sock *sk)
> +static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
> {
> struct socket *sock = sk->sk_socket;
>
> + if (rvq->rx_array)
> + return !__skb_array_empty(rvq->rx_array);
> +
> if (sock->ops->peek_len)
> return sock->ops->peek_len(sock);
>

I don't see which patch adds __skb_array_empty.

> @@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
> endtime = busy_clock() + vq->busyloop_timeout;
>
> while (vhost_can_busy_poll(&net->dev, endtime) &&
> - !sk_has_rx_data(sk) &&
> + !sk_has_rx_data(rvq, sk) &&
> vhost_vq_avail_empty(&net->dev, vq))
> cpu_relax();
>
> --
> 2.7.4

2017-03-29 21:47:51

by Michael S. Tsirkin

Subject: Re: [PATCH net-next 7/8] vhost_net: try batch dequing from skb array

On Wed, Mar 29, 2017 at 06:53:27PM +0800, Jason Wang wrote:
>
>
> On 2017年03月29日 18:46, Pankaj Gupta wrote:
> > Hi Jason,
> >
> > > On 2017年03月23日 13:34, Jason Wang wrote:
> > > >
> > > > > > +{
> > > > > > + if (rvq->rh != rvq->rt)
> > > > > > + goto out;
> > > > > > +
> > > > > > + rvq->rh = rvq->rt = 0;
> > > > > > + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> > > > > > + VHOST_RX_BATCH);
> > > > > A comment explaining why is is -bh would be helpful.
> > > > Ok.
> > > >
> > > > Thanks
> > > Rethink about this. It looks like -bh is not needed in this case since
> > > no consumer run in bh.
> > In that case do we need other variants of "ptr_ring_consume_batched_*()" functions.
> > Are we planning to use them in future?
>
> I think we'd better keep them, since it serves as helpers. You can see that
> not all the helpers in ptr_ring has real users, but they were prepared for
> the future use.
>
> Thanks

Makes sense for basic building blocks but I'm not sure we
need to do it for all APIs.


> >
> > > Thanks
> > >

2017-03-30 02:16:24

by Jason Wang

Subject: Re: [PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling



On 2017-03-29 20:07, Michael S. Tsirkin wrote:
> On Tue, Mar 21, 2017 at 12:04:47PM +0800, Jason Wang wrote:
>> For the socket that exports its skb array, we can use lockless polling
>> to avoid touching spinlock during busy polling.
>>
>> Signed-off-by: Jason Wang <[email protected]>
>> ---
>> drivers/vhost/net.c | 7 +++++--
>> 1 file changed, 5 insertions(+), 2 deletions(-)
>>
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index 53f09f2..41153a3 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
>> return len;
>> }
>>
>> -static int sk_has_rx_data(struct sock *sk)
>> +static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
>> {
>> struct socket *sock = sk->sk_socket;
>>
>> + if (rvq->rx_array)
>> + return !__skb_array_empty(rvq->rx_array);
>> +
>> if (sock->ops->peek_len)
>> return sock->ops->peek_len(sock);
>>
> I don't see which patch adds __skb_array_empty.

This is not something new, it was introduced by ad69f35d1dc0a
("skb_array: array based FIFO for skbs").

Thanks

>
>> @@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
>> endtime = busy_clock() + vq->busyloop_timeout;
>>
>> while (vhost_can_busy_poll(&net->dev, endtime) &&
>> - !sk_has_rx_data(sk) &&
>> + !sk_has_rx_data(rvq, sk) &&
>> vhost_vq_avail_empty(&net->dev, vq))
>> cpu_relax();
>>
>> --
>> 2.7.4

2017-03-30 02:33:37

by Michael S. Tsirkin

Subject: Re: [PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling

On Thu, Mar 30, 2017 at 10:16:15AM +0800, Jason Wang wrote:
>
>
> On 2017年03月29日 20:07, Michael S. Tsirkin wrote:
> > On Tue, Mar 21, 2017 at 12:04:47PM +0800, Jason Wang wrote:
> > > For the socket that exports its skb array, we can use lockless polling
> > > to avoid touching spinlock during busy polling.
> > >
> > > Signed-off-by: Jason Wang <[email protected]>
> > > ---
> > > drivers/vhost/net.c | 7 +++++--
> > > 1 file changed, 5 insertions(+), 2 deletions(-)
> > >
> > > diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> > > index 53f09f2..41153a3 100644
> > > --- a/drivers/vhost/net.c
> > > +++ b/drivers/vhost/net.c
> > > @@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
> > > return len;
> > > }
> > > -static int sk_has_rx_data(struct sock *sk)
> > > +static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
> > > {
> > > struct socket *sock = sk->sk_socket;
> > > + if (rvq->rx_array)
> > > + return !__skb_array_empty(rvq->rx_array);
> > > +
> > > if (sock->ops->peek_len)
> > > return sock->ops->peek_len(sock);
> > I don't see which patch adds __skb_array_empty.
>
> This is not something new, it was introduced by ad69f35d1dc0a ("skb_array:
> array based FIFO for skbs").
>
> Thanks

Same comment about a compiler barrier applies then.

> >
> > > @@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
> > > endtime = busy_clock() + vq->busyloop_timeout;
> > > while (vhost_can_busy_poll(&net->dev, endtime) &&
> > > - !sk_has_rx_data(sk) &&
> > > + !sk_has_rx_data(rvq, sk) &&
> > > vhost_vq_avail_empty(&net->dev, vq))
> > > cpu_relax();
> > > --
> > > 2.7.4

2017-03-30 03:53:25

by Jason Wang

Subject: Re: [PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling



On 2017-03-30 10:33, Michael S. Tsirkin wrote:
> On Thu, Mar 30, 2017 at 10:16:15AM +0800, Jason Wang wrote:
>>
>> On 2017年03月29日 20:07, Michael S. Tsirkin wrote:
>>> On Tue, Mar 21, 2017 at 12:04:47PM +0800, Jason Wang wrote:
>>>> For the socket that exports its skb array, we can use lockless polling
>>>> to avoid touching spinlock during busy polling.
>>>>
>>>> Signed-off-by: Jason Wang <[email protected]>
>>>> ---
>>>> drivers/vhost/net.c | 7 +++++--
>>>> 1 file changed, 5 insertions(+), 2 deletions(-)
>>>>
>>>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>>>> index 53f09f2..41153a3 100644
>>>> --- a/drivers/vhost/net.c
>>>> +++ b/drivers/vhost/net.c
>>>> @@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
>>>> return len;
>>>> }
>>>> -static int sk_has_rx_data(struct sock *sk)
>>>> +static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
>>>> {
>>>> struct socket *sock = sk->sk_socket;
>>>> + if (rvq->rx_array)
>>>> + return !__skb_array_empty(rvq->rx_array);
>>>> +
>>>> if (sock->ops->peek_len)
>>>> return sock->ops->peek_len(sock);
>>> I don't see which patch adds __skb_array_empty.
>> This is not something new, it was introduced by ad69f35d1dc0a ("skb_array:
>> array based FIFO for skbs").
>>
>> Thanks
> Same comment about a compiler barrier applies then.

Ok, thinking about this again: since skb_array can be resized, using the
lockless version seems wrong.
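
The resize concern can be illustrated with a user-space sketch. Everything here is an assumption made for illustration: `rz_ring` and its functions are hypothetical, and a single pthread mutex stands in for the separate producer and consumer locks of ptr_ring. The point is that resize replaces and frees the backing array, so an emptiness check that read the old array pointer without holding the lock could touch freed memory; taking the lock for the check avoids that.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct rz_ring {
	void **queue;		/* replaced wholesale on resize */
	int size;
	int producer;
	int consumer;
	pthread_mutex_t lock;	/* one lock for brevity (assumption) */
};

static int rz_ring_init(struct rz_ring *r, int size)
{
	assert(size > 0);
	r->queue = calloc(size, sizeof(*r->queue));
	if (!r->queue)
		return -1;
	r->size = size;
	r->producer = 0;
	r->consumer = 0;
	pthread_mutex_init(&r->lock, NULL);
	return 0;
}

/* Returns 0 on success, -1 if the ring is full. */
static int rz_ring_produce(struct rz_ring *r, void *ptr)
{
	int ret = -1;

	pthread_mutex_lock(&r->lock);
	if (!r->queue[r->producer]) {
		r->queue[r->producer] = ptr;
		r->producer = (r->producer + 1) % r->size;
		ret = 0;
	}
	pthread_mutex_unlock(&r->lock);
	return ret;
}

/* Emptiness check that cannot race with the array swap in resize. */
static int rz_ring_empty(struct rz_ring *r)
{
	int empty;

	pthread_mutex_lock(&r->lock);
	empty = r->queue[r->consumer] == NULL;
	pthread_mutex_unlock(&r->lock);
	return empty;
}

/*
 * Move pending entries into a new array and free the old one.
 * Entries that do not fit in the new array are dropped in this sketch.
 */
static int rz_ring_resize(struct rz_ring *r, int size)
{
	void **queue, **old;
	int n = 0;

	assert(size > 0);
	queue = calloc(size, sizeof(*queue));
	if (!queue)
		return -1;

	pthread_mutex_lock(&r->lock);
	while (n < size && r->queue[r->consumer]) {
		queue[n++] = r->queue[r->consumer];
		r->queue[r->consumer] = NULL;
		r->consumer = (r->consumer + 1) % r->size;
	}
	old = r->queue;
	r->queue = queue;
	r->size = size;
	r->consumer = 0;
	r->producer = n % size;
	pthread_mutex_unlock(&r->lock);

	free(old);	/* a lockless reader still holding "old" would now be unsafe */
	return 0;
}
```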

As for the compiler barrier comment: the caller
(vhost_net_rx_peek_head_len) already uses cpu_relax(). But I haven't figured
out why a compiler barrier is needed here. Could you please explain?

Thanks
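
As background on the barrier question, here is a minimal user-space sketch of a polling loop around a lockless emptiness check. The names `poll_ring`, `poll_ring_empty`, `poll_ring_wait` and `compiler_barrier` are hypothetical, and the barrier macro assumes GCC or Clang inline asm. The usual reasoning is that nothing inside the loop writes the slot, so without a barrier the compiler is allowed to load it once and reuse the value on every iteration; the barrier forces a fresh load each time. In the kernel, cpu_relax() typically implies such a barrier, which is presumably why the ptr_ring comment names it as an example.

```c
#include <assert.h>
#include <stddef.h>

/* Compiler-only barrier (GCC/Clang syntax), similar in spirit to barrier(). */
#define compiler_barrier() __asm__ __volatile__("" ::: "memory")

#define POLL_RING_SIZE 4

struct poll_ring {
	void *queue[POLL_RING_SIZE];	/* a NULL slot means "empty" */
	int consumer;
};

/* Lockless emptiness check: a plain load of the consumer slot. */
static int poll_ring_empty(const struct poll_ring *r)
{
	assert(r->consumer >= 0 && r->consumer < POLL_RING_SIZE);
	return r->queue[r->consumer] == NULL;
}

/*
 * Spin until data shows up or the spin budget runs out.
 * Returns 1 if data is available, 0 otherwise.
 */
static int poll_ring_wait(const struct poll_ring *r, unsigned long max_spins)
{
	unsigned long spins;

	for (spins = 0; spins < max_spins; spins++) {
		if (!poll_ring_empty(r))
			return 1;
		/*
		 * Tell the compiler that memory may have changed, so the
		 * slot is re-read on the next iteration instead of being
		 * cached in a register across the whole loop.
		 */
		compiler_barrier();
	}
	return !poll_ring_empty(r);
}
```

The barrier only constrains the compiler; it says nothing about CPU-level ordering, and it does not address the resize problem raised above.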

>
>>>> @@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
>>>> endtime = busy_clock() + vq->busyloop_timeout;
>>>> while (vhost_can_busy_poll(&net->dev, endtime) &&
>>>> - !sk_has_rx_data(sk) &&
>>>> + !sk_has_rx_data(rvq, sk) &&
>>>> vhost_vq_avail_empty(&net->dev, vq))
>>>> cpu_relax();
>>>> --
>>>> 2.7.4