2010-04-01 09:12:23

by Xin, Xiaohui

Subject: Re: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

The vhost-net backend now only supports synchronous send/recv
operations. The patch provides multiple submits and asynchronous
notifications. This is needed for the zero-copy case.

Signed-off-by: Xin Xiaohui <[email protected]>
---

Michael,
Now vhost allocates and destroys the kiocb, and passes it through
sendmsg/recvmsg. I did not remove vq->receiver, since what the
callback does is tied to structures owned by the mp device, and
I think keeping them isolated from vhost is good for all of us.
It also does not prevent the mp device from becoming independent
of vhost in the future. Later, when the mp device can be a real
device that provides asynchronous read/write operations rather
than just exposing proto_ops, it will use another callback
function that is not related to vhost at all.
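
To make the asynchronous path concrete, here is a minimal sketch of the
completion side that the code below assumes: the mp device (or any async
backend) finishes a zero-copy request, fills in the kiocb that vhost
allocated, queues it on vq->notifier and kicks the virtqueue, so that
handle_async_rx/tx_events_notify() can reap it via notify_dequeue().
Only notify_dequeue() is part of this patch; notify_enqueue() and
mp_complete_request() below are hypothetical names used for illustration.

/* Illustrative sketch only: the enqueue counterpart of
 * notify_dequeue(); not part of this patch. */
static void notify_enqueue(struct vhost_virtqueue *vq, struct kiocb *iocb)
{
	unsigned long flags;

	spin_lock_irqsave(&vq->notify_lock, flags);
	list_add_tail(&iocb->ki_list, &vq->notifier);
	spin_unlock_irqrestore(&vq->notify_lock, flags);
}

/* Hypothetical backend completion path: record the result in the
 * kiocb, queue it for vhost, and re-queue the handler so the
 * notification is processed. */
static void mp_complete_request(struct kiocb *iocb, int nbytes)
{
	struct vhost_virtqueue *vq = iocb->private;

	iocb->ki_nbytes = nbytes;	/* reported to the guest as used len */
	notify_enqueue(vq, iocb);
	vhost_poll_queue(&vq->poll);	/* reaped in handle_rx()/handle_tx() */
}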

For the write logging, do you have a function at hand that we can
use to recompute the log? If so, I think I can use it to recompute
the log info when logging is suddenly enabled.
For the outstanding requests, do you mean all the user buffers
submitted before the logging ioctl changed? That may be a lot, and
some of them are still in NIC ring descriptors; waiting for them to
finish may take some time. I think it is also reasonable for the
logging to take effect just after the logging ioctl changes.

Thanks
Xiaohui

drivers/vhost/net.c | 189 +++++++++++++++++++++++++++++++++++++++++++++++--
drivers/vhost/vhost.h | 10 +++
2 files changed, 192 insertions(+), 7 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 22d5fef..2aafd90 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -17,11 +17,13 @@
#include <linux/workqueue.h>
#include <linux/rcupdate.h>
#include <linux/file.h>
+#include <linux/aio.h>

#include <linux/net.h>
#include <linux/if_packet.h>
#include <linux/if_arp.h>
#include <linux/if_tun.h>
+#include <linux/mpassthru.h>

#include <net/sock.h>

@@ -47,6 +49,7 @@ struct vhost_net {
struct vhost_dev dev;
struct vhost_virtqueue vqs[VHOST_NET_VQ_MAX];
struct vhost_poll poll[VHOST_NET_VQ_MAX];
+ struct kmem_cache *cache;
/* Tells us whether we are polling a socket for TX.
* We only do this when socket buffer fills up.
* Protected by tx vq lock. */
@@ -91,11 +94,88 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
net->tx_poll_state = VHOST_NET_POLL_STARTED;
}

+struct kiocb *notify_dequeue(struct vhost_virtqueue *vq)
+{
+ struct kiocb *iocb = NULL;
+ unsigned long flags;
+
+ spin_lock_irqsave(&vq->notify_lock, flags);
+ if (!list_empty(&vq->notifier)) {
+ iocb = list_first_entry(&vq->notifier,
+ struct kiocb, ki_list);
+ list_del(&iocb->ki_list);
+ }
+ spin_unlock_irqrestore(&vq->notify_lock, flags);
+ return iocb;
+}
+
+static void handle_async_rx_events_notify(struct vhost_net *net,
+ struct vhost_virtqueue *vq)
+{
+ struct kiocb *iocb = NULL;
+ struct vhost_log *vq_log = NULL;
+ int rx_total_len = 0;
+ int log, size;
+
+ if (vq->link_state != VHOST_VQ_LINK_ASYNC)
+ return;
+
+ if (vq->receiver)
+ vq->receiver(vq);
+
+ vq_log = unlikely(vhost_has_feature(
+ &net->dev, VHOST_F_LOG_ALL)) ? vq->log : NULL;
+ while ((iocb = notify_dequeue(vq)) != NULL) {
+ vhost_add_used_and_signal(&net->dev, vq,
+ iocb->ki_pos, iocb->ki_nbytes);
+ log = (int)iocb->ki_user_data;
+ size = iocb->ki_nbytes;
+ rx_total_len += iocb->ki_nbytes;
+
+ if (iocb->ki_dtor)
+ iocb->ki_dtor(iocb);
+ kmem_cache_free(net->cache, iocb);
+
+ if (unlikely(vq_log))
+ vhost_log_write(vq, vq_log, log, size);
+ if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
+ vhost_poll_queue(&vq->poll);
+ break;
+ }
+ }
+}
+
+static void handle_async_tx_events_notify(struct vhost_net *net,
+ struct vhost_virtqueue *vq)
+{
+ struct kiocb *iocb = NULL;
+ int tx_total_len = 0;
+
+ if (vq->link_state != VHOST_VQ_LINK_ASYNC)
+ return;
+
+ while ((iocb = notify_dequeue(vq)) != NULL) {
+ vhost_add_used_and_signal(&net->dev, vq,
+ iocb->ki_pos, 0);
+ tx_total_len += iocb->ki_nbytes;
+
+ if (iocb->ki_dtor)
+ iocb->ki_dtor(iocb);
+
+ kmem_cache_free(net->cache, iocb);
+ if (unlikely(tx_total_len >= VHOST_NET_WEIGHT)) {
+ vhost_poll_queue(&vq->poll);
+ break;
+ }
+ }
+}
+
/* Expects to be always run from workqueue - which acts as
* read-size critical section for our kind of RCU. */
static void handle_tx(struct vhost_net *net)
{
struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX];
+ struct kiocb *iocb = NULL;
unsigned head, out, in, s;
struct msghdr msg = {
.msg_name = NULL,
@@ -124,6 +204,8 @@ static void handle_tx(struct vhost_net *net)
tx_poll_stop(net);
hdr_size = vq->hdr_size;

+ handle_async_tx_events_notify(net, vq);
+
for (;;) {
head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
ARRAY_SIZE(vq->iov),
@@ -151,6 +233,15 @@ static void handle_tx(struct vhost_net *net)
/* Skip header. TODO: support TSO. */
s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
msg.msg_iovlen = out;
+
+ if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
+ iocb = kmem_cache_zalloc(net->cache, GFP_KERNEL);
+ if (!iocb)
+ break;
+ iocb->ki_pos = head;
+ iocb->private = (void *)vq;
+ }
+
len = iov_length(vq->iov, out);
/* Sanity check */
if (!len) {
@@ -160,12 +251,16 @@ static void handle_tx(struct vhost_net *net)
break;
}
/* TODO: Check specific error and bomb out unless ENOBUFS? */
- err = sock->ops->sendmsg(NULL, sock, &msg, len);
+ err = sock->ops->sendmsg(iocb, sock, &msg, len);
if (unlikely(err < 0)) {
vhost_discard_vq_desc(vq);
tx_poll_start(net, sock);
break;
}
+
+ if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+ continue;
+
if (err != len)
pr_err("Truncated TX packet: "
" len %d != %zd\n", err, len);
@@ -177,6 +272,8 @@ static void handle_tx(struct vhost_net *net)
}
}

+ handle_async_tx_events_notify(net, vq);
+
mutex_unlock(&vq->mutex);
unuse_mm(net->dev.mm);
}
@@ -186,6 +283,7 @@ static void handle_tx(struct vhost_net *net)
static void handle_rx(struct vhost_net *net)
{
struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
+ struct kiocb *iocb = NULL;
unsigned head, out, in, log, s;
struct vhost_log *vq_log;
struct msghdr msg = {
@@ -206,7 +304,8 @@ static void handle_rx(struct vhost_net *net)
int err;
size_t hdr_size;
struct socket *sock = rcu_dereference(vq->private_data);
- if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
+ if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
+ vq->link_state == VHOST_VQ_LINK_SYNC))
return;

use_mm(net->dev.mm);
@@ -214,9 +313,18 @@ static void handle_rx(struct vhost_net *net)
vhost_disable_notify(vq);
hdr_size = vq->hdr_size;

- vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
+ /* In async cases, for write logging, the simple way is to always
+ * collect the log info here and decide later whether to actually
+ * write it out. That way, enabling or disabling logging takes
+ * effect accordingly.
+ */
+
+ vq_log = (unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ||
+ vq->link_state == VHOST_VQ_LINK_ASYNC) ?
vq->log : NULL;

+ handle_async_rx_events_notify(net, vq);
+
for (;;) {
head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
ARRAY_SIZE(vq->iov),
@@ -245,6 +353,14 @@ static void handle_rx(struct vhost_net *net)
s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
msg.msg_iovlen = in;
len = iov_length(vq->iov, in);
+ if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
+ iocb = kmem_cache_zalloc(net->cache, GFP_KERNEL);
+ if (!iocb)
+ break;
+ iocb->private = vq;
+ iocb->ki_pos = head;
+ iocb->ki_user_data = log;
+ }
/* Sanity check */
if (!len) {
vq_err(vq, "Unexpected header len for RX: "
@@ -252,13 +368,18 @@ static void handle_rx(struct vhost_net *net)
iov_length(vq->hdr, s), hdr_size);
break;
}
- err = sock->ops->recvmsg(NULL, sock, &msg,
+
+ err = sock->ops->recvmsg(iocb, sock, &msg,
len, MSG_DONTWAIT | MSG_TRUNC);
/* TODO: Check specific error and bomb out unless EAGAIN? */
if (err < 0) {
vhost_discard_vq_desc(vq);
break;
}
+
+ if (vq->link_state == VHOST_VQ_LINK_ASYNC)
+ continue;
+
/* TODO: Should check and handle checksum. */
if (err > len) {
pr_err("Discarded truncated rx packet: "
@@ -284,10 +405,13 @@ static void handle_rx(struct vhost_net *net)
}
}

+ handle_async_rx_events_notify(net, vq);
+
mutex_unlock(&vq->mutex);
unuse_mm(net->dev.mm);
}

+
static void handle_tx_kick(struct work_struct *work)
{
struct vhost_virtqueue *vq;
@@ -338,6 +462,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT);
vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN);
n->tx_poll_state = VHOST_NET_POLL_DISABLED;
+ n->cache = NULL;
return 0;
}

@@ -398,6 +523,17 @@ static void vhost_net_flush(struct vhost_net *n)
vhost_net_flush_vq(n, VHOST_NET_VQ_RX);
}

+static void vhost_notifier_cleanup(struct vhost_net *n)
+{
+ struct vhost_virtqueue *vq = &n->dev.vqs[VHOST_NET_VQ_RX];
+ struct kiocb *iocb = NULL;
+ if (n->cache) {
+ while ((iocb = notify_dequeue(vq)) != NULL)
+ kmem_cache_free(n->cache, iocb);
+ kmem_cache_destroy(n->cache);
+ }
+}
+
static int vhost_net_release(struct inode *inode, struct file *f)
{
struct vhost_net *n = f->private_data;
@@ -414,6 +550,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
/* We do an extra flush before freeing memory,
* since jobs can re-queue themselves. */
vhost_net_flush(n);
+ vhost_notifier_cleanup(n);
kfree(n);
return 0;
}
@@ -462,7 +599,19 @@ static struct socket *get_tun_socket(int fd)
return sock;
}

-static struct socket *get_socket(int fd)
+static struct socket *get_mp_socket(int fd)
+{
+ struct file *file = fget(fd);
+ struct socket *sock;
+ if (!file)
+ return ERR_PTR(-EBADF);
+ sock = mp_get_socket(file);
+ if (IS_ERR(sock))
+ fput(file);
+ return sock;
+}
+
+static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
{
struct socket *sock;
if (fd == -1)
@@ -473,9 +622,31 @@ static struct socket *get_socket(int fd)
sock = get_tun_socket(fd);
if (!IS_ERR(sock))
return sock;
+ sock = get_mp_socket(fd);
+ if (!IS_ERR(sock)) {
+ vq->link_state = VHOST_VQ_LINK_ASYNC;
+ return sock;
+ }
return ERR_PTR(-ENOTSOCK);
}

+static void vhost_init_link_state(struct vhost_net *n, int index)
+{
+ struct vhost_virtqueue *vq = n->vqs + index;
+
+ WARN_ON(!mutex_is_locked(&vq->mutex));
+ if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
+ vq->receiver = NULL;
+ INIT_LIST_HEAD(&vq->notifier);
+ spin_lock_init(&vq->notify_lock);
+ if (!n->cache) {
+ n->cache = kmem_cache_create("vhost_kiocb",
+ sizeof(struct kiocb), 0,
+ SLAB_HWCACHE_ALIGN, NULL);
+ }
+ }
+}
+
static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
{
struct socket *sock, *oldsock;
@@ -493,12 +664,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
}
vq = n->vqs + index;
mutex_lock(&vq->mutex);
- sock = get_socket(fd);
+ vq->link_state = VHOST_VQ_LINK_SYNC;
+ sock = get_socket(vq, fd);
if (IS_ERR(sock)) {
r = PTR_ERR(sock);
goto err;
}

+ vhost_init_link_state(n, index);
+
/* start polling new socket */
oldsock = vq->private_data;
if (sock == oldsock)
@@ -507,8 +681,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
vhost_net_disable_vq(n, vq);
rcu_assign_pointer(vq->private_data, sock);
vhost_net_enable_vq(n, vq);
- mutex_unlock(&vq->mutex);
done:
+ mutex_unlock(&vq->mutex);
mutex_unlock(&n->dev.mutex);
if (oldsock) {
vhost_net_flush_vq(n, index);
@@ -516,6 +690,7 @@ done:
}
return r;
err:
+ mutex_unlock(&vq->mutex);
mutex_unlock(&n->dev.mutex);
return r;
}
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index d1f0453..cffe39a 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -43,6 +43,11 @@ struct vhost_log {
u64 len;
};

+enum vhost_vq_link_state {
+ VHOST_VQ_LINK_SYNC = 0,
+ VHOST_VQ_LINK_ASYNC = 1,
+};
+
/* The virtqueue structure describes a queue attached to a device. */
struct vhost_virtqueue {
struct vhost_dev *dev;
@@ -96,6 +101,11 @@ struct vhost_virtqueue {
/* Log write descriptors */
void __user *log_base;
struct vhost_log log[VHOST_NET_MAX_SG];
+ /* Differentiate the async socket used for zero-copy from the normal one */
+ enum vhost_vq_link_state link_state;
+ struct list_head notifier;
+ spinlock_t notify_lock;
+ void (*receiver)(struct vhost_virtqueue *);
};

struct vhost_dev {
--
1.5.4.4


2010-04-01 16:45:02

by Michael S. Tsirkin

Subject: Re: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

On Thu, Apr 01, 2010 at 05:14:56PM +0800, Xin Xiaohui wrote:
> The vhost-net backend now only supports synchronous send/recv
> operations. The patch provides multiple submits and asynchronous
> notifications. This is needed for the zero-copy case.
>
> Signed-off-by: Xin Xiaohui <[email protected]>
> ---
>
> Michael,
> Now vhost allocates and destroys the kiocb, and passes it through
> sendmsg/recvmsg. I did not remove vq->receiver, since what the
> callback does is tied to structures owned by the mp device, and
> I think keeping them isolated from vhost is good for all of us.
> It also does not prevent the mp device from becoming independent
> of vhost in the future. Later, when the mp device can be a real
> device that provides asynchronous read/write operations rather
> than just exposing proto_ops, it will use another callback
> function that is not related to vhost at all.

Thanks, I'll look at the code!

> For the write logging, do you have a function at hand that we can
> use to recompute the log? If so, I think I can use it to recompute
> the log info when logging is suddenly enabled.
> For the outstanding requests, do you mean all the user buffers
> submitted before the logging ioctl changed? That may be a lot, and
> some of them are still in NIC ring descriptors; waiting for them to
> finish may take some time. I think it is also reasonable for the
> logging to take effect just after the logging ioctl changes.

The key point is that after the logging ioctl returns, any
subsequent change to memory must be logged. It does not
matter when the request was submitted; otherwise we will
get memory corruption on migration.


2010-04-02 02:16:35

by Xin, Xiaohui

Subject: RE: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.


>> For the write logging, do you have a function at hand that we can
>> use to recompute the log? If so, I think I can use it to recompute
>> the log info when logging is suddenly enabled.
>> For the outstanding requests, do you mean all the user buffers
>> submitted before the logging ioctl changed? That may be a lot, and
>> some of them are still in NIC ring descriptors; waiting for them to
>> finish may take some time. I think it is also reasonable for the
>> logging to take effect just after the logging ioctl changes.

> The key point is that after the logging ioctl returns, any
> subsequent change to memory must be logged. It does not
> matter when the request was submitted; otherwise we will
> get memory corruption on migration.

The change to memory happens in vhost_add_used_and_signal(), right?
So after the ioctl returns, it is enough to recompute the log info for
the events still in the async queue, since the ioctl and the write-log
operations are all protected by vq->mutex.
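
To illustrate the idea (this is not part of the patch): with vq->mutex held
by the ioctl path, one could walk the kiocbs still queued on vq->notifier
and refill their log info once logging has been enabled. The sketch below
assumes a helper, here called vhost_compute_log_for_head(), that translates
the descriptor chain at a given head back into (address, len) entries in
vq->log; that is the function asked about above, and it does not exist yet.

/* Hypothetical sketch: recompute log info for async requests that
 * have completed but are still queued on vq->notifier.  Caller holds
 * vq->mutex, so the queue is not reaped underneath us. */
static void recompute_async_log(struct vhost_virtqueue *vq)
{
	struct kiocb *iocb;
	unsigned long flags;

	spin_lock_irqsave(&vq->notify_lock, flags);
	list_for_each_entry(iocb, &vq->notifier, ki_list) {
		/* vhost_compute_log_for_head() is assumed, not existing code */
		int log_num = vhost_compute_log_for_head(vq, iocb->ki_pos, vq->log);

		/* handle_async_rx_events_notify() reads this as the log count */
		iocb->ki_user_data = log_num;
	}
	spin_unlock_irqrestore(&vq->notify_lock, flags);
}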

Thanks
Xiaohui


2010-04-04 11:44:47

by Michael S. Tsirkin

Subject: Re: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

On Fri, Apr 02, 2010 at 10:16:16AM +0800, Xin, Xiaohui wrote:
>
> >> For the write logging, do you have a function at hand that we can
> >> use to recompute the log? If so, I think I can use it to recompute
> >> the log info when logging is suddenly enabled.
> >> For the outstanding requests, do you mean all the user buffers
> >> submitted before the logging ioctl changed? That may be a lot, and
> >> some of them are still in NIC ring descriptors; waiting for them to
> >> finish may take some time. I think it is also reasonable for the
> >> logging to take effect just after the logging ioctl changes.
>
> > The key point is that after the logging ioctl returns, any
> > subsequent change to memory must be logged. It does not
> > matter when the request was submitted; otherwise we will
> > get memory corruption on migration.
>
> The change to memory happens in vhost_add_used_and_signal(), right?
> So after the ioctl returns, it is enough to recompute the log info for
> the events still in the async queue, since the ioctl and the write-log
> operations are all protected by vq->mutex.
>
> Thanks
> Xiaohui

Yes, I think this will work.


2010-04-06 05:47:11

by Xin, Xiaohui

Subject: RE: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

Michael,
> >>> For the write logging, do you have a function at hand that we can
> >>> use to recompute the log? If so, I think I can use it to recompute
> >>> the log info when logging is suddenly enabled.
> >>> For the outstanding requests, do you mean all the user buffers
> >>> submitted before the logging ioctl changed? That may be a lot, and
> >>> some of them are still in NIC ring descriptors; waiting for them to
> >>> finish may take some time. I think it is also reasonable for the
> >>> logging to take effect just after the logging ioctl changes.

> >> The key point is that after the logging ioctl returns, any
> >> subsequent change to memory must be logged. It does not
> >> matter when the request was submitted; otherwise we will
> >> get memory corruption on migration.

> > The change to memory happens in vhost_add_used_and_signal(), right?
> > So after the ioctl returns, it is enough to recompute the log info for
> > the events still in the async queue, since the ioctl and the write-log
> > operations are all protected by vq->mutex.

>> Thanks
>> Xiaohui

> Yes, I think this will work.

Thanks. So do you have a function at hand to recompute the log info that I
can use? I vaguely remember you mentioned one some time ago.

> > s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
> > msg.msg_iovlen = in;
> > len = iov_length(vq->iov, in);
> > + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> > + iocb = kmem_cache_zalloc(net->cache, GFP_KERNEL);
> > + if (!iocb)
> > + break;
> > + iocb->private = vq;
> > + iocb->ki_pos = head;
> > + iocb->ki_user_data = log;
> > + }
> > /* Sanity check */
> > if (!len) {
> > vq_err(vq, "Unexpected header len for RX: "
> > @@ -252,13 +368,18 @@ static void handle_rx(struct vhost_net *net)
> > iov_length(vq->hdr, s), hdr_size);
> > break;
> > }
> > - err = sock->ops->recvmsg(NULL, sock, &msg,
> > +
> > + err = sock->ops->recvmsg(iocb, sock, &msg,
> > len, MSG_DONTWAIT | MSG_TRUNC);
> > /* TODO: Check specific error and bomb out unless EAGAIN? */
> > if (err < 0) {
> > vhost_discard_vq_desc(vq);
> > break;
> > }
> > +
> > + if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> > + continue;
> > +
> > /* TODO: Should check and handle checksum. */
> > if (err > len) {
> > pr_err("Discarded truncated rx packet: "
> > @@ -284,10 +405,13 @@ static void handle_rx(struct vhost_net *net)
> > }
> > }
> >
> > + handle_async_rx_events_notify(net, vq);
> > +
> > mutex_unlock(&vq->mutex);
> > unuse_mm(net->dev.mm);
> > }
> >
> > +
> > static void handle_tx_kick(struct work_struct *work)
> > {
> > struct vhost_virtqueue *vq;
> > @@ -338,6 +462,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
> > vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT);
> > vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN);
> > n->tx_poll_state = VHOST_NET_POLL_DISABLED;
> > + n->cache = NULL;
> > return 0;
> > }
> >
> > @@ -398,6 +523,17 @@ static void vhost_net_flush(struct vhost_net *n)
> > vhost_net_flush_vq(n, VHOST_NET_VQ_RX);
> > }
> >
> > +static void vhost_notifier_cleanup(struct vhost_net *n)
> > +{
> > + struct vhost_virtqueue *vq = &n->dev.vqs[VHOST_NET_VQ_RX];
> > + struct kiocb *iocb = NULL;
> > + if (n->cache) {
> > + while ((iocb = notify_dequeue(vq)) != NULL)
> > + kmem_cache_free(n->cache, iocb);
> > + kmem_cache_destroy(n->cache);
> > + }
> > +}
> > +
> > static int vhost_net_release(struct inode *inode, struct file *f)
> > {
> > struct vhost_net *n = f->private_data;
> > @@ -414,6 +550,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
> > /* We do an extra flush before freeing memory,
> > * since jobs can re-queue themselves. */
> > vhost_net_flush(n);
> > + vhost_notifier_cleanup(n);
> > kfree(n);
> > return 0;
> > }
> > @@ -462,7 +599,19 @@ static struct socket *get_tun_socket(int fd)
> > return sock;
> > }
> >
> > -static struct socket *get_socket(int fd)
> > +static struct socket *get_mp_socket(int fd)
> > +{
> > + struct file *file = fget(fd);
> > + struct socket *sock;
> > + if (!file)
> > + return ERR_PTR(-EBADF);
> > + sock = mp_get_socket(file);
> > + if (IS_ERR(sock))
> > + fput(file);
> > + return sock;
> > +}
> > +
> > +static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
> > {
> > struct socket *sock;
> > if (fd == -1)
> > @@ -473,9 +622,31 @@ static struct socket *get_socket(int fd)
> > sock = get_tun_socket(fd);
> > if (!IS_ERR(sock))
> > return sock;
> > + sock = get_mp_socket(fd);
> > + if (!IS_ERR(sock)) {
> > + vq->link_state = VHOST_VQ_LINK_ASYNC;
> > + return sock;
> > + }
> > return ERR_PTR(-ENOTSOCK);
> > }
> >
> > +static void vhost_init_link_state(struct vhost_net *n, int index)
> > +{
> > + struct vhost_virtqueue *vq = n->vqs + index;
> > +
> > + WARN_ON(!mutex_is_locked(&vq->mutex));
> > + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> > + vq->receiver = NULL;
> > + INIT_LIST_HEAD(&vq->notifier);
> > + spin_lock_init(&vq->notify_lock);
> > + if (!n->cache) {
> > + n->cache = kmem_cache_create("vhost_kiocb",
> > + sizeof(struct kiocb), 0,
> > + SLAB_HWCACHE_ALIGN, NULL);
> > + }
> > + }
> > +}
> > +
> > static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> > {
> > struct socket *sock, *oldsock;
> > @@ -493,12 +664,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> > }
> > vq = n->vqs + index;
> > mutex_lock(&vq->mutex);
> > - sock = get_socket(fd);
> > + vq->link_state = VHOST_VQ_LINK_SYNC;
> > + sock = get_socket(vq, fd);
> > if (IS_ERR(sock)) {
> > r = PTR_ERR(sock);
> > goto err;
> > }
> >
> > + vhost_init_link_state(n, index);
> > +
> > /* start polling new socket */
> > oldsock = vq->private_data;
> > if (sock == oldsock)
> > @@ -507,8 +681,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> > vhost_net_disable_vq(n, vq);
> > rcu_assign_pointer(vq->private_data, sock);
> > vhost_net_enable_vq(n, vq);
> > - mutex_unlock(&vq->mutex);
> > done:
> > + mutex_unlock(&vq->mutex);
> > mutex_unlock(&n->dev.mutex);
> > if (oldsock) {
> > vhost_net_flush_vq(n, index);
> > @@ -516,6 +690,7 @@ done:
> > }
> > return r;
> > err:
> > + mutex_unlock(&vq->mutex);
> > mutex_unlock(&n->dev.mutex);
> > return r;
> > }
> > diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
> > index d1f0453..cffe39a 100644
> > --- a/drivers/vhost/vhost.h
> > +++ b/drivers/vhost/vhost.h
> > @@ -43,6 +43,11 @@ struct vhost_log {
> > u64 len;
> > };
> >
> > +enum vhost_vq_link_state {
> > + VHOST_VQ_LINK_SYNC = 0,
> > + VHOST_VQ_LINK_ASYNC = 1,
> > +};
> > +
> > /* The virtqueue structure describes a queue attached to a device. */
> > struct vhost_virtqueue {
> > struct vhost_dev *dev;
> > @@ -96,6 +101,11 @@ struct vhost_virtqueue {
> > /* Log write descriptors */
> > void __user *log_base;
> > struct vhost_log log[VHOST_NET_MAX_SG];
> > + /*Differiate async socket for 0-copy from normal*/
> > + enum vhost_vq_link_state link_state;
> > + struct list_head notifier;
> > + spinlock_t notify_lock;
> > + void (*receiver)(struct vhost_virtqueue *);
> > };
> >
> > struct vhost_dev {
> > --
> > 1.5.4.4

2010-04-06 07:55:21

by Michael S. Tsirkin

[permalink] [raw]
Subject: Re: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

On Tue, Apr 06, 2010 at 01:46:56PM +0800, Xin, Xiaohui wrote:
> Michael,
> > >>> For the write logging, do you have a function in hand that we can
> > >>> recompute the log? If that, I think I can use it to recompute the
> > >>>log info when the logging is suddenly enabled.
> > >>> For the outstanding requests, do you mean all the user buffers have
> > >>>submitted before the logging ioctl changed? That may be a lot, and
> > >> >some of them are still in NIC ring descriptors. Waiting them to be
> > >>>finished may be need some time. I think when logging ioctl changed,
> > >> >then the logging is changed just after that is also reasonable.
>
> > >>The key point is that after loggin ioctl returns, any
> > >>subsequent change to memory must be logged. It does not
> > >>matter when was the request submitted, otherwise we will
> > >>get memory corruption on migration.
>
> > >The change to memory happens when vhost_add_used_and_signal(), right?
> > >So after ioctl returns, just recompute the log info to the events in the async queue,
> > >is ok. Since the ioctl and write log operations are all protected by vq->mutex.
>
> >> Thanks
> >> Xiaohui
>
> >Yes, I think this will work.
>
> Thanks, so do you have the function to recompute the log info in your hand that I can
> use? I have weakly remembered that you have noticed it before some time.

Doesn't just rerunning vhost_get_vq_desc work?


2010-04-07 01:36:57

by Xin, Xiaohui

[permalink] [raw]
Subject: RE: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

Michael,
> > >>>> For the write logging, do you have a function in hand that we can
> > >>>> recompute the log? If that, I think I can use it to recompute the
> > >>>>log info when the logging is suddenly enabled.
> > >>>> For the outstanding requests, do you mean all the user buffers have
> > >>>>submitted before the logging ioctl changed? That may be a lot, and
> > >> >>some of them are still in NIC ring descriptors. Waiting them to be
> > >>>>finished may be need some time. I think when logging ioctl changed,
> > >> >>then the logging is changed just after that is also reasonable.

> > >>>The key point is that after loggin ioctl returns, any
> > >>>subsequent change to memory must be logged. It does not
> > >>>matter when was the request submitted, otherwise we will
> > >>>get memory corruption on migration.

> > >>The change to memory happens when vhost_add_used_and_signal(), right?
> > >>So after ioctl returns, just recompute the log info to the events in the async queue,
> > >>is ok. Since the ioctl and write log operations are all protected by vq->mutex.

> >>> Thanks
> >> >Xiaohui

> >>Yes, I think this will work.

>> Thanks, so do you have the function to recompute the log info in your hand that I can
>>use? I have weakly remembered that you have noticed it before some time.

>Doesn't just rerunning vhost_get_vq_desc work?

Am I missing something here?
vhost_get_vq_desc() looks into the vq, finds the first available buffer, and converts it
to an iovec. The first available buffer is not one of the buffers already sitting in the
async queue, so I don't think rerunning vhost_get_vq_desc() can work.
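
To make this concrete, vhost_get_vq_desc() always starts from vq->last_avail_idx;
a simplified sketch of its existing flow in drivers/vhost/vhost.c (not new code)
looks roughly like:

	last_avail_idx = vq->last_avail_idx;
	if (get_user(vq->avail_idx, &vq->avail->idx))
		return vq->num;
	if (vq->avail_idx == last_avail_idx)
		return vq->num;		/* nothing new since we last looked */
	/* grab the next head the guest advertises */
	if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num]))
		return vq->num;
	/* ... walk the chain starting at head, filling iov[] and log[] ... */
	vq->last_avail_idx++;		/* this head will not be returned again */
	return head;

A head that was handed out earlier is already behind last_avail_idx, so a plain
rerun will never return it.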

Thanks
Xiaohui


2010-04-07 08:22:14

by Michael S. Tsirkin

[permalink] [raw]
Subject: Re: [PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

On Wed, Apr 07, 2010 at 09:36:36AM +0800, Xin, Xiaohui wrote:
> Michael,
> > > >>>> For the write logging, do you have a function in hand that we can
> > > >>>> recompute the log? If that, I think I can use it to recompute the
> > > >>>>log info when the logging is suddenly enabled.
> > > >>>> For the outstanding requests, do you mean all the user buffers have
> > > >>>>submitted before the logging ioctl changed? That may be a lot, and
> > > >> >>some of them are still in NIC ring descriptors. Waiting them to be
> > > >>>>finished may be need some time. I think when logging ioctl changed,
> > > >> >>then the logging is changed just after that is also reasonable.
>
> > > >>>The key point is that after loggin ioctl returns, any
> > > >>>subsequent change to memory must be logged. It does not
> > > >>>matter when was the request submitted, otherwise we will
> > > >>>get memory corruption on migration.
>
> > > >>The change to memory happens when vhost_add_used_and_signal(), right?
> > > >>So after ioctl returns, just recompute the log info to the events in the async queue,
> > > >>is ok. Since the ioctl and write log operations are all protected by vq->mutex.
>
> > >>> Thanks
> > >> >Xiaohui
>
> > >>Yes, I think this will work.
>
> >> Thanks, so do you have the function to recompute the log info in your hand that I can
> >>use? I have weakly remembered that you have noticed it before some time.
>
> >Doesn't just rerunning vhost_get_vq_desc work?
>
> Am I missing something here?
> The vhost_get_vq_desc() looks in vq, and finds the first available buffers, and converts it
> to an iovec. I think the first available buffer is not the buffers in the async queue, so I
> think rerunning vhost_get_vq_desc() cannot work.
>
> Thanks
> Xiaohui

Right, but we can move the head back, so we'll find the same buffers
again, or add a variant of vhost_get_vq_desc that will process
descriptors already consumed.
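
One way to realize the second option, and the approach the follow-up patch below
takes, is to factor the chain-walking body of vhost_get_vq_desc() into a variant
that takes the head explicitly, so an already-consumed chain can be re-walked just
to rebuild its log entries. Roughly:

	/* Same walk as vhost_get_vq_desc(), but the caller supplies the head. */
	unsigned __vhost_get_vq_desc(struct vhost_dev *dev,
				     struct vhost_virtqueue *vq,
				     struct iovec iov[], unsigned int iov_size,
				     unsigned int *out_num, unsigned int *in_num,
				     struct vhost_log *log, unsigned int *log_num,
				     unsigned int head);

	/* e.g. from the async completion path, with head saved in the iocb: */
	__vhost_get_vq_desc(&net->dev, vq, vq->iov, ARRAY_SIZE(vq->iov),
			    &out, &in, vq_log, &log, head);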


2010-04-08 09:05:07

by Xin, Xiaohui

[permalink] [raw]
Subject: Re:[PATCH v1 2/3] Provides multiple submits and asynchronous notifications.

From: Xin Xiaohui <[email protected]>

---
Michael,
This is a small patch for the write logging issue with the async queue.
I have added a __vhost_get_vq_desc() function which can compute the log
info for any valid buffer index; it is factored out of the code in
vhost_get_vq_desc(). I use it to recompute the log info when logging is
enabled.

Thanks
Xiaohui

drivers/vhost/net.c | 27 ++++++++---
drivers/vhost/vhost.c | 115 ++++++++++++++++++++++++++++---------------------
drivers/vhost/vhost.h | 5 ++
3 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 2aafd90..00a45ef 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -115,7 +115,8 @@ static void handle_async_rx_events_notify(struct vhost_net *net,
struct kiocb *iocb = NULL;
struct vhost_log *vq_log = NULL;
int rx_total_len = 0;
- int log, size;
+ unsigned int head, log, in, out;
+ int size;

if (vq->link_state != VHOST_VQ_LINK_ASYNC)
return;
@@ -130,14 +131,25 @@ static void handle_async_rx_events_notify(struct vhost_net *net,
iocb->ki_pos, iocb->ki_nbytes);
log = (int)iocb->ki_user_data;
size = iocb->ki_nbytes;
+ head = iocb->ki_pos;
rx_total_len += iocb->ki_nbytes;

if (iocb->ki_dtor)
iocb->ki_dtor(iocb);
kmem_cache_free(net->cache, iocb);

- if (unlikely(vq_log))
+ /* When logging is enabled, the log info has to be recomputed,
+ * since these buffers have been sitting in the async queue and
+ * may not have picked up log info earlier.
+ */
+ if (unlikely(vq_log)) {
+ if (!log)
+ __vhost_get_vq_desc(&net->dev, vq, vq->iov,
+ ARRAY_SIZE(vq->iov),
+ &out, &in, vq_log,
+ &log, head);
vhost_log_write(vq, vq_log, log, size);
+ }
if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
vhost_poll_queue(&vq->poll);
break;
@@ -313,14 +325,13 @@ static void handle_rx(struct vhost_net *net)
vhost_disable_notify(vq);
hdr_size = vq->hdr_size;

- /* In async cases, for write logging, the simple way is to get
- * the log info always, and really logging is decided later.
- * Thus, when logging enabled, we can get log, and when logging
- * disabled, we can get log disabled accordingly.
+ /* In async cases, buffers submitted before write logging was
+ * enabled may not carry log info, so we recompute the log info
+ * when needed. We do this in handle_async_rx_events_notify().
*/

- vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) |
- (vq->link_state == VHOST_VQ_LINK_ASYNC) ?
+ vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
vq->log : NULL;

handle_async_rx_events_notify(net, vq);
diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index 97233d5..53dab80 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -715,66 +715,21 @@ static unsigned get_indirect(struct vhost_dev *dev, struct vhost_virtqueue *vq,
return 0;
}

-/* This looks in the virtqueue and for the first available buffer, and converts
- * it to an iovec for convenient access. Since descriptors consist of some
- * number of output then some number of input descriptors, it's actually two
- * iovecs, but we pack them into one and note how many of each there were.
- *
- * This function returns the descriptor number found, or vq->num (which
- * is never a valid descriptor number) if none was found. */
-unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
+unsigned __vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
struct iovec iov[], unsigned int iov_size,
unsigned int *out_num, unsigned int *in_num,
- struct vhost_log *log, unsigned int *log_num)
+ struct vhost_log *log, unsigned int *log_num,
+ unsigned int head)
{
struct vring_desc desc;
- unsigned int i, head, found = 0;
- u16 last_avail_idx;
+ unsigned int i = head, found = 0;
int ret;

- /* Check it isn't doing very strange things with descriptor numbers. */
- last_avail_idx = vq->last_avail_idx;
- if (get_user(vq->avail_idx, &vq->avail->idx)) {
- vq_err(vq, "Failed to access avail idx at %p\n",
- &vq->avail->idx);
- return vq->num;
- }
-
- if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
- vq_err(vq, "Guest moved used index from %u to %u",
- last_avail_idx, vq->avail_idx);
- return vq->num;
- }
-
- /* If there's nothing new since last we looked, return invalid. */
- if (vq->avail_idx == last_avail_idx)
- return vq->num;
-
- /* Only get avail ring entries after they have been exposed by guest. */
- rmb();
-
- /* Grab the next descriptor number they're advertising, and increment
- * the index we've seen. */
- if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
- vq_err(vq, "Failed to read head: idx %d address %p\n",
- last_avail_idx,
- &vq->avail->ring[last_avail_idx % vq->num]);
- return vq->num;
- }
-
- /* If their number is silly, that's an error. */
- if (head >= vq->num) {
- vq_err(vq, "Guest says index %u > %u is available",
- head, vq->num);
- return vq->num;
- }
-
/* When we start there are none of either input nor output. */
*out_num = *in_num = 0;
if (unlikely(log))
*log_num = 0;

- i = head;
do {
unsigned iov_count = *in_num + *out_num;
if (i >= vq->num) {
@@ -833,8 +788,70 @@ unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
*out_num += ret;
}
} while ((i = next_desc(&desc)) != -1);
+ return head;
+}
+
+/* This looks in the virtqueue and for the first available buffer, and converts
+ * it to an iovec for convenient access. Since descriptors consist of some
+ * number of output then some number of input descriptors, it's actually two
+ * iovecs, but we pack them into one and note how many of each there were.
+ *
+ * This function returns the descriptor number found, or vq->num (which
+ * is never a valid descriptor number) if none was found. */
+unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct vhost_virtqueue *vq,
+ struct iovec iov[], unsigned int iov_size,
+ unsigned int *out_num, unsigned int *in_num,
+ struct vhost_log *log, unsigned int *log_num)
+{
+ struct vring_desc desc;
+ unsigned int i, head, found = 0;
+ u16 last_avail_idx;
+ unsigned int ret;
+
+ /* Check it isn't doing very strange things with descriptor numbers. */
+ last_avail_idx = vq->last_avail_idx;
+ if (get_user(vq->avail_idx, &vq->avail->idx)) {
+ vq_err(vq, "Failed to access avail idx at %p\n",
+ &vq->avail->idx);
+ return vq->num;
+ }
+
+ if ((u16)(vq->avail_idx - last_avail_idx) > vq->num) {
+ vq_err(vq, "Guest moved used index from %u to %u",
+ last_avail_idx, vq->avail_idx);
+ return vq->num;
+ }
+
+ /* If there's nothing new since last we looked, return invalid. */
+ if (vq->avail_idx == last_avail_idx)
+ return vq->num;
+
+ /* Only get avail ring entries after they have been exposed by guest. */
+ rmb();
+
+ /* Grab the next descriptor number they're advertising, and increment
+ * the index we've seen. */
+ if (get_user(head, &vq->avail->ring[last_avail_idx % vq->num])) {
+ vq_err(vq, "Failed to read head: idx %d address %p\n",
+ last_avail_idx,
+ &vq->avail->ring[last_avail_idx % vq->num]);
+ return vq->num;
+ }
+
+ /* If their number is silly, that's an error. */
+ if (head >= vq->num) {
+ vq_err(vq, "Guest says index %u > %u is available",
+ head, vq->num);
+ return vq->num;
+ }
+
+ ret = __vhost_get_vq_desc(dev, vq, iov, iov_size,
+ out_num, in_num,
+ log, log_num, head);

/* On success, increment avail index. */
+ if (ret == vq->num)
+ return ret;
vq->last_avail_idx++;
return head;
}
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index cffe39a..a74a6d4 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -132,6 +132,11 @@ unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
struct iovec iov[], unsigned int iov_count,
unsigned int *out_num, unsigned int *in_num,
struct vhost_log *log, unsigned int *log_num);
+unsigned __vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue *,
+ struct iovec iov[], unsigned int iov_count,
+ unsigned int *out_num, unsigned int *in_num,
+ struct vhost_log *log, unsigned int *log_num,
+ unsigned int head);
void vhost_discard_vq_desc(struct vhost_virtqueue *);

int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int len);
--
1.5.4.4