2017-06-02 19:33:55

by Stefano Stabellini

Subject: [PATCH v3 00/18] introduce the Xen PV Calls backend

Hi all,

this series introduces the backend for the new PV Calls protocol.

PV Calls is a paravirtualized protocol that allows a set of POSIX
functions to be implemented in a different domain. The PV Calls frontend
sends POSIX function calls to the backend, which carries them out and
returns the result to the frontend.

For more information about PV Calls, please read:

https://xenbits.xen.org/docs/unstable/misc/pvcalls.html

I tried to split the source code into small pieces to make it easier to
read and understand. Please review!


Changes in v3:
- added reviewed-bys
- return err from pvcalls_back_probe
- remove old comments
- use a xenstore transaction in pvcalls_back_probe
- ignore errors from xenbus_switch_state
- rename pvcalls_back_priv to pvcalls_fedata
- remove addr from backend_connect
- remove priv->work, add comment about theoretical race
- use IPPROTO_IP
- refactor active socket allocation in a single new function

Changes in v2:
- allocate one ioworker per socket (rather than 1 per vcpu)
- rename privs to frontends
- add newlines
- define "1" in the public header
- better error returns in pvcalls_back_probe
- do not set XenbusStateClosed twice in set_backend_state
- add more comments
- replace rw_semaphore with semaphore
- rename pvcallss to socket_lock
- move xenbus_map_ring_valloc closer to first use in backend_connect
- use more traditional return codes from pvcalls_back_handle_cmd and
callees
- remove useless dev == NULL checks
- replace lock_sock with more appropriate and fine grained socket locks


Stefano Stabellini (18):
xen: introduce the pvcalls interface header
xen/pvcalls: introduce the pvcalls xenbus backend
xen/pvcalls: initialize the module and register the xenbus backend
xen/pvcalls: xenbus state handling
xen/pvcalls: connect to a frontend
xen/pvcalls: handle commands from the frontend
xen/pvcalls: implement socket command
xen/pvcalls: implement connect command
xen/pvcalls: implement bind command
xen/pvcalls: implement listen command
xen/pvcalls: implement accept command
xen/pvcalls: implement poll command
xen/pvcalls: implement release command
xen/pvcalls: disconnect and module_exit
xen/pvcalls: implement the ioworker functions
xen/pvcalls: implement read
xen/pvcalls: implement write
xen: introduce a Kconfig option to enable the pvcalls backend

drivers/xen/Kconfig | 12 +
drivers/xen/Makefile | 1 +
drivers/xen/pvcalls-back.c | 1228 ++++++++++++++++++++++++++++++++++++
include/xen/interface/io/pvcalls.h | 121 ++++
include/xen/interface/io/ring.h | 2 +
5 files changed, 1364 insertions(+)
create mode 100644 drivers/xen/pvcalls-back.c
create mode 100644 include/xen/interface/io/pvcalls.h


2017-06-02 19:32:04

by Stefano Stabellini

Subject: [PATCH v3 08/18] xen/pvcalls: implement connect command

Allocate a socket. Keep track of socket <-> ring mappings with a new data
structure called sock_mapping. Implement the connect command by calling
inet_stream_connect, then mapping the new indexes page and data ring.
Allocate a workqueue and a work_struct, called ioworker, to perform
reads and writes to the socket.

When an active socket is closed (sk_state_change), set in_error to
-ENOTCONN and notify the other end, as specified by the protocol.

sk_data_ready and pvcalls_back_ioworker will be implemented later.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 170 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 170 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 1f2bb26..3eb84ef 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -56,6 +56,40 @@ struct pvcalls_fedata {
struct work_struct register_work;
};

+struct pvcalls_ioworker {
+ struct work_struct register_work;
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+};
+
+struct sock_mapping {
+ struct list_head list;
+ struct pvcalls_fedata *priv;
+ struct socket *sock;
+ uint64_t id;
+ grant_ref_t ref;
+ struct pvcalls_data_intf *ring;
+ void *bytes;
+ struct pvcalls_data data;
+ uint32_t ring_order;
+ int irq;
+ atomic_t read;
+ atomic_t write;
+ atomic_t io;
+ atomic_t release;
+ void (*saved_data_ready)(struct sock *sk);
+ struct pvcalls_ioworker ioworker;
+};
+
+static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
+static int pvcalls_back_release_active(struct xenbus_device *dev,
+ struct pvcalls_fedata *priv,
+ struct sock_mapping *map);
+
+static void pvcalls_back_ioworker(struct work_struct *work)
+{
+}
+
static int pvcalls_back_socket(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
@@ -84,9 +118,140 @@ static int pvcalls_back_socket(struct xenbus_device *dev,
return ret;
}

+static void pvcalls_sk_state_change(struct sock *sock)
+{
+ struct sock_mapping *map = sock->sk_user_data;
+ struct pvcalls_data_intf *intf;
+
+ if (map == NULL)
+ return;
+
+ intf = map->ring;
+ intf->in_error = -ENOTCONN;
+ notify_remote_via_irq(map->irq);
+}
+
+static void pvcalls_sk_data_ready(struct sock *sock)
+{
+}
+
+static struct sock_mapping *pvcalls_new_active_socket(
+ struct pvcalls_fedata *priv,
+ uint64_t id,
+ grant_ref_t ref,
+ uint32_t evtchn,
+ struct socket *sock)
+{
+ int ret;
+ struct sock_mapping *map = NULL;
+ void *page;
+
+ map = kzalloc(sizeof(*map), GFP_KERNEL);
+ if (map == NULL)
+ return NULL;
+
+ map->priv = priv;
+ map->sock = sock;
+ map->id = id;
+ map->ref = ref;
+
+ ret = xenbus_map_ring_valloc(priv->dev, &ref, 1, &page);
+ if (ret < 0)
+ goto out;
+ map->ring = page;
+ map->ring_order = map->ring->ring_order;
+ /* first read the order, then map the data ring */
+ virt_rmb();
+ if (map->ring_order > MAX_RING_ORDER)
+ goto out;
+ ret = xenbus_map_ring_valloc(priv->dev, map->ring->ref,
+ (1 << map->ring_order), &page);
+ if (ret < 0)
+ goto out;
+ map->bytes = page;
+
+ ret = bind_interdomain_evtchn_to_irqhandler(priv->dev->otherend_id,
+ evtchn,
+ pvcalls_back_conn_event,
+ 0,
+ "pvcalls-backend",
+ map);
+ if (ret < 0)
+ goto out;
+ map->irq = ret;
+
+ map->data.in = map->bytes;
+ map->data.out = map->bytes + XEN_FLEX_RING_SIZE(map->ring_order);
+
+ map->ioworker.wq = alloc_workqueue("pvcalls_io", WQ_UNBOUND, 1);
+ if (!map->ioworker.wq)
+ goto out;
+ map->ioworker.cpu = get_random_int() % num_online_cpus();
+ atomic_set(&map->io, 1);
+ INIT_WORK(&map->ioworker.register_work, pvcalls_back_ioworker);
+
+ down(&priv->socket_lock);
+ list_add_tail(&map->list, &priv->socket_mappings);
+ up(&priv->socket_lock);
+
+ write_lock_bh(&map->sock->sk->sk_callback_lock);
+ map->saved_data_ready = map->sock->sk->sk_data_ready;
+ map->sock->sk->sk_user_data = map;
+ map->sock->sk->sk_data_ready = pvcalls_sk_data_ready;
+ map->sock->sk->sk_state_change = pvcalls_sk_state_change;
+ write_unlock_bh(&map->sock->sk->sk_callback_lock);
+
+ return map;
+out:
+ pvcalls_back_release_active(priv->dev, priv, map);
+ return NULL;
+}
+
static int pvcalls_back_connect(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
+ struct pvcalls_fedata *priv;
+ int ret;
+ struct socket *sock;
+ struct sock_mapping *map = NULL;
+ struct xen_pvcalls_response *rsp;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
+ if (ret < 0)
+ goto out;
+ ret = inet_stream_connect(sock, (struct sockaddr *)&req->u.connect.addr,
+ req->u.connect.len, req->u.connect.flags);
+ if (ret < 0) {
+ sock_release(sock);
+ goto out;
+ }
+
+ map = pvcalls_new_active_socket(priv,
+ req->u.connect.id,
+ req->u.connect.ref,
+ req->u.connect.evtchn,
+ sock);
+ if (!map) {
+ ret = -EFAULT;
+ sock_release(sock);
+ goto out;
+ }
+
+out:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.connect.id = req->u.connect.id;
+ rsp->ret = ret;
+
+ return ret;
+}
+
+static int pvcalls_back_release_active(struct xenbus_device *dev,
+ struct pvcalls_fedata *priv,
+ struct sock_mapping *map)
+{
return 0;
}

@@ -206,6 +371,11 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
return IRQ_HANDLED;
}

+static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
+{
+ return IRQ_HANDLED;
+}
+
static int backend_connect(struct xenbus_device *dev)
{
int err, evtchn;
--
1.9.1

2017-06-02 19:32:10

by Stefano Stabellini

Subject: [PATCH v3 18/18] xen: introduce a Kconfig option to enable the pvcalls backend

Also add pvcalls-back to the Makefile.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/Kconfig | 12 ++++++++++++
drivers/xen/Makefile | 1 +
2 files changed, 13 insertions(+)

diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
index f15bb3b7..bbdf059 100644
--- a/drivers/xen/Kconfig
+++ b/drivers/xen/Kconfig
@@ -196,6 +196,18 @@ config XEN_PCIDEV_BACKEND

If in doubt, say m.

+config XEN_PVCALLS_BACKEND
+ bool "XEN PV Calls backend driver"
+ depends on INET && XEN
+ default n
+ help
+ Experimental backend for the Xen PV Calls protocol
+ (https://xenbits.xen.org/docs/unstable/misc/pvcalls.html). It
+ allows PV Calls frontends to send POSIX calls to the backend,
+ which implements them.
+
+ If in doubt, say n.
+
config XEN_SCSI_BACKEND
tristate "XEN SCSI backend driver"
depends on XEN && XEN_BACKEND && TARGET_CORE
diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
index 8feab810..480b928 100644
--- a/drivers/xen/Makefile
+++ b/drivers/xen/Makefile
@@ -38,6 +38,7 @@ obj-$(CONFIG_XEN_ACPI_PROCESSOR) += xen-acpi-processor.o
obj-$(CONFIG_XEN_EFI) += efi.o
obj-$(CONFIG_XEN_SCSI_BACKEND) += xen-scsiback.o
obj-$(CONFIG_XEN_AUTO_XLATE) += xlate_mmu.o
+obj-$(CONFIG_XEN_PVCALLS_BACKEND) += pvcalls-back.o
xen-evtchn-y := evtchn.o
xen-gntdev-y := gntdev.o
xen-gntalloc-y := gntalloc.o
--
1.9.1

2017-06-02 19:32:12

by Stefano Stabellini

Subject: [PATCH v3 11/18] xen/pvcalls: implement accept command

Implement the accept command by calling inet_accept. To avoid blocking
in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which gets
scheduled on sk_data_ready (for a passive socket, it means that there
are connections to accept).

Use the reqcopy field to store the request. Accept the new socket from
the delayed work function, create a new sock_mapping for it, map
the indexes page and data ring, and reply to the other end. Allocate an
ioworker for the socket.

Only support one outstanding blocking accept request for every socket at
any time.

Add a field to sock_mapping to remember the passive socket from which an
active socket was created.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 108 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index a75586e..f1173f4 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -65,6 +65,7 @@ struct pvcalls_ioworker {
struct sock_mapping {
struct list_head list;
struct pvcalls_fedata *priv;
+ struct sockpass_mapping *sockpass;
struct socket *sock;
uint64_t id;
grant_ref_t ref;
@@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,

static void __pvcalls_back_accept(struct work_struct *work)
{
+ struct sockpass_mapping *mappass = container_of(
+ work, struct sockpass_mapping, register_work);
+ struct sock_mapping *map;
+ struct pvcalls_ioworker *iow;
+ struct pvcalls_fedata *priv;
+ struct socket *sock;
+ struct xen_pvcalls_response *rsp;
+ struct xen_pvcalls_request *req;
+ int notify;
+ int ret = -EINVAL;
+ unsigned long flags;
+
+ priv = mappass->priv;
+ /* We only need to check the value of "cmd" atomically on read. */
+ spin_lock_irqsave(&mappass->copy_lock, flags);
+ req = &mappass->reqcopy;
+ if (req->cmd != PVCALLS_ACCEPT) {
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+ return;
+ }
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+
+ sock = sock_alloc();
+ if (sock == NULL)
+ goto out_error;
+ sock->type = mappass->sock->type;
+ sock->ops = mappass->sock->ops;
+
+ ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
+ if (ret == -EAGAIN) {
+ sock_release(sock);
+ goto out_error;
+ }
+
+ map = pvcalls_new_active_socket(priv,
+ req->u.accept.id_new,
+ req->u.accept.ref,
+ req->u.accept.evtchn,
+ sock);
+ if (!map) {
+ sock_release(sock);
+ goto out_error;
+ }
+
+ map->sockpass = mappass;
+ iow = &map->ioworker;
+ atomic_inc(&map->read);
+ atomic_inc(&map->io);
+ queue_work_on(iow->cpu, iow->wq, &iow->register_work);
+
+out_error:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.accept.id = req->u.accept.id;
+ rsp->ret = ret;
+ RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
+ if (notify)
+ notify_remote_via_irq(priv->irq);
+
+ spin_lock_irqsave(&mappass->copy_lock, flags);
+ mappass->reqcopy.cmd = 0;
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
}

static void pvcalls_pass_sk_data_ready(struct sock *sock)
{
+ struct sockpass_mapping *mappass = sock->sk_user_data;
+
+ if (mappass == NULL)
+ return;
+
+ queue_work(mappass->wq, &mappass->register_work);
}

static int pvcalls_back_bind(struct xenbus_device *dev,
@@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
static int pvcalls_back_accept(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ struct sockpass_mapping *mappass;
+ int ret = -EINVAL;
+ struct xen_pvcalls_response *rsp;
+ unsigned long flags;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ mappass = radix_tree_lookup(&priv->socketpass_mappings,
+ req->u.accept.id);
+ if (mappass == NULL)
+ goto out_error;
+
+ /*
+ * Limitation of the current implementation: only support one
+ * concurrent accept or poll call on one socket.
+ */
+ spin_lock_irqsave(&mappass->copy_lock, flags);
+ if (mappass->reqcopy.cmd != 0) {
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+ ret = -EINTR;
+ goto out_error;
+ }
+
+ mappass->reqcopy = *req;
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+ queue_work(mappass->wq, &mappass->register_work);
+
+ /* Tell the caller we don't need to send back a notification yet */
+ return -1;
+
+out_error:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.accept.id = req->u.accept.id;
+ rsp->ret = ret;
+ return ret;
}

static int pvcalls_back_poll(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:32:42

by Stefano Stabellini

Subject: [PATCH v3 17/18] xen/pvcalls: implement write

When the other end notifies us that there is data to be written
(pvcalls_back_conn_event), increment the io and write counters, and
schedule the ioworker.

Implement the write function, called by the ioworker, by reading the
data from the data ring and writing it to the socket with inet_sendmsg.

Set out_error on error.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 74 +++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 73 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index e7d2b85..fe3e70f 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -180,7 +180,66 @@ static void pvcalls_conn_back_read(unsigned long opaque)

static int pvcalls_conn_back_write(struct sock_mapping *map)
{
- return 0;
+ struct pvcalls_data_intf *intf = map->ring;
+ struct pvcalls_data *data = &map->data;
+ struct msghdr msg;
+ struct kvec vec[2];
+ RING_IDX cons, prod, size, ring_size;
+ int ret;
+
+ cons = intf->out_cons;
+ prod = intf->out_prod;
+ /* read the indexes before dealing with the data */
+ virt_mb();
+
+ ring_size = XEN_FLEX_RING_SIZE(map->ring_order);
+ size = pvcalls_queued(prod, cons, ring_size);
+ if (size == 0)
+ return 0;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags |= MSG_DONTWAIT;
+ msg.msg_iter.type = ITER_KVEC|READ;
+ msg.msg_iter.count = size;
+ if (pvcalls_mask(prod, ring_size) > pvcalls_mask(cons, ring_size)) {
+ vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
+ vec[0].iov_len = size;
+ msg.msg_iter.kvec = vec;
+ msg.msg_iter.nr_segs = 1;
+ } else {
+ vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
+ vec[0].iov_len = ring_size - pvcalls_mask(cons, ring_size);
+ vec[1].iov_base = data->out;
+ vec[1].iov_len = size - vec[0].iov_len;
+ msg.msg_iter.kvec = vec;
+ msg.msg_iter.nr_segs = 2;
+ }
+
+ atomic_set(&map->write, 0);
+ ret = inet_sendmsg(map->sock, &msg, size);
+ if (ret == -EAGAIN || ret < size) {
+ atomic_inc(&map->write);
+ atomic_inc(&map->io);
+ }
+ if (ret == -EAGAIN)
+ return ret;
+
+ /* write the data, then update the indexes */
+ virt_wmb();
+ if (ret < 0) {
+ intf->out_error = ret;
+ } else {
+ intf->out_error = 0;
+ intf->out_cons = cons + ret;
+ prod = intf->out_prod;
+ }
+ /* update the indexes, then notify the other end */
+ virt_wmb();
+ if (prod != cons + ret)
+ atomic_inc(&map->write);
+ notify_remote_via_irq(map->irq);
+
+ return ret;
}

static void pvcalls_back_ioworker(struct work_struct *work)
@@ -837,6 +896,19 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)

static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
{
+ struct sock_mapping *map = sock_map;
+ struct pvcalls_ioworker *iow;
+
+ if (map == NULL || map->sock == NULL || map->sock->sk == NULL ||
+ map->sock->sk->sk_user_data != map)
+ return IRQ_HANDLED;
+
+ iow = &map->ioworker;
+
+ atomic_inc(&map->write);
+ atomic_inc(&map->io);
+ queue_work_on(iow->cpu, iow->wq, &iow->register_work);
+
return IRQ_HANDLED;
}

--
1.9.1

2017-06-02 19:32:43

by Stefano Stabellini

Subject: [PATCH v3 16/18] xen/pvcalls: implement read

When an active socket has data available, increment the io and read
counters, and schedule the ioworker.

Implement the read function by reading from the socket and writing the
data to the data ring.

Set in_error on error.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 85 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 0283d49..e7d2b85 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -101,6 +101,81 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,

static void pvcalls_conn_back_read(unsigned long opaque)
{
+ struct sock_mapping *map = (struct sock_mapping *)opaque;
+ struct msghdr msg;
+ struct kvec vec[2];
+ RING_IDX cons, prod, size, wanted, array_size, masked_prod, masked_cons;
+ int32_t error;
+ struct pvcalls_data_intf *intf = map->ring;
+ struct pvcalls_data *data = &map->data;
+ unsigned long flags;
+ int ret;
+
+ array_size = XEN_FLEX_RING_SIZE(map->ring_order);
+ cons = intf->in_cons;
+ prod = intf->in_prod;
+ error = intf->in_error;
+ /* read the indexes first, then deal with the data */
+ virt_mb();
+
+ if (error)
+ return;
+
+ size = pvcalls_queued(prod, cons, array_size);
+ if (size >= array_size)
+ return;
+ spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
+ if (skb_queue_empty(&map->sock->sk->sk_receive_queue)) {
+ atomic_set(&map->read, 0);
+ spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock,
+ flags);
+ return;
+ }
+ spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
+ wanted = array_size - size;
+ masked_prod = pvcalls_mask(prod, array_size);
+ masked_cons = pvcalls_mask(cons, array_size);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iter.type = ITER_KVEC|WRITE;
+ msg.msg_iter.count = wanted;
+ if (masked_prod < masked_cons) {
+ vec[0].iov_base = data->in + masked_prod;
+ vec[0].iov_len = wanted;
+ msg.msg_iter.kvec = vec;
+ msg.msg_iter.nr_segs = 1;
+ } else {
+ vec[0].iov_base = data->in + masked_prod;
+ vec[0].iov_len = array_size - masked_prod;
+ vec[1].iov_base = data->in;
+ vec[1].iov_len = wanted - vec[0].iov_len;
+ msg.msg_iter.kvec = vec;
+ msg.msg_iter.nr_segs = 2;
+ }
+
+ atomic_set(&map->read, 0);
+ ret = inet_recvmsg(map->sock, &msg, wanted, MSG_DONTWAIT);
+ WARN_ON(ret > 0 && ret > wanted);
+ if (ret == -EAGAIN) /* shouldn't happen */
+ return;
+ if (!ret)
+ ret = -ENOTCONN;
+ spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
+ if (ret > 0 && !skb_queue_empty(&map->sock->sk->sk_receive_queue))
+ atomic_inc(&map->read);
+ spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
+
+ /* write the data, then modify the indexes */
+ virt_wmb();
+ if (ret < 0)
+ intf->in_error = ret;
+ else
+ intf->in_prod = prod + ret;
+ /* update the indexes, then notify the other end */
+ virt_wmb();
+ notify_remote_via_irq(map->irq);
+
+ return;
}

static int pvcalls_conn_back_write(struct sock_mapping *map)
@@ -173,6 +248,16 @@ static void pvcalls_sk_state_change(struct sock *sock)

static void pvcalls_sk_data_ready(struct sock *sock)
{
+ struct sock_mapping *map = sock->sk_user_data;
+ struct pvcalls_ioworker *iow;
+
+ if (map == NULL)
+ return;
+
+ iow = &map->ioworker;
+ atomic_inc(&map->read);
+ atomic_inc(&map->io);
+ queue_work_on(iow->cpu, iow->wq, &iow->register_work);
}

static struct sock_mapping *pvcalls_new_active_socket(
--
1.9.1

2017-06-02 19:33:07

by Stefano Stabellini

Subject: [PATCH v3 15/18] xen/pvcalls: implement the ioworker functions

We have one ioworker per socket. Each ioworker goes through the list of
outstanding read/write requests. Once all requests have been dealt with,
it returns.

We use one atomic counter per socket for "read" operations and one
for "write" operations to keep track of the reads/writes to do.

We also use one atomic counter ("io") per ioworker to keep track of how
many outstanding requests we have in total assigned to the ioworker. The
ioworker finishes when there are none.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 6afe7a0..0283d49 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -99,8 +99,35 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
struct pvcalls_fedata *priv,
struct sock_mapping *map);

+static void pvcalls_conn_back_read(unsigned long opaque)
+{
+}
+
+static int pvcalls_conn_back_write(struct sock_mapping *map)
+{
+ return 0;
+}
+
static void pvcalls_back_ioworker(struct work_struct *work)
{
+ struct pvcalls_ioworker *ioworker = container_of(work,
+ struct pvcalls_ioworker, register_work);
+ struct sock_mapping *map = container_of(ioworker, struct sock_mapping,
+ ioworker);
+
+ while (atomic_read(&map->io) > 0) {
+ if (atomic_read(&map->release) > 0) {
+ atomic_set(&map->release, 0);
+ return;
+ }
+
+ if (atomic_read(&map->read) > 0)
+ pvcalls_conn_back_read((unsigned long)map);
+ if (atomic_read(&map->write) > 0)
+ pvcalls_conn_back_write(map);
+
+ atomic_dec(&map->io);
+ }
}

static int pvcalls_back_socket(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:33:24

by Stefano Stabellini

Subject: [PATCH v3 14/18] xen/pvcalls: disconnect and module_exit

Implement backend_disconnect. Call pvcalls_back_release_active on active
sockets and pvcalls_back_release_passive on passive sockets.

Implement module_exit by calling backend_disconnect on frontend
connections.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 49 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 49 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index b541887..6afe7a0 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -800,6 +800,38 @@ static int backend_connect(struct xenbus_device *dev)

static int backend_disconnect(struct xenbus_device *dev)
{
+ struct pvcalls_fedata *priv;
+ struct sock_mapping *map, *n;
+ struct sockpass_mapping *mappass;
+ struct radix_tree_iter iter;
+ void **slot;
+
+
+
+ list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
+ pvcalls_back_release_active(dev, priv, map);
+ }
+
+ radix_tree_for_each_slot(slot, &priv->socketpass_mappings, &iter, 0) {
+ mappass = radix_tree_deref_slot(slot);
+ if (!mappass || radix_tree_exception(mappass)) {
+ if (radix_tree_deref_retry(mappass)) {
+ slot = radix_tree_iter_retry(&iter);
+ continue;
+ }
+ } else
+ pvcalls_back_release_passive(dev, priv, mappass);
+ }
+
+ xenbus_unmap_ring_vfree(dev, (void *)priv->sring);
+ unbind_from_irqhandler(priv->irq, dev);
+
+ list_del(&priv->list);
+ destroy_workqueue(priv->wq);
+ kfree(priv);
+ dev_set_drvdata(&dev->dev, NULL);
+
return 0;
}

@@ -993,3 +1025,20 @@ static int __init pvcalls_back_init(void)
return 0;
}
module_init(pvcalls_back_init);
+
+static void __exit pvcalls_back_fin(void)
+{
+ struct pvcalls_fedata *priv, *npriv;
+
+ down(&pvcalls_back_global.frontends_lock);
+ list_for_each_entry_safe(priv, npriv, &pvcalls_back_global.frontends,
+ list) {
+ backend_disconnect(priv->dev);
+ }
+ up(&pvcalls_back_global.frontends_lock);
+
+ xenbus_unregister_driver(&pvcalls_back_driver);
+ memset(&pvcalls_back_global, 0, sizeof(pvcalls_back_global));
+}
+
+module_exit(pvcalls_back_fin);
--
1.9.1

2017-06-02 19:33:41

by Stefano Stabellini

Subject: [PATCH v3 13/18] xen/pvcalls: implement release command

Release both active and passive sockets. For active sockets, make sure
to avoid possible conflicts with the ioworker reading/writing to those
sockets concurrently. Set map->release to let the ioworker know
atomically that the socket will be released soon, then wait until the
ioworker finishes (flush_work).

Unmap indexes pages and data rings.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 72 +++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 71 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 82f350d..b541887 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -265,13 +265,83 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
struct pvcalls_fedata *priv,
struct sock_mapping *map)
{
+ disable_irq(map->irq);
+ if (map->sock->sk != NULL) {
+ write_lock_bh(&map->sock->sk->sk_callback_lock);
+ map->sock->sk->sk_user_data = NULL;
+ map->sock->sk->sk_data_ready = map->saved_data_ready;
+ write_unlock_bh(&map->sock->sk->sk_callback_lock);
+ }
+
+ atomic_set(&map->release, 1);
+ flush_work(&map->ioworker.register_work);
+
+ down(&priv->socket_lock);
+ list_del(&map->list);
+ up(&priv->socket_lock);
+
+ xenbus_unmap_ring_vfree(dev, (void *)map->bytes);
+ xenbus_unmap_ring_vfree(dev, (void *)map->ring);
+ unbind_from_irqhandler(map->irq, map);
+
+ sock_release(map->sock);
+ kfree(map);
+
+ return 0;
+}
+
+static int pvcalls_back_release_passive(struct xenbus_device *dev,
+ struct pvcalls_fedata *priv,
+ struct sockpass_mapping *mappass)
+{
+ if (mappass->sock->sk != NULL) {
+ write_lock_bh(&mappass->sock->sk->sk_callback_lock);
+ mappass->sock->sk->sk_user_data = NULL;
+ mappass->sock->sk->sk_data_ready = mappass->saved_data_ready;
+ write_unlock_bh(&mappass->sock->sk->sk_callback_lock);
+ }
+ down(&priv->socket_lock);
+ radix_tree_delete(&priv->socketpass_mappings, mappass->id);
+ sock_release(mappass->sock);
+ flush_workqueue(mappass->wq);
+ destroy_workqueue(mappass->wq);
+ kfree(mappass);
+ up(&priv->socket_lock);
+
return 0;
}

static int pvcalls_back_release(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ struct sock_mapping *map, *n;
+ struct sockpass_mapping *mappass;
+ int ret = 0;
+ struct xen_pvcalls_response *rsp;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
+ if (map->id == req->u.release.id) {
+ ret = pvcalls_back_release_active(dev, priv, map);
+ goto out;
+ }
+ }
+ mappass = radix_tree_lookup(&priv->socketpass_mappings,
+ req->u.release.id);
+ if (mappass != NULL) {
+ ret = pvcalls_back_release_passive(dev, priv, mappass);
+ goto out;
+ }
+
+out:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->u.release.id = req->u.release.id;
+ rsp->cmd = req->cmd;
+ rsp->ret = ret;
+ return ret;
}

static void __pvcalls_back_accept(struct work_struct *work)
--
1.9.1

2017-06-02 19:33:54

by Stefano Stabellini

Subject: [PATCH v3 12/18] xen/pvcalls: implement poll command

Implement poll on passive sockets by requesting a delayed response with
mappass->reqcopy, and reply back when there is data on the passive
socket.

Poll on an active socket is left unimplemented, as per the spec: the
frontend should just wait for events and check the indexes on the
indexes page.

Only support one outstanding poll (or accept) request for every passive
socket at any given time.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 75 ++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 73 insertions(+), 2 deletions(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index f1173f4..82f350d 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -344,11 +344,33 @@ static void __pvcalls_back_accept(struct work_struct *work)
static void pvcalls_pass_sk_data_ready(struct sock *sock)
{
struct sockpass_mapping *mappass = sock->sk_user_data;
+ struct pvcalls_fedata *priv;
+ struct xen_pvcalls_response *rsp;
+ unsigned long flags;
+ int notify;

if (mappass == NULL)
return;

- queue_work(mappass->wq, &mappass->register_work);
+ priv = mappass->priv;
+ spin_lock_irqsave(&mappass->copy_lock, flags);
+ if (mappass->reqcopy.cmd == PVCALLS_POLL) {
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = mappass->reqcopy.req_id;
+ rsp->u.poll.id = mappass->reqcopy.u.poll.id;
+ rsp->cmd = mappass->reqcopy.cmd;
+ rsp->ret = 0;
+
+ mappass->reqcopy.cmd = 0;
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+
+ RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
+ if (notify)
+ notify_remote_via_irq(mappass->priv->irq);
+ } else {
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+ queue_work(mappass->wq, &mappass->register_work);
+ }
}

static int pvcalls_back_bind(struct xenbus_device *dev,
@@ -493,7 +515,56 @@ static int pvcalls_back_accept(struct xenbus_device *dev,
static int pvcalls_back_poll(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ struct sockpass_mapping *mappass;
+ struct xen_pvcalls_response *rsp;
+ struct inet_connection_sock *icsk;
+ struct request_sock_queue *queue;
+ unsigned long flags;
+ int ret;
+ bool data;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ mappass = radix_tree_lookup(&priv->socketpass_mappings, req->u.poll.id);
+ if (mappass == NULL)
+ return -EINVAL;
+
+ /*
+ * Limitation of the current implementation: only support one
+ * concurrent accept or poll call on one socket.
+ */
+ spin_lock_irqsave(&mappass->copy_lock, flags);
+ if (mappass->reqcopy.cmd != 0) {
+ ret = -EINTR;
+ goto out;
+ }
+
+ mappass->reqcopy = *req;
+ icsk = inet_csk(mappass->sock->sk);
+ queue = &icsk->icsk_accept_queue;
+ spin_lock(&queue->rskq_lock);
+ data = queue->rskq_accept_head != NULL;
+ spin_unlock(&queue->rskq_lock);
+ if (data) {
+ mappass->reqcopy.cmd = 0;
+ ret = 0;
+ goto out;
+ }
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+
+ /* Tell the caller we don't need to send back a notification yet */
+ return -1;
+
+out:
+ spin_unlock_irqrestore(&mappass->copy_lock, flags);
+
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.poll.id = req->u.poll.id;
+ rsp->ret = ret;
+ return ret;
}

static int pvcalls_back_handle_cmd(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:32:01

by Stefano Stabellini

Subject: [PATCH v3 02/18] xen/pvcalls: introduce the pvcalls xenbus backend

Introduce a xenbus backend for the pvcalls protocol, as defined by
https://xenbits.xen.org/docs/unstable/misc/pvcalls.html.

This patch only adds the stubs; the code will be added by the following
patches.

Signed-off-by: Stefano Stabellini <[email protected]>
Reviewed-by: Boris Ostrovsky <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 61 insertions(+)
create mode 100644 drivers/xen/pvcalls-back.c

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
new file mode 100644
index 0000000..f3d0daa
--- /dev/null
+++ b/drivers/xen/pvcalls-back.c
@@ -0,0 +1,61 @@
+/*
+ * (c) 2017 Stefano Stabellini <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kthread.h>
+#include <linux/list.h>
+#include <linux/radix-tree.h>
+#include <linux/module.h>
+#include <linux/semaphore.h>
+#include <linux/wait.h>
+
+#include <xen/events.h>
+#include <xen/grant_table.h>
+#include <xen/xen.h>
+#include <xen/xenbus.h>
+#include <xen/interface/io/pvcalls.h>
+
+static int pvcalls_back_probe(struct xenbus_device *dev,
+ const struct xenbus_device_id *id)
+{
+ return 0;
+}
+
+static void pvcalls_back_changed(struct xenbus_device *dev,
+ enum xenbus_state frontend_state)
+{
+}
+
+static int pvcalls_back_remove(struct xenbus_device *dev)
+{
+ return 0;
+}
+
+static int pvcalls_back_uevent(struct xenbus_device *xdev,
+ struct kobj_uevent_env *env)
+{
+ return 0;
+}
+
+static const struct xenbus_device_id pvcalls_back_ids[] = {
+ { "pvcalls" },
+ { "" }
+};
+
+static struct xenbus_driver pvcalls_back_driver = {
+ .ids = pvcalls_back_ids,
+ .probe = pvcalls_back_probe,
+ .remove = pvcalls_back_remove,
+ .uevent = pvcalls_back_uevent,
+ .otherend_changed = pvcalls_back_changed,
+};
--
1.9.1

2017-06-02 19:34:26

by Stefano Stabellini

Subject: [PATCH v3 10/18] xen/pvcalls: implement listen command

Call inet_listen to implement the listen command.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 21 ++++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 4a0cfa3..a75586e 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -355,7 +355,26 @@ static int pvcalls_back_bind(struct xenbus_device *dev,
static int pvcalls_back_listen(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ int ret = -EINVAL;
+ struct sockpass_mapping *map;
+ struct xen_pvcalls_response *rsp;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ map = radix_tree_lookup(&priv->socketpass_mappings, req->u.listen.id);
+ if (map == NULL)
+ goto out;
+
+ ret = inet_listen(map->sock, req->u.listen.backlog);
+
+out:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.listen.id = req->u.listen.id;
+ rsp->ret = ret;
+ return ret;
}

static int pvcalls_back_accept(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:34:45

by Stefano Stabellini

Subject: [PATCH v3 09/18] xen/pvcalls: implement bind command

Allocate a socket. Track the allocated passive sockets with a new data
structure named sockpass_mapping. It contains an unbound workqueue to
schedule delayed work for the accept and poll commands. It also has a
reqcopy field to be used to store a copy of a request for delayed work.
Reads/writes to it are protected by a lock (the "copy_lock" spinlock).
Initialize the workqueue in pvcalls_back_bind.

Implement the bind command with inet_bind.

The pass_sk_data_ready event handler will be added later.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 87 +++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 86 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 3eb84ef..4a0cfa3 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -81,6 +81,18 @@ struct sock_mapping {
struct pvcalls_ioworker ioworker;
};

+struct sockpass_mapping {
+ struct list_head list;
+ struct pvcalls_fedata *priv;
+ struct socket *sock;
+ uint64_t id;
+ struct xen_pvcalls_request reqcopy;
+ spinlock_t copy_lock;
+ struct workqueue_struct *wq;
+ struct work_struct register_work;
+ void (*saved_data_ready)(struct sock *sk);
+};
+
static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
static int pvcalls_back_release_active(struct xenbus_device *dev,
struct pvcalls_fedata *priv,
@@ -261,10 +273,83 @@ static int pvcalls_back_release(struct xenbus_device *dev,
return 0;
}

+static void __pvcalls_back_accept(struct work_struct *work)
+{
+}
+
+static void pvcalls_pass_sk_data_ready(struct sock *sock)
+{
+}
+
static int pvcalls_back_bind(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ int ret, err;
+ struct socket *sock;
+ struct sockpass_mapping *map = NULL;
+ struct xen_pvcalls_response *rsp;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ map = kzalloc(sizeof(*map), GFP_KERNEL);
+ if (map == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ INIT_WORK(&map->register_work, __pvcalls_back_accept);
+ spin_lock_init(&map->copy_lock);
+ map->wq = alloc_workqueue("pvcalls_wq", WQ_UNBOUND, 1);
+ if (!map->wq) {
+ ret = -ENOMEM;
+ kfree(map);
+ goto out;
+ }
+
+ ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
+ if (ret < 0) {
+ destroy_workqueue(map->wq);
+ kfree(map);
+ goto out;
+ }
+
+ ret = inet_bind(sock, (struct sockaddr *)&req->u.bind.addr,
+ req->u.bind.len);
+ if (ret < 0) {
+ destroy_workqueue(map->wq);
+ kfree(map);
+ goto out;
+ }
+
+ map->priv = priv;
+ map->sock = sock;
+ map->id = req->u.bind.id;
+
+ down(&priv->socket_lock);
+ err = radix_tree_insert(&priv->socketpass_mappings, map->id,
+ map);
+ up(&priv->socket_lock);
+ if (err) {
+ ret = err;
+ destroy_workqueue(map->wq);
+ kfree(map);
+ goto out;
+ }
+
+ write_lock_bh(&sock->sk->sk_callback_lock);
+ map->saved_data_ready = sock->sk->sk_data_ready;
+ sock->sk->sk_user_data = map;
+ sock->sk->sk_data_ready = pvcalls_pass_sk_data_ready;
+ write_unlock_bh(&sock->sk->sk_callback_lock);
+
+out:
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.bind.id = req->u.bind.id;
+ rsp->ret = ret;
+ return ret;
}

static int pvcalls_back_listen(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:31:59

by Stefano Stabellini

Subject: [PATCH v3 01/18] xen: introduce the pvcalls interface header

Introduce the C header file which defines the PV Calls interface. It is
imported from xen/include/public/io/pvcalls.h.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
CC: [email protected]
---
include/xen/interface/io/pvcalls.h | 121 +++++++++++++++++++++++++++++++++++++
include/xen/interface/io/ring.h | 2 +
2 files changed, 123 insertions(+)
create mode 100644 include/xen/interface/io/pvcalls.h

diff --git a/include/xen/interface/io/pvcalls.h b/include/xen/interface/io/pvcalls.h
new file mode 100644
index 0000000..ccf97b8
--- /dev/null
+++ b/include/xen/interface/io/pvcalls.h
@@ -0,0 +1,121 @@
+#ifndef __XEN_PUBLIC_IO_XEN_PVCALLS_H__
+#define __XEN_PUBLIC_IO_XEN_PVCALLS_H__
+
+#include <linux/net.h>
+#include <xen/interface/io/ring.h>
+#include <xen/interface/grant_table.h>
+
+/* "1" means socket, connect, release, bind, listen, accept and poll */
+#define XENBUS_FUNCTIONS_CALLS "1"
+
+/*
+ * See docs/misc/pvcalls.markdown in xen.git for the full specification:
+ * https://xenbits.xen.org/docs/unstable/misc/pvcalls.html
+ */
+struct pvcalls_data_intf {
+ RING_IDX in_cons, in_prod, in_error;
+
+ uint8_t pad1[52];
+
+ RING_IDX out_cons, out_prod, out_error;
+
+ uint8_t pad2[52];
+
+ RING_IDX ring_order;
+ grant_ref_t ref[];
+};
+DEFINE_XEN_FLEX_RING(pvcalls);
+
+#define PVCALLS_SOCKET 0
+#define PVCALLS_CONNECT 1
+#define PVCALLS_RELEASE 2
+#define PVCALLS_BIND 3
+#define PVCALLS_LISTEN 4
+#define PVCALLS_ACCEPT 5
+#define PVCALLS_POLL 6
+
+struct xen_pvcalls_request {
+ uint32_t req_id; /* private to guest, echoed in response */
+ uint32_t cmd; /* command to execute */
+ union {
+ struct xen_pvcalls_socket {
+ uint64_t id;
+ uint32_t domain;
+ uint32_t type;
+ uint32_t protocol;
+ } socket;
+ struct xen_pvcalls_connect {
+ uint64_t id;
+ uint8_t addr[28];
+ uint32_t len;
+ uint32_t flags;
+ grant_ref_t ref;
+ uint32_t evtchn;
+ } connect;
+ struct xen_pvcalls_release {
+ uint64_t id;
+ uint8_t reuse;
+ } release;
+ struct xen_pvcalls_bind {
+ uint64_t id;
+ uint8_t addr[28];
+ uint32_t len;
+ } bind;
+ struct xen_pvcalls_listen {
+ uint64_t id;
+ uint32_t backlog;
+ } listen;
+ struct xen_pvcalls_accept {
+ uint64_t id;
+ uint64_t id_new;
+ grant_ref_t ref;
+ uint32_t evtchn;
+ } accept;
+ struct xen_pvcalls_poll {
+ uint64_t id;
+ } poll;
+ /* dummy member to force sizeof(struct xen_pvcalls_request)
+ * to match across archs */
+ struct xen_pvcalls_dummy {
+ uint8_t dummy[56];
+ } dummy;
+ } u;
+};
+
+struct xen_pvcalls_response {
+ uint32_t req_id;
+ uint32_t cmd;
+ int32_t ret;
+ uint32_t pad;
+ union {
+ struct _xen_pvcalls_socket {
+ uint64_t id;
+ } socket;
+ struct _xen_pvcalls_connect {
+ uint64_t id;
+ } connect;
+ struct _xen_pvcalls_release {
+ uint64_t id;
+ } release;
+ struct _xen_pvcalls_bind {
+ uint64_t id;
+ } bind;
+ struct _xen_pvcalls_listen {
+ uint64_t id;
+ } listen;
+ struct _xen_pvcalls_accept {
+ uint64_t id;
+ } accept;
+ struct _xen_pvcalls_poll {
+ uint64_t id;
+ } poll;
+ struct _xen_pvcalls_dummy {
+ uint8_t dummy[8];
+ } dummy;
+ } u;
+};
+
+DEFINE_RING_TYPES(xen_pvcalls, struct xen_pvcalls_request,
+ struct xen_pvcalls_response);
+
+#endif
diff --git a/include/xen/interface/io/ring.h b/include/xen/interface/io/ring.h
index c794568..e547088 100644
--- a/include/xen/interface/io/ring.h
+++ b/include/xen/interface/io/ring.h
@@ -9,6 +9,8 @@
#ifndef __XEN_PUBLIC_IO_RING_H__
#define __XEN_PUBLIC_IO_RING_H__

+#include <xen/interface/grant_table.h>
+
typedef unsigned int RING_IDX;

/* Round a 32-bit unsigned constant down to the nearest power of two. */
--
1.9.1

2017-06-02 19:34:59

by Stefano Stabellini

Subject: [PATCH v3 07/18] xen/pvcalls: implement socket command

Just reply with success to the other end for now. Delay the allocation
of the actual socket to bind and/or connect.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 29 ++++++++++++++++++++++++++++-
1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 6057533..1f2bb26 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -12,12 +12,17 @@
* GNU General Public License for more details.
*/

+#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/radix-tree.h>
#include <linux/module.h>
#include <linux/semaphore.h>
#include <linux/wait.h>
+#include <net/sock.h>
+#include <net/inet_common.h>
+#include <net/inet_connection_sock.h>
+#include <net/request_sock.h>

#include <xen/events.h>
#include <xen/grant_table.h>
@@ -54,7 +59,29 @@ struct pvcalls_fedata {
static int pvcalls_back_socket(struct xenbus_device *dev,
struct xen_pvcalls_request *req)
{
- return 0;
+ struct pvcalls_fedata *priv;
+ int ret;
+ struct xen_pvcalls_response *rsp;
+
+ priv = dev_get_drvdata(&dev->dev);
+
+ if (req->u.socket.domain != AF_INET ||
+ req->u.socket.type != SOCK_STREAM ||
+ (req->u.socket.protocol != IPPROTO_IP &&
+ req->u.socket.protocol != AF_INET))
+ ret = -EAFNOSUPPORT;
+ else
+ ret = 0;
+
+ /* leave the actual socket allocation for later */
+
+ rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
+ rsp->req_id = req->req_id;
+ rsp->cmd = req->cmd;
+ rsp->u.socket.id = req->u.socket.id;
+ rsp->ret = ret;
+
+ return ret;
}

static int pvcalls_back_connect(struct xenbus_device *dev,
--
1.9.1

2017-06-02 19:35:21

by Stefano Stabellini

Subject: [PATCH v3 06/18] xen/pvcalls: handle commands from the frontend

When the other end notifies us that there are commands to be read
(pvcalls_back_event), wake up the backend worker to parse the commands.

The command ring works like most other Xen rings, so use the usual
ring macros to read and write to it. The functions implementing the
commands are empty stubs for now.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 119 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 119 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index bfea25f..6057533 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -51,12 +51,131 @@ struct pvcalls_fedata {
struct work_struct register_work;
};

+static int pvcalls_back_socket(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_connect(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_release(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_bind(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_listen(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_accept(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_poll(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ return 0;
+}
+
+static int pvcalls_back_handle_cmd(struct xenbus_device *dev,
+ struct xen_pvcalls_request *req)
+{
+ int ret = 0;
+
+ switch (req->cmd) {
+ case PVCALLS_SOCKET:
+ ret = pvcalls_back_socket(dev, req);
+ break;
+ case PVCALLS_CONNECT:
+ ret = pvcalls_back_connect(dev, req);
+ break;
+ case PVCALLS_RELEASE:
+ ret = pvcalls_back_release(dev, req);
+ break;
+ case PVCALLS_BIND:
+ ret = pvcalls_back_bind(dev, req);
+ break;
+ case PVCALLS_LISTEN:
+ ret = pvcalls_back_listen(dev, req);
+ break;
+ case PVCALLS_ACCEPT:
+ ret = pvcalls_back_accept(dev, req);
+ break;
+ case PVCALLS_POLL:
+ ret = pvcalls_back_poll(dev, req);
+ break;
+ default:
+ ret = -ENOTSUPP;
+ break;
+ }
+ return ret;
+}
+
static void pvcalls_back_work(struct work_struct *work)
{
+ struct pvcalls_fedata *priv = container_of(work,
+ struct pvcalls_fedata, register_work);
+ int notify, notify_all = 0, more = 1;
+ struct xen_pvcalls_request req;
+ struct xenbus_device *dev = priv->dev;
+
+ while (more) {
+ while (RING_HAS_UNCONSUMED_REQUESTS(&priv->ring)) {
+ RING_COPY_REQUEST(&priv->ring,
+ priv->ring.req_cons++,
+ &req);
+
+ if (!pvcalls_back_handle_cmd(dev, &req)) {
+ RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(
+ &priv->ring, notify);
+ notify_all += notify;
+ }
+ }
+
+ if (notify_all)
+ notify_remote_via_irq(priv->irq);
+
+ RING_FINAL_CHECK_FOR_REQUESTS(&priv->ring, more);
+ }
}

static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
{
+ struct xenbus_device *dev = dev_id;
+ struct pvcalls_fedata *priv = NULL;
+
+ if (dev == NULL)
+ return IRQ_HANDLED;
+
+ priv = dev_get_drvdata(&dev->dev);
+ if (priv == NULL)
+ return IRQ_HANDLED;
+
+ /*
+ * TODO: a small theoretical race exists if we try to queue work
+ * after pvcalls_back_work checked for final requests and before
+ * it returns. The queuing will fail, and pvcalls_back_work
+ * won't do the work because it is about to return. In that
+ * case, we lose the notification.
+ */
+ queue_work(priv->wq, &priv->register_work);
+
return IRQ_HANDLED;
}

--
1.9.1

2017-06-02 19:35:34

by Stefano Stabellini

Subject: [PATCH v3 05/18] xen/pvcalls: connect to a frontend

Introduce a per-frontend data structure named pvcalls_fedata. It
contains pointers to the command ring, its event channel, a list of
active sockets and a tree of passive sockets (passive sockets need to be
looked up from the id on listen, accept and poll commands, while active
sockets only on release).

It also has an unbound workqueue to schedule the work of parsing and
executing commands on the command ring. socket_lock protects the two
lists. In pvcalls_back_global, keep a list of connected frontends.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 92 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 92 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 7bce750..bfea25f 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -33,9 +33,101 @@ struct pvcalls_back_global {
struct semaphore frontends_lock;
} pvcalls_back_global;

+/*
+ * Per-frontend data structure. It contains pointers to the command
+ * ring, its event channel, a list of active sockets and a tree of
+ * passive sockets.
+ */
+struct pvcalls_fedata {
+ struct list_head list;
+ struct xenbus_device *dev;
+ struct xen_pvcalls_sring *sring;
+ struct xen_pvcalls_back_ring ring;
+ int irq;
+ struct list_head socket_mappings;
+ struct radix_tree_root socketpass_mappings;
+ struct semaphore socket_lock;
+ struct workqueue_struct *wq;
+ struct work_struct register_work;
+};
+
+static void pvcalls_back_work(struct work_struct *work)
+{
+}
+
+static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
+{
+ return IRQ_HANDLED;
+}
+
static int backend_connect(struct xenbus_device *dev)
{
+ int err, evtchn;
+ grant_ref_t ring_ref;
+ struct pvcalls_fedata *priv = NULL;
+
+ priv = kzalloc(sizeof(struct pvcalls_fedata), GFP_KERNEL);
+ if (!priv)
+ return -ENOMEM;
+
+ err = xenbus_scanf(XBT_NIL, dev->otherend, "port", "%u",
+ &evtchn);
+ if (err != 1) {
+ err = -EINVAL;
+ xenbus_dev_fatal(dev, err, "reading %s/event-channel",
+ dev->otherend);
+ goto error;
+ }
+
+ err = xenbus_scanf(XBT_NIL, dev->otherend, "ring-ref", "%u", &ring_ref);
+ if (err != 1) {
+ err = -EINVAL;
+ xenbus_dev_fatal(dev, err, "reading %s/ring-ref",
+ dev->otherend);
+ goto error;
+ }
+
+ err = bind_interdomain_evtchn_to_irqhandler(dev->otherend_id, evtchn,
+ pvcalls_back_event, 0,
+ "pvcalls-backend", dev);
+ if (err < 0)
+ goto error;
+ priv->irq = err;
+
+ priv->wq = alloc_workqueue("pvcalls_back_wq", WQ_UNBOUND, 1);
+ if (!priv->wq) {
+ err = -ENOMEM;
+ goto error;
+ }
+
+ err = xenbus_map_ring_valloc(dev, &ring_ref, 1, (void**)&priv->sring);
+ if (err < 0)
+ goto error;
+
+ BACK_RING_INIT(&priv->ring, priv->sring, XEN_PAGE_SIZE * 1);
+ priv->dev = dev;
+
+ INIT_WORK(&priv->register_work, pvcalls_back_work);
+ INIT_LIST_HEAD(&priv->socket_mappings);
+ INIT_RADIX_TREE(&priv->socketpass_mappings, GFP_KERNEL);
+ sema_init(&priv->socket_lock, 1);
+ dev_set_drvdata(&dev->dev, priv);
+
+ down(&pvcalls_back_global.frontends_lock);
+ list_add_tail(&priv->list, &pvcalls_back_global.frontends);
+ up(&pvcalls_back_global.frontends_lock);
+ queue_work(priv->wq, &priv->register_work);
+
return 0;
+
+ error:
+ if (priv->sring != NULL)
+ xenbus_unmap_ring_vfree(dev, priv->sring);
+ if (priv->wq)
+ destroy_workqueue(priv->wq);
+ if (priv->irq > 0)
+ unbind_from_irqhandler(priv->irq, dev);
+ kfree(priv);
+ return err;
}

static int backend_disconnect(struct xenbus_device *dev)
--
1.9.1

2017-06-02 19:35:57

by Stefano Stabellini

Subject: [PATCH v3 03/18] xen/pvcalls: initialize the module and register the xenbus backend

Keep a list of connected frontends. Use a semaphore to protect list
accesses.

Signed-off-by: Stefano Stabellini <[email protected]>
Reviewed-by: Boris Ostrovsky <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index f3d0daa..9044cf2 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -25,6 +25,11 @@
#include <xen/xenbus.h>
#include <xen/interface/io/pvcalls.h>

+struct pvcalls_back_global {
+ struct list_head frontends;
+ struct semaphore frontends_lock;
+} pvcalls_back_global;
+
static int pvcalls_back_probe(struct xenbus_device *dev,
const struct xenbus_device_id *id)
{
@@ -59,3 +64,20 @@ static int pvcalls_back_uevent(struct xenbus_device *xdev,
.uevent = pvcalls_back_uevent,
.otherend_changed = pvcalls_back_changed,
};
+
+static int __init pvcalls_back_init(void)
+{
+ int ret;
+
+ if (!xen_domain())
+ return -ENODEV;
+
+ ret = xenbus_register_backend(&pvcalls_back_driver);
+ if (ret < 0)
+ return ret;
+
+ sema_init(&pvcalls_back_global.frontends_lock, 1);
+ INIT_LIST_HEAD(&pvcalls_back_global.frontends);
+ return 0;
+}
+module_init(pvcalls_back_init);
--
1.9.1

2017-06-02 19:35:55

by Stefano Stabellini

Subject: [PATCH v3 04/18] xen/pvcalls: xenbus state handling

Introduce the code to handle xenbus state changes.

Implement the probe function for the pvcalls backend. Write the
supported versions, max-page-order and function-calls nodes to xenstore,
as required by the protocol.

Introduce stub functions for disconnecting/connecting to a frontend.

Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-back.c | 152 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 152 insertions(+)

diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
index 9044cf2..7bce750 100644
--- a/drivers/xen/pvcalls-back.c
+++ b/drivers/xen/pvcalls-back.c
@@ -25,20 +25,172 @@
#include <xen/xenbus.h>
#include <xen/interface/io/pvcalls.h>

+#define PVCALLS_VERSIONS "1"
+#define MAX_RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
+
struct pvcalls_back_global {
struct list_head frontends;
struct semaphore frontends_lock;
} pvcalls_back_global;

+static int backend_connect(struct xenbus_device *dev)
+{
+ return 0;
+}
+
+static int backend_disconnect(struct xenbus_device *dev)
+{
+ return 0;
+}
+
static int pvcalls_back_probe(struct xenbus_device *dev,
const struct xenbus_device_id *id)
{
+ int err, abort;
+ struct xenbus_transaction xbt;
+
+again:
+ abort = 1;
+
+ err = xenbus_transaction_start(&xbt);
+ if (err) {
+ pr_warn("%s cannot create xenstore transaction\n", __func__);
+ return err;
+ }
+
+ err = xenbus_printf(xbt, dev->nodename, "versions", "%s",
+ PVCALLS_VERSIONS);
+ if (err) {
+ pr_warn("%s write out 'versions' failed\n", __func__);
+ goto abort;
+ }
+
+ err = xenbus_printf(xbt, dev->nodename, "max-page-order", "%u",
+ MAX_RING_ORDER);
+ if (err) {
+ pr_warn("%s write out 'max-page-order' failed\n", __func__);
+ goto abort;
+ }
+
+ err = xenbus_printf(xbt, dev->nodename, "function-calls", "%s",
+ XENBUS_FUNCTIONS_CALLS);
+ if (err) {
+ pr_warn("%s write out 'function-calls' failed\n", __func__);
+ goto abort;
+ }
+
+ abort = 0;
+abort:
+ err = xenbus_transaction_end(xbt, abort);
+ if (err) {
+ if (err == -EAGAIN && !abort)
+ goto again;
+ pr_warn("%s cannot complete xenstore transaction\n", __func__);
+ return err;
+ }
+
+ xenbus_switch_state(dev, XenbusStateInitWait);
+
return 0;
}

+static void set_backend_state(struct xenbus_device *dev,
+ enum xenbus_state state)
+{
+ while (dev->state != state) {
+ switch (dev->state) {
+ case XenbusStateClosed:
+ switch (state) {
+ case XenbusStateInitWait:
+ case XenbusStateConnected:
+ xenbus_switch_state(dev, XenbusStateInitWait);
+ break;
+ case XenbusStateClosing:
+ xenbus_switch_state(dev, XenbusStateClosing);
+ break;
+ default:
+ __WARN();
+ }
+ break;
+ case XenbusStateInitWait:
+ case XenbusStateInitialised:
+ switch (state) {
+ case XenbusStateConnected:
+ backend_connect(dev);
+ xenbus_switch_state(dev, XenbusStateConnected);
+ break;
+ case XenbusStateClosing:
+ case XenbusStateClosed:
+ xenbus_switch_state(dev, XenbusStateClosing);
+ break;
+ default:
+ __WARN();
+ }
+ break;
+ case XenbusStateConnected:
+ switch (state) {
+ case XenbusStateInitWait:
+ case XenbusStateClosing:
+ case XenbusStateClosed:
+ down(&pvcalls_back_global.frontends_lock);
+ backend_disconnect(dev);
+ up(&pvcalls_back_global.frontends_lock);
+ xenbus_switch_state(dev, XenbusStateClosing);
+ break;
+ default:
+ __WARN();
+ }
+ break;
+ case XenbusStateClosing:
+ switch (state) {
+ case XenbusStateInitWait:
+ case XenbusStateConnected:
+ case XenbusStateClosed:
+ xenbus_switch_state(dev, XenbusStateClosed);
+ break;
+ default:
+ __WARN();
+ }
+ break;
+ default:
+ __WARN();
+ }
+ }
+}
+
static void pvcalls_back_changed(struct xenbus_device *dev,
enum xenbus_state frontend_state)
{
+ switch (frontend_state) {
+ case XenbusStateInitialising:
+ set_backend_state(dev, XenbusStateInitWait);
+ break;
+
+ case XenbusStateInitialised:
+ case XenbusStateConnected:
+ set_backend_state(dev, XenbusStateConnected);
+ break;
+
+ case XenbusStateClosing:
+ set_backend_state(dev, XenbusStateClosing);
+ break;
+
+ case XenbusStateClosed:
+ set_backend_state(dev, XenbusStateClosed);
+ if (xenbus_dev_is_online(dev))
+ break;
+ device_unregister(&dev->dev);
+ break;
+ case XenbusStateUnknown:
+ set_backend_state(dev, XenbusStateClosed);
+ device_unregister(&dev->dev);
+ break;
+
+ default:
+ xenbus_dev_fatal(dev, -EINVAL, "saw state %d at frontend",
+ frontend_state);
+ break;
+ }
}

static int pvcalls_back_remove(struct xenbus_device *dev)
--
1.9.1

2017-06-12 21:18:57

by Boris Ostrovsky

Subject: Re: [PATCH v3 01/18] xen: introduce the pvcalls interface header

On 06/02/2017 03:31 PM, Stefano Stabellini wrote:
> Introduce the C header file which defines the PV Calls interface. It is
> imported from xen/include/public/io/pvcalls.h.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> CC: [email protected]

Reviewed-by: Boris Ostrovsky <[email protected]>

2017-06-12 21:19:46

by Boris Ostrovsky

Subject: Re: [PATCH v3 04/18] xen/pvcalls: xenbus state handling

On 06/02/2017 03:31 PM, Stefano Stabellini wrote:
> Introduce the code to handle xenbus state changes.
>
> Implement the probe function for the pvcalls backend. Write the
> supported versions, max-page-order and function-calls nodes to xenstore,
> as required by the protocol.
>
> Introduce stub functions for disconnecting/connecting to a frontend.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]


Reviewed-by: Boris Ostrovsky <[email protected]>

2017-06-12 21:27:45

by Boris Ostrovsky

Subject: Re: [PATCH v3 05/18] xen/pvcalls: connect to a frontend

On 06/02/2017 03:31 PM, Stefano Stabellini wrote:
> Introduce a per-frontend data structure named pvcalls_fedata. It
> contains pointers to the command ring, its event channel, a list of
> active sockets and a tree of passive sockets (passive sockets need to be
> looked up from the id on listen, accept and poll commands, while active
> sockets only on release).
>
> It also has an unbound workqueue to schedule the work of parsing and
> executing commands on the command ring. socket_lock protects the two
> lists. In pvcalls_back_global, keep a list of connected frontends.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]

Reviewed-by: Boris Ostrovsky <[email protected]>

2017-06-12 22:03:59

by Boris Ostrovsky

Subject: Re: [PATCH v3 06/18] xen/pvcalls: handle commands from the frontend


> +
> static void pvcalls_back_work(struct work_struct *work)
> {
> + struct pvcalls_fedata *priv = container_of(work,
> + struct pvcalls_fedata, register_work);
> + int notify, notify_all = 0, more = 1;
> + struct xen_pvcalls_request req;
> + struct xenbus_device *dev = priv->dev;
> +
> + while (more) {
> + while (RING_HAS_UNCONSUMED_REQUESTS(&priv->ring)) {
> + RING_COPY_REQUEST(&priv->ring,
> + priv->ring.req_cons++,
> + &req);
> +
> + if (!pvcalls_back_handle_cmd(dev, &req)) {
> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(
> + &priv->ring, notify);
> + notify_all += notify;
> + }
> + }
> +
> + if (notify_all)
> + notify_remote_via_irq(priv->irq);
> +
> + RING_FINAL_CHECK_FOR_REQUESTS(&priv->ring, more);
> + }
> }
>
> static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> {
> + struct xenbus_device *dev = dev_id;
> + struct pvcalls_fedata *priv = NULL;
> +
> + if (dev == NULL)
> + return IRQ_HANDLED;
> +
> + priv = dev_get_drvdata(&dev->dev);
> + if (priv == NULL)
> + return IRQ_HANDLED;
> +
> + /*
> + * TODO: a small theoretical race exists if we try to queue work
> + * after pvcalls_back_work checked for final requests and before
> + * it returns. The queuing will fail, and pvcalls_back_work
> + * won't do the work because it is about to return. In that
> + * case, we lose the notification.
> + */
> + queue_work(priv->wq, &priv->register_work);

Would queuing delayed work (if queue_work() failed) help? And canceling
it on next invocation of pvcalls_back_event()?

-boris
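[Editorial note: the delayed-work backstop suggested above could be
sketched roughly as follows. This is a hypothetical illustration, not
part of the posted series: the `register_dwork` field and the one-jiffy
delay are assumptions.]

```c
/*
 * Sketch of the suggestion: fall back to delayed work when the
 * immediate queue_work() fails, and cancel the backstop on the next
 * event.  Assumes a new "struct delayed_work register_dwork" member
 * in struct pvcalls_fedata, initialized alongside register_work
 * (neither exists in the posted series).
 */
static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
{
	struct xenbus_device *dev = dev_id;
	struct pvcalls_fedata *priv;

	if (dev == NULL)
		return IRQ_HANDLED;

	priv = dev_get_drvdata(&dev->dev);
	if (priv == NULL)
		return IRQ_HANDLED;

	/* A fresh event supersedes any pending backstop run. */
	cancel_delayed_work(&priv->register_dwork);

	if (!queue_work(priv->wq, &priv->register_work))
		/*
		 * The work item was already queued: schedule a backstop
		 * run so a notification arriving in the race window
		 * described in the TODO comment is not lost.
		 */
		queue_delayed_work(priv->wq, &priv->register_dwork, 1);

	return IRQ_HANDLED;
}
```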

2017-06-13 06:02:30

by Jürgen Groß

Subject: Re: [PATCH v3 18/18] xen: introduce a Kconfig option to enable the pvcalls backend

On 02/06/17 21:31, Stefano Stabellini wrote:
> Also add pvcalls-back to the Makefile.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/Kconfig | 12 ++++++++++++
> drivers/xen/Makefile | 1 +
> 2 files changed, 13 insertions(+)
>
> diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
> index f15bb3b7..bbdf059 100644
> --- a/drivers/xen/Kconfig
> +++ b/drivers/xen/Kconfig
> @@ -196,6 +196,18 @@ config XEN_PCIDEV_BACKEND
>
> If in doubt, say m.
>
> +config XEN_PVCALLS_BACKEND
> + bool "XEN PV Calls backend driver"
> + depends on INET && XEN

depends on XEN_BACKEND?


Juergen

> + default n
> + help
> + Experimental backend for the Xen PV Calls protocol
> + (https://xenbits.xen.org/docs/unstable/misc/pvcalls.html). It
> + allows PV Calls frontends to send POSIX calls to the backend,
> + which implements them.
> +
> + If in doubt, say n.
> +
> config XEN_SCSI_BACKEND
> tristate "XEN SCSI backend driver"
> depends on XEN && XEN_BACKEND && TARGET_CORE
> diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
> index 8feab810..480b928 100644
> --- a/drivers/xen/Makefile
> +++ b/drivers/xen/Makefile
> @@ -38,6 +38,7 @@ obj-$(CONFIG_XEN_ACPI_PROCESSOR) += xen-acpi-processor.o
> obj-$(CONFIG_XEN_EFI) += efi.o
> obj-$(CONFIG_XEN_SCSI_BACKEND) += xen-scsiback.o
> obj-$(CONFIG_XEN_AUTO_XLATE) += xlate_mmu.o
> +obj-$(CONFIG_XEN_PVCALLS_BACKEND) += pvcalls-back.o
> xen-evtchn-y := evtchn.o
> xen-gntdev-y := gntdev.o
> xen-gntalloc-y := gntalloc.o
>

2017-06-13 06:06:35

by Jürgen Groß

Subject: Re: [PATCH v3 07/18] xen/pvcalls: implement socket command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Just reply with success to the other end for now. Delay the allocation
> of the actual socket to bind and/or connect.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 29 ++++++++++++++++++++++++++++-
> 1 file changed, 28 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 6057533..1f2bb26 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -12,12 +12,17 @@
> * GNU General Public License for more details.
> */
>
> +#include <linux/inet.h>
> #include <linux/kthread.h>
> #include <linux/list.h>
> #include <linux/radix-tree.h>
> #include <linux/module.h>
> #include <linux/semaphore.h>
> #include <linux/wait.h>
> +#include <net/sock.h>
> +#include <net/inet_common.h>
> +#include <net/inet_connection_sock.h>
> +#include <net/request_sock.h>
>
> #include <xen/events.h>
> #include <xen/grant_table.h>
> @@ -54,7 +59,29 @@ struct pvcalls_fedata {
> static int pvcalls_back_socket(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + int ret;
> + struct xen_pvcalls_response *rsp;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + if (req->u.socket.domain != AF_INET ||
> + req->u.socket.type != SOCK_STREAM ||
> + (req->u.socket.protocol != IPPROTO_IP &&
> + req->u.socket.protocol != AF_INET))
> + ret = -EAFNOSUPPORT;
> + else
> + ret = 0;
> +
> + /* leave the actual socket allocation for later */
> +
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.socket.id = req->u.socket.id;
> + rsp->ret = ret;
> +
> + return ret;

So if ret != 0 this will omit the call of
RING_PUSH_RESPONSES_AND_CHECK_NOTIFY() in pvcalls_back_work().

I think you want to return 0.


Juergen

2017-06-13 06:17:14

by Jürgen Groß

Subject: Re: [PATCH v3 08/18] xen/pvcalls: implement connect command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Allocate a socket. Keep track of socket <-> ring mappings with a new data
> structure, called sock_mapping. Implement the connect command by calling
> inet_stream_connect, and mapping the new indexes page and data ring.
> Allocate a workqueue and a work_struct, called ioworker, to perform
> reads and writes to the socket.
>
> When an active socket is closed (sk_state_change), set in_error to
> -ENOTCONN and notify the other end, as specified by the protocol.
>
> sk_data_ready and pvcalls_back_ioworker will be implemented later.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 170 +++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 170 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 1f2bb26..3eb84ef 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -56,6 +56,40 @@ struct pvcalls_fedata {
> struct work_struct register_work;
> };
>
> +struct pvcalls_ioworker {
> + struct work_struct register_work;
> + struct workqueue_struct *wq;
> + unsigned int cpu;
> +};
> +
> +struct sock_mapping {
> + struct list_head list;
> + struct pvcalls_fedata *priv;
> + struct socket *sock;
> + uint64_t id;
> + grant_ref_t ref;
> + struct pvcalls_data_intf *ring;
> + void *bytes;
> + struct pvcalls_data data;
> + uint32_t ring_order;
> + int irq;
> + atomic_t read;
> + atomic_t write;
> + atomic_t io;
> + atomic_t release;
> + void (*saved_data_ready)(struct sock *sk);
> + struct pvcalls_ioworker ioworker;
> +};
> +
> +static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
> +static int pvcalls_back_release_active(struct xenbus_device *dev,
> + struct pvcalls_fedata *priv,
> + struct sock_mapping *map);
> +
> +static void pvcalls_back_ioworker(struct work_struct *work)
> +{
> +}
> +
> static int pvcalls_back_socket(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> @@ -84,9 +118,140 @@ static int pvcalls_back_socket(struct xenbus_device *dev,
> return ret;
> }
>
> +static void pvcalls_sk_state_change(struct sock *sock)
> +{
> + struct sock_mapping *map = sock->sk_user_data;
> + struct pvcalls_data_intf *intf;
> +
> + if (map == NULL)
> + return;
> +
> + intf = map->ring;
> + intf->in_error = -ENOTCONN;
> + notify_remote_via_irq(map->irq);
> +}
> +
> +static void pvcalls_sk_data_ready(struct sock *sock)
> +{
> +}
> +
> +static struct sock_mapping *pvcalls_new_active_socket(
> + struct pvcalls_fedata *priv,
> + uint64_t id,
> + grant_ref_t ref,
> + uint32_t evtchn,
> + struct socket *sock)
> +{
> + int ret;
> + struct sock_mapping *map = NULL;

Pointless initializer.

> + void *page;
> +
> + map = kzalloc(sizeof(*map), GFP_KERNEL);
> + if (map == NULL)
> + return NULL;
> +
> + map->priv = priv;
> + map->sock = sock;
> + map->id = id;
> + map->ref = ref;
> +
> + ret = xenbus_map_ring_valloc(priv->dev, &ref, 1, &page);
> + if (ret < 0)
> + goto out;
> + map->ring = page;
> + map->ring_order = map->ring->ring_order;
> + /* first read the order, then map the data ring */
> + virt_rmb();
> + if (map->ring_order > MAX_RING_ORDER)

Issue a message?

> + goto out;
> + ret = xenbus_map_ring_valloc(priv->dev, map->ring->ref,
> + (1 << map->ring_order), &page);
> + if (ret < 0)
> + goto out;
> + map->bytes = page;
> +
> + ret = bind_interdomain_evtchn_to_irqhandler(priv->dev->otherend_id,
> + evtchn,
> + pvcalls_back_conn_event,
> + 0,
> + "pvcalls-backend",
> + map);
> + if (ret < 0)
> + goto out;
> + map->irq = ret;
> +
> + map->data.in = map->bytes;
> + map->data.out = map->bytes + XEN_FLEX_RING_SIZE(map->ring_order);
> +
> + map->ioworker.wq = alloc_workqueue("pvcalls_io", WQ_UNBOUND, 1);
> + if (!map->ioworker.wq)
> + goto out;
> + map->ioworker.cpu = get_random_int() % num_online_cpus();

So you expect all cpus [0..num_online_cpus()[ to be online. I don't
think this is always true.

> + atomic_set(&map->io, 1);
> + INIT_WORK(&map->ioworker.register_work, pvcalls_back_ioworker);
> +
> + down(&priv->socket_lock);
> + list_add_tail(&map->list, &priv->socket_mappings);
> + up(&priv->socket_lock);
> +
> + write_lock_bh(&map->sock->sk->sk_callback_lock);
> + map->saved_data_ready = map->sock->sk->sk_data_ready;
> + map->sock->sk->sk_user_data = map;
> + map->sock->sk->sk_data_ready = pvcalls_sk_data_ready;
> + map->sock->sk->sk_state_change = pvcalls_sk_state_change;
> + write_unlock_bh(&map->sock->sk->sk_callback_lock);
> +
> + return map;
> +out:
> + pvcalls_back_release_active(priv->dev, priv, map);
> + return NULL;
> +}
> +
> static int pvcalls_back_connect(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> + struct pvcalls_fedata *priv;
> + int ret;
> + struct socket *sock;
> + struct sock_mapping *map = NULL;
> + struct xen_pvcalls_response *rsp;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
> + if (ret < 0)
> + goto out;
> + ret = inet_stream_connect(sock, (struct sockaddr *)&req->u.connect.addr,
> + req->u.connect.len, req->u.connect.flags);
> + if (ret < 0) {
> + sock_release(map->sock);
> + goto out;
> + }
> +
> + map = pvcalls_new_active_socket(priv,
> + req->u.connect.id,
> + req->u.connect.ref,
> + req->u.connect.evtchn,
> + sock);
> + if (!map) {
> + sock_release(map->sock);
> + goto out;

ret will be 0 here. So there was a failure and you report success.


Juergen

> + }
> +
> +out:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.connect.id = req->u.connect.id;
> + rsp->ret = ret;
> +
> + return ret;
> +}
> +
> +static int pvcalls_back_release_active(struct xenbus_device *dev,
> + struct pvcalls_fedata *priv,
> + struct sock_mapping *map)
> +{
> return 0;
> }
>
> @@ -206,6 +371,11 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> return IRQ_HANDLED;
> }
>
> +static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
> +{
> + return IRQ_HANDLED;
> +}
> +
> static int backend_connect(struct xenbus_device *dev)
> {
> int err, evtchn;
>

2017-06-13 07:00:22

by Jürgen Groß

Subject: Re: [PATCH v3 09/18] xen/pvcalls: implement bind command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Allocate a socket. Track the allocated passive sockets with a new data
> structure named sockpass_mapping. It contains an unbound workqueue to
> schedule delayed work for the accept and poll commands. It also has a
> reqcopy field to be used to store a copy of a request for delayed work.
> Reads/writes to it are protected by a lock (the "copy_lock" spinlock).
> Initialize the workqueue in pvcalls_back_bind.
>
> Implement the bind command with inet_bind.
>
> The pass_sk_data_ready event handler will be added later.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 87 +++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 86 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 3eb84ef..4a0cfa3 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -81,6 +81,18 @@ struct sock_mapping {
> struct pvcalls_ioworker ioworker;
> };
>
> +struct sockpass_mapping {
> + struct list_head list;
> + struct pvcalls_fedata *priv;
> + struct socket *sock;
> + uint64_t id;
> + struct xen_pvcalls_request reqcopy;
> + spinlock_t copy_lock;
> + struct workqueue_struct *wq;
> + struct work_struct register_work;
> + void (*saved_data_ready)(struct sock *sk);
> +};
> +
> static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
> static int pvcalls_back_release_active(struct xenbus_device *dev,
> struct pvcalls_fedata *priv,
> @@ -261,10 +273,83 @@ static int pvcalls_back_release(struct xenbus_device *dev,
> return 0;
> }
>
> +static void __pvcalls_back_accept(struct work_struct *work)
> +{
> +}
> +
> +static void pvcalls_pass_sk_data_ready(struct sock *sock)
> +{
> +}
> +
> static int pvcalls_back_bind(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + int ret, err;
> + struct socket *sock;
> + struct sockpass_mapping *map = NULL;

Pointless initializer.

> + struct xen_pvcalls_response *rsp;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + map = kzalloc(sizeof(*map), GFP_KERNEL);
> + if (map == NULL) {
> + ret = -ENOMEM;
> + goto out;
> + }
> +
> + INIT_WORK(&map->register_work, __pvcalls_back_accept);
> + spin_lock_init(&map->copy_lock);
> + map->wq = alloc_workqueue("pvcalls_wq", WQ_UNBOUND, 1);
> + if (!map->wq) {
> + ret = -ENOMEM;
> + kfree(map);
> + goto out;
> + }
> +
> + ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
> + if (ret < 0) {
> + destroy_workqueue(map->wq);
> + kfree(map);
> + goto out;
> + }
> +
> + ret = inet_bind(sock, (struct sockaddr *)&req->u.bind.addr,
> + req->u.bind.len);
> + if (ret < 0) {
> + destroy_workqueue(map->wq);
> + kfree(map);

sock_release()?

> + goto out;
> + }
> +
> + map->priv = priv;
> + map->sock = sock;
> + map->id = req->u.bind.id;
> +
> + down(&priv->socket_lock);
> + err = radix_tree_insert(&priv->socketpass_mappings, map->id,
> + map);
> + up(&priv->socket_lock);
> + if (err) {
> + ret = err;
> + destroy_workqueue(map->wq);
> + kfree(map);

sock_release()?

> + goto out;
> + }
> +
> + write_lock_bh(&sock->sk->sk_callback_lock);
> + map->saved_data_ready = sock->sk->sk_data_ready;
> + sock->sk->sk_user_data = map;
> + sock->sk->sk_data_ready = pvcalls_pass_sk_data_ready;
> + write_unlock_bh(&sock->sk->sk_callback_lock);
> +
> +out:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.bind.id = req->u.bind.id;
> + rsp->ret = ret;
> + return ret;

return 0?


Juergen

> }
>
> static int pvcalls_back_listen(struct xenbus_device *dev,
>

2017-06-13 07:01:00

by Jürgen Groß

Subject: Re: [PATCH v3 10/18] xen/pvcalls: implement listen command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Call inet_listen to implement the listen command.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 21 ++++++++++++++++++++-
> 1 file changed, 20 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 4a0cfa3..a75586e 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -355,7 +355,26 @@ static int pvcalls_back_bind(struct xenbus_device *dev,
> static int pvcalls_back_listen(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + int ret = -EINVAL;
> + struct sockpass_mapping *map;
> + struct xen_pvcalls_response *rsp;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + map = radix_tree_lookup(&priv->socketpass_mappings, req->u.listen.id);
> + if (map == NULL)
> + goto out;
> +
> + ret = inet_listen(map->sock, req->u.listen.backlog);
> +
> +out:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.listen.id = req->u.listen.id;
> + rsp->ret = ret;
> + return ret;

return 0?


Juergen

> }
>
> static int pvcalls_back_accept(struct xenbus_device *dev,
>

2017-06-13 07:25:04

by Jürgen Groß

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Implement the accept command by calling inet_accept. To avoid blocking
> in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which gets
> scheduled on sk_data_ready (for a passive socket, it means that there
> are connections to accept).
>
> Use the reqcopy field to store the request. Accept the new socket from
> the delayed work function, create a new sock_mapping for it, map
> the indexes page and data ring, and reply to the other end. Allocate an
> ioworker for the socket.
>
> Only support one outstanding blocking accept request for every socket at
> any time.
>
> Add a field to sock_mapping to remember the passive socket from which an
> active socket was created.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 108 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index a75586e..f1173f4 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
> struct sock_mapping {
> struct list_head list;
> struct pvcalls_fedata *priv;
> + struct sockpass_mapping *sockpass;
> struct socket *sock;
> uint64_t id;
> grant_ref_t ref;
> @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
>
> static void __pvcalls_back_accept(struct work_struct *work)
> {
> + struct sockpass_mapping *mappass = container_of(
> + work, struct sockpass_mapping, register_work);
> + struct sock_mapping *map;
> + struct pvcalls_ioworker *iow;
> + struct pvcalls_fedata *priv;
> + struct socket *sock;
> + struct xen_pvcalls_response *rsp;
> + struct xen_pvcalls_request *req;
> + int notify;
> + int ret = -EINVAL;
> + unsigned long flags;
> +
> + priv = mappass->priv;
> + /* We only need to check the value of "cmd" atomically on read. */
> + spin_lock_irqsave(&mappass->copy_lock, flags);
> + req = &mappass->reqcopy;
> + if (req->cmd != PVCALLS_ACCEPT) {
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> + return;
> + }
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);

What about:
req = &mappass->reqcopy;
if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
return;

I can't see the need for taking a lock here.

> +
> + sock = sock_alloc();
> + if (sock == NULL)
> + goto out_error;
> + sock->type = mappass->sock->type;
> + sock->ops = mappass->sock->ops;
> +
> + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
> + if (ret == -EAGAIN) {
> + sock_release(sock);
> + goto out_error;
> + }
> +
> + map = pvcalls_new_active_socket(priv,
> + req->u.accept.id_new,
> + req->u.accept.ref,
> + req->u.accept.evtchn,
> + sock);
> + if (!map) {
> + sock_release(sock);
> + goto out_error;
> + }
> +
> + map->sockpass = mappass;
> + iow = &map->ioworker;
> + atomic_inc(&map->read);
> + atomic_inc(&map->io);
> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> +
> +out_error:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.accept.id = req->u.accept.id;
> + rsp->ret = ret;
> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> + if (notify)
> + notify_remote_via_irq(priv->irq);
> +
> + spin_lock_irqsave(&mappass->copy_lock, flags);
> + mappass->reqcopy.cmd = 0;
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);

ACCESS_ONCE(mappass->reqcopy.cmd) = 0;

> }
>
> static void pvcalls_pass_sk_data_ready(struct sock *sock)
> {
> + struct sockpass_mapping *mappass = sock->sk_user_data;
> +
> + if (mappass == NULL)
> + return;
> +
> + queue_work(mappass->wq, &mappass->register_work);
> }
>
> static int pvcalls_back_bind(struct xenbus_device *dev,
> @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
> static int pvcalls_back_accept(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + struct sockpass_mapping *mappass;
> + int ret = -EINVAL;
> + struct xen_pvcalls_response *rsp;
> + unsigned long flags;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> + req->u.accept.id);
> + if (mappass == NULL)
> + goto out_error;
> +
> + /*
> + * Limitation of the current implementation: only support one
> + * concurrent accept or poll call on one socket.
> + */
> + spin_lock_irqsave(&mappass->copy_lock, flags);
> + if (mappass->reqcopy.cmd != 0) {
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> + ret = -EINTR;
> + goto out_error;
> + }
> +
> + mappass->reqcopy = *req;

This time you need the lock, however you should use:

ACCESS_ONCE(mappass->reqcopy) = *req;

> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> + queue_work(mappass->wq, &mappass->register_work);
> +
> + /* Tell the caller we don't need to send back a notification yet */
> + return -1;
> +
> +out_error:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.accept.id = req->u.accept.id;
> + rsp->ret = ret;
> + return ret;

return 0?


Juergen

> }
>
> static int pvcalls_back_poll(struct xenbus_device *dev,
>

2017-06-13 07:28:05

by Jürgen Groß

Subject: Re: [PATCH v3 12/18] xen/pvcalls: implement poll command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Implement poll on passive sockets by requesting a delayed response with
> mappass->reqcopy, and reply back when there is data on the passive
> socket.
>
> Poll on active sockets is not implemented, as per the spec: the frontend
> should just wait for events and check the indexes on the indexes page.
>
> Only support one outstanding poll (or accept) request for every passive
> socket at any given time.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 75 ++++++++++++++++++++++++++++++++++++++++++++--
> 1 file changed, 73 insertions(+), 2 deletions(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index f1173f4..82f350d 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -344,11 +344,33 @@ static void __pvcalls_back_accept(struct work_struct *work)
> static void pvcalls_pass_sk_data_ready(struct sock *sock)
> {
> struct sockpass_mapping *mappass = sock->sk_user_data;
> + struct pvcalls_fedata *priv;
> + struct xen_pvcalls_response *rsp;
> + unsigned long flags;
> + int notify;
>
> if (mappass == NULL)
> return;
>
> - queue_work(mappass->wq, &mappass->register_work);
> + priv = mappass->priv;
> + spin_lock_irqsave(&mappass->copy_lock, flags);
> + if (mappass->reqcopy.cmd == PVCALLS_POLL) {
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = mappass->reqcopy.req_id;
> + rsp->u.poll.id = mappass->reqcopy.u.poll.id;
> + rsp->cmd = mappass->reqcopy.cmd;
> + rsp->ret = 0;
> +
> + mappass->reqcopy.cmd = 0;
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> +
> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> + if (notify)
> + notify_remote_via_irq(mappass->priv->irq);
> + } else {
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> + queue_work(mappass->wq, &mappass->register_work);
> + }
> }
>
> static int pvcalls_back_bind(struct xenbus_device *dev,
> @@ -493,7 +515,56 @@ static int pvcalls_back_accept(struct xenbus_device *dev,
> static int pvcalls_back_poll(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + struct sockpass_mapping *mappass;
> + struct xen_pvcalls_response *rsp;
> + struct inet_connection_sock *icsk;
> + struct request_sock_queue *queue;
> + unsigned long flags;
> + int ret;
> + bool data;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + mappass = radix_tree_lookup(&priv->socketpass_mappings, req->u.poll.id);
> + if (mappass == NULL)
> + return -EINVAL;
> +
> + /*
> + * Limitation of the current implementation: only support one
> + * concurrent accept or poll call on one socket.
> + */
> + spin_lock_irqsave(&mappass->copy_lock, flags);
> + if (mappass->reqcopy.cmd != 0) {
> + ret = -EINTR;
> + goto out;
> + }
> +
> + mappass->reqcopy = *req;
> + icsk = inet_csk(mappass->sock->sk);
> + queue = &icsk->icsk_accept_queue;
> + spin_lock(&queue->rskq_lock);
> + data = queue->rskq_accept_head != NULL;
> + spin_unlock(&queue->rskq_lock);
> + if (data) {
> + mappass->reqcopy.cmd = 0;
> + ret = 0;
> + goto out;
> + }
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> +
> + /* Tell the caller we don't need to send back a notification yet */
> + return -1;
> +
> +out:
> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> +
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->cmd = req->cmd;
> + rsp->u.poll.id = req->u.poll.id;
> + rsp->ret = ret;
> + return ret;

return 0;


Juergen

> }
>
> static int pvcalls_back_handle_cmd(struct xenbus_device *dev,
>

2017-06-13 07:39:43

by Jürgen Groß

Subject: Re: [PATCH v3 13/18] xen/pvcalls: implement release command

On 02/06/17 21:31, Stefano Stabellini wrote:
> Release both active and passive sockets. For active sockets, make sure
> to avoid possible conflicts with the ioworker reading/writing to those
> sockets concurrently. Set map->release to let the ioworker know
> atomically that the socket will be released soon, then wait until the
> ioworker finishes (flush_work).
>
> Unmap indexes pages and data rings.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 72 +++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 71 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 82f350d..b541887 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -265,13 +265,83 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
> struct pvcalls_fedata *priv,
> struct sock_mapping *map)
> {
> + disable_irq(map->irq);
> + if (map->sock->sk != NULL) {
> + write_lock_bh(&map->sock->sk->sk_callback_lock);
> + map->sock->sk->sk_user_data = NULL;
> + map->sock->sk->sk_data_ready = map->saved_data_ready;
> + write_unlock_bh(&map->sock->sk->sk_callback_lock);
> + }
> +
> + atomic_set(&map->release, 1);
> + flush_work(&map->ioworker.register_work);
> +
> + down(&priv->socket_lock);
> + list_del(&map->list);
> + up(&priv->socket_lock);
> +
> + xenbus_unmap_ring_vfree(dev, (void *)map->bytes);
> + xenbus_unmap_ring_vfree(dev, (void *)map->ring);

I don't think you need the casts to (void*) here.

> + unbind_from_irqhandler(map->irq, map);
> +
> + sock_release(map->sock);
> + kfree(map);
> +
> + return 0;
> +}
> +
> +static int pvcalls_back_release_passive(struct xenbus_device *dev,
> + struct pvcalls_fedata *priv,
> + struct sockpass_mapping *mappass)
> +{
> + if (mappass->sock->sk != NULL) {
> + write_lock_bh(&mappass->sock->sk->sk_callback_lock);
> + mappass->sock->sk->sk_user_data = NULL;
> + mappass->sock->sk->sk_data_ready = mappass->saved_data_ready;
> + write_unlock_bh(&mappass->sock->sk->sk_callback_lock);
> + }
> + down(&priv->socket_lock);
> + radix_tree_delete(&priv->socketpass_mappings, mappass->id);
> + sock_release(mappass->sock);
> + flush_workqueue(mappass->wq);
> + destroy_workqueue(mappass->wq);
> + kfree(mappass);
> + up(&priv->socket_lock);
> +
> return 0;
> }
>
> static int pvcalls_back_release(struct xenbus_device *dev,
> struct xen_pvcalls_request *req)
> {
> - return 0;
> + struct pvcalls_fedata *priv;
> + struct sock_mapping *map, *n;
> + struct sockpass_mapping *mappass;
> + int ret = 0;
> + struct xen_pvcalls_response *rsp;
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
> + if (map->id == req->u.release.id) {
> + ret = pvcalls_back_release_active(dev, priv, map);
> + goto out;
> + }
> + }
> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> + req->u.release.id);
> + if (mappass != NULL) {
> + ret = pvcalls_back_release_passive(dev, priv, mappass);
> + goto out;
> + }
> +
> +out:
> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> + rsp->req_id = req->req_id;
> + rsp->u.release.id = req->u.release.id;
> + rsp->cmd = req->cmd;
> + rsp->ret = ret;
> + return ret;

return 0;


Juergen

> }
>
> static void __pvcalls_back_accept(struct work_struct *work)
>

2017-06-13 07:49:20

by Jürgen Groß

Subject: Re: [PATCH v3 14/18] xen/pvcalls: disconnect and module_exit

On 02/06/17 21:31, Stefano Stabellini wrote:
> Implement backend_disconnect. Call pvcalls_back_release_active on active
> sockets and pvcalls_back_release_passive on passive sockets.
>
> Implement module_exit by calling backend_disconnect on frontend
> connections.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 49 ++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 49 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index b541887..6afe7a0 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -800,6 +800,38 @@ static int backend_connect(struct xenbus_device *dev)
>
> static int backend_disconnect(struct xenbus_device *dev)
> {
> + struct pvcalls_fedata *priv;
> + struct sock_mapping *map, *n;
> + struct sockpass_mapping *mappass;
> + struct radix_tree_iter iter;
> + void **slot;
> +
> +
> + priv = dev_get_drvdata(&dev->dev);
> +
> + list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
> + pvcalls_back_release_active(dev, priv, map);
> + }

You can drop the {}

> +
> + radix_tree_for_each_slot(slot, &priv->socketpass_mappings, &iter, 0) {
> + mappass = radix_tree_deref_slot(slot);
> + if (!mappass || radix_tree_exception(mappass)) {

This looks fishy.

You might call radix_tree_deref_retry(NULL). Right now this is okay,
but you depend on the radix tree internals here.

> + if (radix_tree_deref_retry(mappass)) {
> + slot = radix_tree_iter_retry(&iter);
> + continue;

The continue; statement is pointless here.

> + }
> + } else
> + pvcalls_back_release_passive(dev, priv, mappass);
> + }
> +
> + xenbus_unmap_ring_vfree(dev, (void *)priv->sring);

Drop the cast.

> + unbind_from_irqhandler(priv->irq, dev);
> +
> + list_del(&priv->list);
> + destroy_workqueue(priv->wq);
> + kfree(priv);
> + dev_set_drvdata(&dev->dev, NULL);
> +
> return 0;
> }
>
> @@ -993,3 +1025,20 @@ static int __init pvcalls_back_init(void)
> return 0;
> }
> module_init(pvcalls_back_init);
> +
> +static void __exit pvcalls_back_fin(void)
> +{
> + struct pvcalls_fedata *priv, *npriv;
> +
> + down(&pvcalls_back_global.frontends_lock);
> + list_for_each_entry_safe(priv, npriv, &pvcalls_back_global.frontends,
> + list) {
> + backend_disconnect(priv->dev);
> + }
> + up(&pvcalls_back_global.frontends_lock);
> +
> + xenbus_unregister_driver(&pvcalls_back_driver);
> + memset(&pvcalls_back_global, 0, sizeof(pvcalls_back_global));

Why?


Juergen

> +}
> +
> +module_exit(pvcalls_back_fin);
>

2017-06-13 07:51:43

by Jürgen Groß

Subject: Re: [PATCH v3 15/18] xen/pvcalls: implement the ioworker functions

On 02/06/17 21:31, Stefano Stabellini wrote:
> We have one ioworker per socket. Each ioworker goes through the list of
> outstanding read/write requests. Once all requests have been dealt with,
> it returns.
>
> We use one atomic counter per socket for "read" operations and one
> for "write" operations to keep track of the reads/writes to do.
>
> We also use one atomic counter ("io") per ioworker to keep track of how
> many outstanding requests we have in total assigned to the ioworker. The
> ioworker finishes when there are none.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 27 +++++++++++++++++++++++++++
> 1 file changed, 27 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 6afe7a0..0283d49 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -99,8 +99,35 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
> struct pvcalls_fedata *priv,
> struct sock_mapping *map);
>
> +static void pvcalls_conn_back_read(unsigned long opaque)

Why not void *opaque? You could drop the cast below then.


Juergen

> +{
> +}
> +
> +static int pvcalls_conn_back_write(struct sock_mapping *map)
> +{
> + return 0;
> +}
> +
> static void pvcalls_back_ioworker(struct work_struct *work)
> {
> + struct pvcalls_ioworker *ioworker = container_of(work,
> + struct pvcalls_ioworker, register_work);
> + struct sock_mapping *map = container_of(ioworker, struct sock_mapping,
> + ioworker);
> +
> + while (atomic_read(&map->io) > 0) {
> + if (atomic_read(&map->release) > 0) {
> + atomic_set(&map->release, 0);
> + return;
> + }
> +
> + if (atomic_read(&map->read) > 0)
> + pvcalls_conn_back_read((unsigned long)map);
> + if (atomic_read(&map->write) > 0)
> + pvcalls_conn_back_write(map);
> +
> + atomic_dec(&map->io);
> + }
> }
>
> static int pvcalls_back_socket(struct xenbus_device *dev,
>

2017-06-13 07:56:17

by Jürgen Groß

Subject: Re: [PATCH v3 16/18] xen/pvcalls: implement read

On 02/06/17 21:31, Stefano Stabellini wrote:
> When an active socket has data available, increment the io and read
> counters, and schedule the ioworker.
>
> Implement the read function by reading from the socket, writing the data
> to the data ring.
>
> Set in_error on error.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 85 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index 0283d49..e7d2b85 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -101,6 +101,81 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
>
> static void pvcalls_conn_back_read(unsigned long opaque)
> {
> + struct sock_mapping *map = (struct sock_mapping *)opaque;
> + struct msghdr msg;
> + struct kvec vec[2];
> + RING_IDX cons, prod, size, wanted, array_size, masked_prod, masked_cons;
> + int32_t error;
> + struct pvcalls_data_intf *intf = map->ring;
> + struct pvcalls_data *data = &map->data;
> + unsigned long flags;
> + int ret;
> +
> + array_size = XEN_FLEX_RING_SIZE(map->ring_order);
> + cons = intf->in_cons;
> + prod = intf->in_prod;
> + error = intf->in_error;
> + /* read the indexes first, then deal with the data */
> + virt_mb();
> +
> + if (error)
> + return;
> +
> + size = pvcalls_queued(prod, cons, array_size);
> + if (size >= array_size)
> + return;
> + spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
> + if (skb_queue_empty(&map->sock->sk->sk_receive_queue)) {
> + atomic_set(&map->read, 0);
> + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock,
> + flags);
> + return;
> + }
> + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
> + wanted = array_size - size;
> + masked_prod = pvcalls_mask(prod, array_size);
> + masked_cons = pvcalls_mask(cons, array_size);
> +
> + memset(&msg, 0, sizeof(msg));
> + msg.msg_iter.type = ITER_KVEC|WRITE;
> + msg.msg_iter.count = wanted;
> + if (masked_prod < masked_cons) {
> + vec[0].iov_base = data->in + masked_prod;
> + vec[0].iov_len = wanted;
> + msg.msg_iter.kvec = vec;
> + msg.msg_iter.nr_segs = 1;
> + } else {
> + vec[0].iov_base = data->in + masked_prod;
> + vec[0].iov_len = array_size - masked_prod;
> + vec[1].iov_base = data->in;
> + vec[1].iov_len = wanted - vec[0].iov_len;
> + msg.msg_iter.kvec = vec;
> + msg.msg_iter.nr_segs = 2;
> + }
> +
> + atomic_set(&map->read, 0);
> + ret = inet_recvmsg(map->sock, &msg, wanted, MSG_DONTWAIT);
> + WARN_ON(ret > 0 && ret > wanted);

wanted is always > 0, so you can omit the ret > 0 test.


Juergen

> + if (ret == -EAGAIN) /* shouldn't happen */
> + return;
> + if (!ret)
> + ret = -ENOTCONN;
> + spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
> + if (ret > 0 && !skb_queue_empty(&map->sock->sk->sk_receive_queue))
> + atomic_inc(&map->read);
> + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
> +
> + /* write the data, then modify the indexes */
> + virt_wmb();
> + if (ret < 0)
> + intf->in_error = ret;
> + else
> + intf->in_prod = prod + ret;
> + /* update the indexes, then notify the other end */
> + virt_wmb();
> + notify_remote_via_irq(map->irq);
> +
> + return;
> }
>
> static int pvcalls_conn_back_write(struct sock_mapping *map)
> @@ -173,6 +248,16 @@ static void pvcalls_sk_state_change(struct sock *sock)
>
> static void pvcalls_sk_data_ready(struct sock *sock)
> {
> + struct sock_mapping *map = sock->sk_user_data;
> + struct pvcalls_ioworker *iow;
> +
> + if (map == NULL)
> + return;
> +
> + iow = &map->ioworker;
> + atomic_inc(&map->read);
> + atomic_inc(&map->io);
> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> }
>
> static struct sock_mapping *pvcalls_new_active_socket(
>

2017-06-13 08:00:01

by Jürgen Groß

Subject: Re: [PATCH v3 17/18] xen/pvcalls: implement write

On 02/06/17 21:31, Stefano Stabellini wrote:
> When the other end notifies us that there is data to be written
> (pvcalls_back_conn_event), increment the io and write counters, and
> schedule the ioworker.
>
> Implement the write function called by ioworker by reading the data from
> the data ring, writing it to the socket by calling inet_sendmsg.
>
> Set out_error on error.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-back.c | 74 +++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 73 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> index e7d2b85..fe3e70f 100644
> --- a/drivers/xen/pvcalls-back.c
> +++ b/drivers/xen/pvcalls-back.c
> @@ -180,7 +180,66 @@ static void pvcalls_conn_back_read(unsigned long opaque)
>
> static int pvcalls_conn_back_write(struct sock_mapping *map)
> {
> - return 0;
> + struct pvcalls_data_intf *intf = map->ring;
> + struct pvcalls_data *data = &map->data;
> + struct msghdr msg;
> + struct kvec vec[2];
> + RING_IDX cons, prod, size, ring_size;
> + int ret;
> +
> + cons = intf->out_cons;
> + prod = intf->out_prod;
> + /* read the indexes before dealing with the data */
> + virt_mb();
> +
> + ring_size = XEN_FLEX_RING_SIZE(map->ring_order);
> + size = pvcalls_queued(prod, cons, ring_size);
> + if (size == 0)
> + return 0;
> +
> + memset(&msg, 0, sizeof(msg));
> + msg.msg_flags |= MSG_DONTWAIT;
> + msg.msg_iter.type = ITER_KVEC|READ;
> + msg.msg_iter.count = size;
> + if (pvcalls_mask(prod, ring_size) > pvcalls_mask(cons, ring_size)) {
> + vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
> + vec[0].iov_len = size;
> + msg.msg_iter.kvec = vec;
> + msg.msg_iter.nr_segs = 1;
> + } else {
> + vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
> + vec[0].iov_len = ring_size - pvcalls_mask(cons, ring_size);
> + vec[1].iov_base = data->out;
> + vec[1].iov_len = size - vec[0].iov_len;
> + msg.msg_iter.kvec = vec;
> + msg.msg_iter.nr_segs = 2;
> + }
> +
> + atomic_set(&map->write, 0);
> + ret = inet_sendmsg(map->sock, &msg, size);
> + if (ret == -EAGAIN || ret < size) {

Do you really want to do this for all errors?
Or did you mean:
if ((ret >= 0 && ret < size) || ret == -EAGAIN)


Juergen

> + atomic_inc(&map->write);
> + atomic_inc(&map->io);
> + }
> + if (ret == -EAGAIN)
> + return ret;
> +
> + /* write the data, then update the indexes */
> + virt_wmb();
> + if (ret < 0) {
> + intf->out_error = ret;
> + } else {
> + intf->out_error = 0;
> + intf->out_cons = cons + ret;
> + prod = intf->out_prod;
> + }
> + /* update the indexes, then notify the other end */
> + virt_wmb();
> + if (prod != cons + ret)
> + atomic_inc(&map->write);
> + notify_remote_via_irq(map->irq);
> +
> + return ret;
> }
>
> static void pvcalls_back_ioworker(struct work_struct *work)
> @@ -837,6 +896,19 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
>
> static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
> {
> + struct sock_mapping *map = sock_map;
> + struct pvcalls_ioworker *iow;
> +
> + if (map == NULL || map->sock == NULL || map->sock->sk == NULL ||
> + map->sock->sk->sk_user_data != map)
> + return IRQ_HANDLED;
> +
> + iow = &map->ioworker;
> +
> + atomic_inc(&map->write);
> + atomic_inc(&map->io);
> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> +
> return IRQ_HANDLED;
> }
>
>

2017-06-13 23:47:07

by Stefano Stabellini

Subject: Re: [PATCH v3 18/18] xen: introduce a Kconfig option to enable the pvcalls backend

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Also add pvcalls-back to the Makefile.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/Kconfig | 12 ++++++++++++
> > drivers/xen/Makefile | 1 +
> > 2 files changed, 13 insertions(+)
> >
> > diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
> > index f15bb3b7..bbdf059 100644
> > --- a/drivers/xen/Kconfig
> > +++ b/drivers/xen/Kconfig
> > @@ -196,6 +196,18 @@ config XEN_PCIDEV_BACKEND
> >
> > If in doubt, say m.
> >
> > +config XEN_PVCALLS_BACKEND
> > + bool "XEN PV Calls backend driver"
> > + depends on INET && XEN
>
> depends on XEN_BACKEND?

Yes, I'll make the change


> > + default n
> > + help
> > + Experimental backend for the Xen PV Calls protocol
> > + (https://xenbits.xen.org/docs/unstable/misc/pvcalls.html). It
> > + allows PV Calls frontends to send POSIX calls to the backend,
> > + which implements them.
> > +
> > + If in doubt, say n.
> > +
> > config XEN_SCSI_BACKEND
> > tristate "XEN SCSI backend driver"
> > depends on XEN && XEN_BACKEND && TARGET_CORE
> > diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
> > index 8feab810..480b928 100644
> > --- a/drivers/xen/Makefile
> > +++ b/drivers/xen/Makefile
> > @@ -38,6 +38,7 @@ obj-$(CONFIG_XEN_ACPI_PROCESSOR) += xen-acpi-processor.o
> > obj-$(CONFIG_XEN_EFI) += efi.o
> > obj-$(CONFIG_XEN_SCSI_BACKEND) += xen-scsiback.o
> > obj-$(CONFIG_XEN_AUTO_XLATE) += xlate_mmu.o
> > +obj-$(CONFIG_XEN_PVCALLS_BACKEND) += pvcalls-back.o
> > xen-evtchn-y := evtchn.o
> > xen-gntdev-y := gntdev.o
> > xen-gntalloc-y := gntalloc.o
> >
>
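[Editor's note: with Juergen's review point applied, the entry would presumably end up with the extra dependency — a sketch of the resulting Kconfig fragment, not the committed version:]

```kconfig
config XEN_PVCALLS_BACKEND
	bool "XEN PV Calls backend driver"
	depends on INET && XEN && XEN_BACKEND
	default n
	help
	  Experimental backend for the Xen PV Calls protocol
	  (https://xenbits.xen.org/docs/unstable/misc/pvcalls.html). It
	  allows PV Calls frontends to send POSIX calls to the backend,
	  which implements them.

	  If in doubt, say n.
```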

2017-06-14 00:46:31

by Stefano Stabellini

Subject: Re: [PATCH v3 07/18] xen/pvcalls: implement socket command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Just reply with success to the other end for now. Delay the allocation
> > of the actual socket to bind and/or connect.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 29 ++++++++++++++++++++++++++++-
> > 1 file changed, 28 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 6057533..1f2bb26 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -12,12 +12,17 @@
> > * GNU General Public License for more details.
> > */
> >
> > +#include <linux/inet.h>
> > #include <linux/kthread.h>
> > #include <linux/list.h>
> > #include <linux/radix-tree.h>
> > #include <linux/module.h>
> > #include <linux/semaphore.h>
> > #include <linux/wait.h>
> > +#include <net/sock.h>
> > +#include <net/inet_common.h>
> > +#include <net/inet_connection_sock.h>
> > +#include <net/request_sock.h>
> >
> > #include <xen/events.h>
> > #include <xen/grant_table.h>
> > @@ -54,7 +59,29 @@ struct pvcalls_fedata {
> > static int pvcalls_back_socket(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > - return 0;
> > + struct pvcalls_fedata *priv;
> > + int ret;
> > + struct xen_pvcalls_response *rsp;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + if (req->u.socket.domain != AF_INET ||
> > + req->u.socket.type != SOCK_STREAM ||
> > + (req->u.socket.protocol != IPPROTO_IP &&
> > + req->u.socket.protocol != AF_INET))
> > + ret = -EAFNOSUPPORT;
> > + else
> > + ret = 0;
> > +
> > + /* leave the actual socket allocation for later */
> > +
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.socket.id = req->u.socket.id;
> > + rsp->ret = ret;
> > +
> > + return ret;
>
> So if ret != 0 this will omit the call of
> RING_PUSH_RESPONSES_AND_CHECK_NOTIFY() in pvcalls_back_work().
>
> I think you want to return 0.

Yes, you are right, thanks!

2017-06-14 00:46:45

by Stefano Stabellini

Subject: Re: [PATCH v3 08/18] xen/pvcalls: implement connect command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Allocate a socket. Keep track of socket <-> ring mappings with a new data
> > structure, called sock_mapping. Implement the connect command by calling
> > inet_stream_connect, and mapping the new indexes page and data ring.
> > Allocate a workqueue and a work_struct, called ioworker, to perform
> > reads and writes to the socket.
> >
> > When an active socket is closed (sk_state_change), set in_error to
> > -ENOTCONN and notify the other end, as specified by the protocol.
> >
> > sk_data_ready and pvcalls_back_ioworker will be implemented later.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 170 +++++++++++++++++++++++++++++++++++++++++++++
> > 1 file changed, 170 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 1f2bb26..3eb84ef 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -56,6 +56,40 @@ struct pvcalls_fedata {
> > struct work_struct register_work;
> > };
> >
> > +struct pvcalls_ioworker {
> > + struct work_struct register_work;
> > + struct workqueue_struct *wq;
> > + unsigned int cpu;
> > +};
> > +
> > +struct sock_mapping {
> > + struct list_head list;
> > + struct pvcalls_fedata *priv;
> > + struct socket *sock;
> > + uint64_t id;
> > + grant_ref_t ref;
> > + struct pvcalls_data_intf *ring;
> > + void *bytes;
> > + struct pvcalls_data data;
> > + uint32_t ring_order;
> > + int irq;
> > + atomic_t read;
> > + atomic_t write;
> > + atomic_t io;
> > + atomic_t release;
> > + void (*saved_data_ready)(struct sock *sk);
> > + struct pvcalls_ioworker ioworker;
> > +};
> > +
> > +static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
> > +static int pvcalls_back_release_active(struct xenbus_device *dev,
> > + struct pvcalls_fedata *priv,
> > + struct sock_mapping *map);
> > +
> > +static void pvcalls_back_ioworker(struct work_struct *work)
> > +{
> > +}
> > +
> > static int pvcalls_back_socket(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > @@ -84,9 +118,140 @@ static int pvcalls_back_socket(struct xenbus_device *dev,
> > return ret;
> > }
> >
> > +static void pvcalls_sk_state_change(struct sock *sock)
> > +{
> > + struct sock_mapping *map = sock->sk_user_data;
> > + struct pvcalls_data_intf *intf;
> > +
> > + if (map == NULL)
> > + return;
> > +
> > + intf = map->ring;
> > + intf->in_error = -ENOTCONN;
> > + notify_remote_via_irq(map->irq);
> > +}
> > +
> > +static void pvcalls_sk_data_ready(struct sock *sock)
> > +{
> > +}
> > +
> > +static struct sock_mapping *pvcalls_new_active_socket(
> > + struct pvcalls_fedata *priv,
> > + uint64_t id,
> > + grant_ref_t ref,
> > + uint32_t evtchn,
> > + struct socket *sock)
> > +{
> > + int ret;
> > + struct sock_mapping *map = NULL;
>
> Pointless initializer.

I'll fix


> > + void *page;
> > +
> > + map = kzalloc(sizeof(*map), GFP_KERNEL);
> > + if (map == NULL)
> > + return NULL;
> > +
> > + map->priv = priv;
> > + map->sock = sock;
> > + map->id = id;
> > + map->ref = ref;
> > +
> > + ret = xenbus_map_ring_valloc(priv->dev, &ref, 1, &page);
> > + if (ret < 0)
> > + goto out;
> > + map->ring = page;
> > + map->ring_order = map->ring->ring_order;
> > + /* first read the order, then map the data ring */
> > + virt_rmb();
> > + if (map->ring_order > MAX_RING_ORDER)
>
> Issue a message?

OK


> > + goto out;
> > + ret = xenbus_map_ring_valloc(priv->dev, map->ring->ref,
> > + (1 << map->ring_order), &page);
> > + if (ret < 0)
> > + goto out;
> > + map->bytes = page;
> > +
> > + ret = bind_interdomain_evtchn_to_irqhandler(priv->dev->otherend_id,
> > + evtchn,
> > + pvcalls_back_conn_event,
> > + 0,
> > + "pvcalls-backend",
> > + map);
> > + if (ret < 0)
> > + goto out;
> > + map->irq = ret;
> > +
> > + map->data.in = map->bytes;
> > + map->data.out = map->bytes + XEN_FLEX_RING_SIZE(map->ring_order);
> > +
> > + map->ioworker.wq = alloc_workqueue("pvcalls_io", WQ_UNBOUND, 1);
> > + if (!map->ioworker.wq)
> > + goto out;
> > + map->ioworker.cpu = get_random_int() % num_online_cpus();
>
> So you expect all cpus [0..num_online_cpus()[ to be online. I don't
> think this is always true.

I'll drop the cpu selection


> > + atomic_set(&map->io, 1);
> > + INIT_WORK(&map->ioworker.register_work, pvcalls_back_ioworker);
> > +
> > + down(&priv->socket_lock);
> > + list_add_tail(&map->list, &priv->socket_mappings);
> > + up(&priv->socket_lock);
> > +
> > + write_lock_bh(&map->sock->sk->sk_callback_lock);
> > + map->saved_data_ready = map->sock->sk->sk_data_ready;
> > + map->sock->sk->sk_user_data = map;
> > + map->sock->sk->sk_data_ready = pvcalls_sk_data_ready;
> > + map->sock->sk->sk_state_change = pvcalls_sk_state_change;
> > + write_unlock_bh(&map->sock->sk->sk_callback_lock);
> > +
> > + return map;
> > +out:
> > + pvcalls_back_release_active(priv->dev, priv, map);
> > + return NULL;
> > +}
> > +
> > static int pvcalls_back_connect(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > + struct pvcalls_fedata *priv;
> > + int ret;
> > + struct socket *sock;
> > + struct sock_mapping *map = NULL;
> > + struct xen_pvcalls_response *rsp;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
> > + if (ret < 0)
> > + goto out;
> > + ret = inet_stream_connect(sock, (struct sockaddr *)&req->u.connect.addr,
> > + req->u.connect.len, req->u.connect.flags);
> > + if (ret < 0) {
> > + sock_release(map->sock);
> > + goto out;
> > + }
> > +
> > + map = pvcalls_new_active_socket(priv,
> > + req->u.connect.id,
> > + req->u.connect.ref,
> > + req->u.connect.evtchn,
> > + sock);
> > + if (!map) {
> > + sock_release(map->sock);
> > + goto out;
>
> ret will be 0 here. So there was a failure and you report success.

I'll fix


> > + }
> > +
> > +out:
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.connect.id = req->u.connect.id;
> > + rsp->ret = ret;
> > +
> > + return ret;
> > +}
> > +
> > +static int pvcalls_back_release_active(struct xenbus_device *dev,
> > + struct pvcalls_fedata *priv,
> > + struct sock_mapping *map)
> > +{
> > return 0;
> > }
> >
> > @@ -206,6 +371,11 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> > return IRQ_HANDLED;
> > }
> >
> > +static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
> > +{
> > + return IRQ_HANDLED;
> > +}
> > +
> > static int backend_connect(struct xenbus_device *dev)
> > {
> > int err, evtchn;
> >
>

2017-06-14 00:46:52

by Stefano Stabellini

Subject: Re: [PATCH v3 09/18] xen/pvcalls: implement bind command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Allocate a socket. Track the allocated passive sockets with a new data
> > structure named sockpass_mapping. It contains an unbound workqueue to
> > schedule delayed work for the accept and poll commands. It also has a
> > reqcopy field to be used to store a copy of a request for delayed work.
> > Reads/writes to it are protected by a lock (the "copy_lock" spinlock).
> > Initialize the workqueue in pvcalls_back_bind.
> >
> > Implement the bind command with inet_bind.
> >
> > The pass_sk_data_ready event handler will be added later.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 87 +++++++++++++++++++++++++++++++++++++++++++++-
> > 1 file changed, 86 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 3eb84ef..4a0cfa3 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -81,6 +81,18 @@ struct sock_mapping {
> > struct pvcalls_ioworker ioworker;
> > };
> >
> > +struct sockpass_mapping {
> > + struct list_head list;
> > + struct pvcalls_fedata *priv;
> > + struct socket *sock;
> > + uint64_t id;
> > + struct xen_pvcalls_request reqcopy;
> > + spinlock_t copy_lock;
> > + struct workqueue_struct *wq;
> > + struct work_struct register_work;
> > + void (*saved_data_ready)(struct sock *sk);
> > +};
> > +
> > static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map);
> > static int pvcalls_back_release_active(struct xenbus_device *dev,
> > struct pvcalls_fedata *priv,
> > @@ -261,10 +273,83 @@ static int pvcalls_back_release(struct xenbus_device *dev,
> > return 0;
> > }
> >
> > +static void __pvcalls_back_accept(struct work_struct *work)
> > +{
> > +}
> > +
> > +static void pvcalls_pass_sk_data_ready(struct sock *sock)
> > +{
> > +}
> > +
> > static int pvcalls_back_bind(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > - return 0;
> > + struct pvcalls_fedata *priv;
> > + int ret, err;
> > + struct socket *sock;
> > + struct sockpass_mapping *map = NULL;
>
> Pointless initializer.

I'll fix


> > + struct xen_pvcalls_response *rsp;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + map = kzalloc(sizeof(*map), GFP_KERNEL);
> > + if (map == NULL) {
> > + ret = -ENOMEM;
> > + goto out;
> > + }
> > +
> > + INIT_WORK(&map->register_work, __pvcalls_back_accept);
> > + spin_lock_init(&map->copy_lock);
> > + map->wq = alloc_workqueue("pvcalls_wq", WQ_UNBOUND, 1);
> > + if (!map->wq) {
> > + ret = -ENOMEM;
> > + kfree(map);
> > + goto out;
> > + }
> > +
> > + ret = sock_create(AF_INET, SOCK_STREAM, 0, &sock);
> > + if (ret < 0) {
> > + destroy_workqueue(map->wq);
> > + kfree(map);
> > + goto out;
> > + }
> > +
> > + ret = inet_bind(sock, (struct sockaddr *)&req->u.bind.addr,
> > + req->u.bind.len);
> > + if (ret < 0) {
> > + destroy_workqueue(map->wq);
> > + kfree(map);
>
> sock_release()?

OK


> > + goto out;
> > + }
> > +
> > + map->priv = priv;
> > + map->sock = sock;
> > + map->id = req->u.bind.id;
> > +
> > + down(&priv->socket_lock);
> > + err = radix_tree_insert(&priv->socketpass_mappings, map->id,
> > + map);
> > + up(&priv->socket_lock);
> > + if (err) {
> > + ret = err;
> > + destroy_workqueue(map->wq);
> > + kfree(map);
>
> sock_release()?

OK


> > + goto out;
> > + }
> > +
> > + write_lock_bh(&sock->sk->sk_callback_lock);
> > + map->saved_data_ready = sock->sk->sk_data_ready;
> > + sock->sk->sk_user_data = map;
> > + sock->sk->sk_data_ready = pvcalls_pass_sk_data_ready;
> > + write_unlock_bh(&sock->sk->sk_callback_lock);
> > +
> > +out:
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.bind.id = req->u.bind.id;
> > + rsp->ret = ret;
> > + return ret;
>
> return 0?

Yes

2017-06-14 00:47:19

by Stefano Stabellini

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Implement the accept command by calling inet_accept. To avoid blocking
> > in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which gets
> > scheduled on sk_data_ready (for a passive socket, it means that there
> > are connections to accept).
> >
> > Use the reqcopy field to store the request. Accept the new socket from
> > the delayed work function, create a new sock_mapping for it, map
> > the indexes page and data ring, and reply to the other end. Allocate an
> > ioworker for the socket.
> >
> > Only support one outstanding blocking accept request for every socket at
> > any time.
> >
> > Add a field to sock_mapping to remember the passive socket from which an
> > active socket was created.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
> > 1 file changed, 108 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index a75586e..f1173f4 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
> > struct sock_mapping {
> > struct list_head list;
> > struct pvcalls_fedata *priv;
> > + struct sockpass_mapping *sockpass;
> > struct socket *sock;
> > uint64_t id;
> > grant_ref_t ref;
> > @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
> >
> > static void __pvcalls_back_accept(struct work_struct *work)
> > {
> > + struct sockpass_mapping *mappass = container_of(
> > + work, struct sockpass_mapping, register_work);
> > + struct sock_mapping *map;
> > + struct pvcalls_ioworker *iow;
> > + struct pvcalls_fedata *priv;
> > + struct socket *sock;
> > + struct xen_pvcalls_response *rsp;
> > + struct xen_pvcalls_request *req;
> > + int notify;
> > + int ret = -EINVAL;
> > + unsigned long flags;
> > +
> > + priv = mappass->priv;
> > + /* We only need to check the value of "cmd" atomically on read. */
> > + spin_lock_irqsave(&mappass->copy_lock, flags);
> > + req = &mappass->reqcopy;
> > + if (req->cmd != PVCALLS_ACCEPT) {
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > + return;
> > + }
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>
> What about:
> req = &mappass->reqcopy;
> if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
> return;
>
> I can't see the need for taking a lock here.

Sure, good idea


> > +
> > + sock = sock_alloc();
> > + if (sock == NULL)
> > + goto out_error;
> > + sock->type = mappass->sock->type;
> > + sock->ops = mappass->sock->ops;
> > +
> > + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
> > + if (ret == -EAGAIN) {
> > + sock_release(sock);
> > + goto out_error;
> > + }
> > +
> > + map = pvcalls_new_active_socket(priv,
> > + req->u.accept.id_new,
> > + req->u.accept.ref,
> > + req->u.accept.evtchn,
> > + sock);
> > + if (!map) {
> > + sock_release(sock);
> > + goto out_error;
> > + }
> > +
> > + map->sockpass = mappass;
> > + iow = &map->ioworker;
> > + atomic_inc(&map->read);
> > + atomic_inc(&map->io);
> > + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> > +
> > +out_error:
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.accept.id = req->u.accept.id;
> > + rsp->ret = ret;
> > + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> > + if (notify)
> > + notify_remote_via_irq(priv->irq);
> > +
> > + spin_lock_irqsave(&mappass->copy_lock, flags);
> > + mappass->reqcopy.cmd = 0;
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>
> ACCESS_ONCE(mappass->reqcopy.cmd) = 0;

OK


> > }
> >
> > static void pvcalls_pass_sk_data_ready(struct sock *sock)
> > {
> > + struct sockpass_mapping *mappass = sock->sk_user_data;
> > +
> > + if (mappass == NULL)
> > + return;
> > +
> > + queue_work(mappass->wq, &mappass->register_work);
> > }
> >
> > static int pvcalls_back_bind(struct xenbus_device *dev,
> > @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
> > static int pvcalls_back_accept(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > - return 0;
> > + struct pvcalls_fedata *priv;
> > + struct sockpass_mapping *mappass;
> > + int ret = -EINVAL;
> > + struct xen_pvcalls_response *rsp;
> > + unsigned long flags;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> > + req->u.accept.id);
> > + if (mappass == NULL)
> > + goto out_error;
> > +
> > + /*
> > + * Limitation of the current implementation: only support one
> > + * concurrent accept or poll call on one socket.
> > + */
> > + spin_lock_irqsave(&mappass->copy_lock, flags);
> > + if (mappass->reqcopy.cmd != 0) {
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > + ret = -EINTR;
> > + goto out_error;
> > + }
> > +
> > + mappass->reqcopy = *req;
>
> This time you need the lock, however you should use:
>
> ACCESS_ONCE(mappass->reqcopy) = *req;

I don't think that guarantees atomic accesses to the cmd field of the
struct. Shouldn't this be:

ACCESS_ONCE(mappass->reqcopy.cmd) = req->cmd;
mappass->reqcopy = *req;


> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > + queue_work(mappass->wq, &mappass->register_work);
> > +
> > + /* Tell the caller we don't need to send back a notification yet */
> > + return -1;
> > +
> > +out_error:
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.accept.id = req->u.accept.id;
> > + rsp->ret = ret;
> > + return ret;
>
> return 0?

Yes, thanks

2017-06-14 00:47:39

by Stefano Stabellini

Subject: Re: [PATCH v3 12/18] xen/pvcalls: implement poll command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Implement poll on passive sockets by requesting a delayed response with
> > mappass->reqcopy, and reply back when there is data on the passive
> > socket.
> >
> > Poll on active sockets is unimplemented, as per the spec: the frontend
> > should just wait for events and check the indexes on the indexes page.
> >
> > Only support one outstanding poll (or accept) request for every passive
> > socket at any given time.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 75 ++++++++++++++++++++++++++++++++++++++++++++--
> > 1 file changed, 73 insertions(+), 2 deletions(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index f1173f4..82f350d 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -344,11 +344,33 @@ static void __pvcalls_back_accept(struct work_struct *work)
> > static void pvcalls_pass_sk_data_ready(struct sock *sock)
> > {
> > struct sockpass_mapping *mappass = sock->sk_user_data;
> > + struct pvcalls_fedata *priv;
> > + struct xen_pvcalls_response *rsp;
> > + unsigned long flags;
> > + int notify;
> >
> > if (mappass == NULL)
> > return;
> >
> > - queue_work(mappass->wq, &mappass->register_work);
> > + priv = mappass->priv;
> > + spin_lock_irqsave(&mappass->copy_lock, flags);
> > + if (mappass->reqcopy.cmd == PVCALLS_POLL) {
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = mappass->reqcopy.req_id;
> > + rsp->u.poll.id = mappass->reqcopy.u.poll.id;
> > + rsp->cmd = mappass->reqcopy.cmd;
> > + rsp->ret = 0;
> > +
> > + mappass->reqcopy.cmd = 0;
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > +
> > + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> > + if (notify)
> > + notify_remote_via_irq(mappass->priv->irq);
> > + } else {
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > + queue_work(mappass->wq, &mappass->register_work);
> > + }
> > }
> >
> > static int pvcalls_back_bind(struct xenbus_device *dev,
> > @@ -493,7 +515,56 @@ static int pvcalls_back_accept(struct xenbus_device *dev,
> > static int pvcalls_back_poll(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > - return 0;
> > + struct pvcalls_fedata *priv;
> > + struct sockpass_mapping *mappass;
> > + struct xen_pvcalls_response *rsp;
> > + struct inet_connection_sock *icsk;
> > + struct request_sock_queue *queue;
> > + unsigned long flags;
> > + int ret;
> > + bool data;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + mappass = radix_tree_lookup(&priv->socketpass_mappings, req->u.poll.id);
> > + if (mappass == NULL)
> > + return -EINVAL;
> > +
> > + /*
> > + * Limitation of the current implementation: only support one
> > + * concurrent accept or poll call on one socket.
> > + */
> > + spin_lock_irqsave(&mappass->copy_lock, flags);
> > + if (mappass->reqcopy.cmd != 0) {
> > + ret = -EINTR;
> > + goto out;
> > + }
> > +
> > + mappass->reqcopy = *req;
> > + icsk = inet_csk(mappass->sock->sk);
> > + queue = &icsk->icsk_accept_queue;
> > + spin_lock(&queue->rskq_lock);
> > + data = queue->rskq_accept_head != NULL;
> > + spin_unlock(&queue->rskq_lock);
> > + if (data) {
> > + mappass->reqcopy.cmd = 0;
> > + ret = 0;
> > + goto out;
> > + }
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);

I'll also need to change these mappass->reqcopy.cmd accesses to
ACCESS_ONCE to be consistent with the changes to the previous patch
(need to become atomic)


> > + /* Tell the caller we don't need to send back a notification yet */
> > + return -1;
> > +
> > +out:
> > + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> > +
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->cmd = req->cmd;
> > + rsp->u.poll.id = req->u.poll.id;
> > + rsp->ret = ret;
> > + return ret;
>
> return 0;

Yes

2017-06-14 00:47:42

by Stefano Stabellini

Subject: Re: [PATCH v3 13/18] xen/pvcalls: implement release command

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Release both active and passive sockets. For active sockets, make sure
> > to avoid possible conflicts with the ioworker reading/writing to those
> > sockets concurrently. Set map->release to let the ioworker know
> > atomically that the socket will be released soon, then wait until the
> > ioworker finishes (flush_work).
> >
> > Unmap indexes pages and data rings.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 72 +++++++++++++++++++++++++++++++++++++++++++++-
> > 1 file changed, 71 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 82f350d..b541887 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -265,13 +265,83 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
> > struct pvcalls_fedata *priv,
> > struct sock_mapping *map)
> > {
> > + disable_irq(map->irq);
> > + if (map->sock->sk != NULL) {
> > + write_lock_bh(&map->sock->sk->sk_callback_lock);
> > + map->sock->sk->sk_user_data = NULL;
> > + map->sock->sk->sk_data_ready = map->saved_data_ready;
> > + write_unlock_bh(&map->sock->sk->sk_callback_lock);
> > + }
> > +
> > + atomic_set(&map->release, 1);
> > + flush_work(&map->ioworker.register_work);
> > +
> > + down(&priv->socket_lock);
> > + list_del(&map->list);
> > + up(&priv->socket_lock);
> > +
> > + xenbus_unmap_ring_vfree(dev, (void *)map->bytes);
> > + xenbus_unmap_ring_vfree(dev, (void *)map->ring);
>
> I don't think you need the casts to (void*) here.

OK

> > + unbind_from_irqhandler(map->irq, map);
> > +
> > + sock_release(map->sock);
> > + kfree(map);
> > +
> > + return 0;
> > +}
> > +
> > +static int pvcalls_back_release_passive(struct xenbus_device *dev,
> > + struct pvcalls_fedata *priv,
> > + struct sockpass_mapping *mappass)
> > +{
> > + if (mappass->sock->sk != NULL) {
> > + write_lock_bh(&mappass->sock->sk->sk_callback_lock);
> > + mappass->sock->sk->sk_user_data = NULL;
> > + mappass->sock->sk->sk_data_ready = mappass->saved_data_ready;
> > + write_unlock_bh(&mappass->sock->sk->sk_callback_lock);
> > + }
> > + down(&priv->socket_lock);
> > + radix_tree_delete(&priv->socketpass_mappings, mappass->id);
> > + sock_release(mappass->sock);
> > + flush_workqueue(mappass->wq);
> > + destroy_workqueue(mappass->wq);
> > + kfree(mappass);
> > + up(&priv->socket_lock);
> > +
> > return 0;
> > }
> >
> > static int pvcalls_back_release(struct xenbus_device *dev,
> > struct xen_pvcalls_request *req)
> > {
> > - return 0;
> > + struct pvcalls_fedata *priv;
> > + struct sock_mapping *map, *n;
> > + struct sockpass_mapping *mappass;
> > + int ret = 0;
> > + struct xen_pvcalls_response *rsp;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
> > + if (map->id == req->u.release.id) {
> > + ret = pvcalls_back_release_active(dev, priv, map);
> > + goto out;
> > + }
> > + }
> > + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> > + req->u.release.id);
> > + if (mappass != NULL) {
> > + ret = pvcalls_back_release_passive(dev, priv, mappass);
> > + goto out;
> > + }
> > +
> > +out:
> > + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> > + rsp->req_id = req->req_id;
> > + rsp->u.release.id = req->u.release.id;
> > + rsp->cmd = req->cmd;
> > + rsp->ret = ret;
> > + return ret;
>
> return 0;

OK

2017-06-14 00:54:41

by Stefano Stabellini

Subject: Re: [PATCH v3 14/18] xen/pvcalls: disconnect and module_exit

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > Implement backend_disconnect. Call pvcalls_back_release_active on active
> > sockets and pvcalls_back_release_passive on passive sockets.
> >
> > Implement module_exit by calling backend_disconnect on frontend
> > connections.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 49 ++++++++++++++++++++++++++++++++++++++++++++++
> > 1 file changed, 49 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index b541887..6afe7a0 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -800,6 +800,38 @@ static int backend_connect(struct xenbus_device *dev)
> >
> > static int backend_disconnect(struct xenbus_device *dev)
> > {
> > + struct pvcalls_fedata *priv;
> > + struct sock_mapping *map, *n;
> > + struct sockpass_mapping *mappass;
> > + struct radix_tree_iter iter;
> > + void **slot;
> > +
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > +
> > + list_for_each_entry_safe(map, n, &priv->socket_mappings, list) {
> > + pvcalls_back_release_active(dev, priv, map);
> > + }
>
> You can drop the {}

OK


> > +
> > + radix_tree_for_each_slot(slot, &priv->socketpass_mappings, &iter, 0) {
> > + mappass = radix_tree_deref_slot(slot);
> > + if (!mappass || radix_tree_exception(mappass)) {
>
> This looks fishy.
>
> You might call radix_tree_deref_retry(NULL). Right now this is okay,
> but you depend on the radix tree internals here.

I'll avoid it


> > + if (radix_tree_deref_retry(mappass)) {
> > + slot = radix_tree_iter_retry(&iter);
> > + continue;
>
> The continue; statement is pointless here.

I'll remove it


> > + }
> > + } else
> > + pvcalls_back_release_passive(dev, priv, mappass);
> > + }
> > +
> > + xenbus_unmap_ring_vfree(dev, (void *)priv->sring);
>
> Drop the cast.

OK


> > + unbind_from_irqhandler(priv->irq, dev);
> > +
> > + list_del(&priv->list);
> > + destroy_workqueue(priv->wq);
> > + kfree(priv);
> > + dev_set_drvdata(&dev->dev, NULL);
> > +
> > return 0;
> > }
> >
> > @@ -993,3 +1025,20 @@ static int __init pvcalls_back_init(void)
> > return 0;
> > }
> > module_init(pvcalls_back_init);
> > +
> > +static void __exit pvcalls_back_fin(void)
> > +{
> > + struct pvcalls_fedata *priv, *npriv;
> > +
> > + down(&pvcalls_back_global.frontends_lock);
> > + list_for_each_entry_safe(priv, npriv, &pvcalls_back_global.frontends,
> > + list) {
> > + backend_disconnect(priv->dev);
> > + }
> > + up(&pvcalls_back_global.frontends_lock);
> > +
> > + xenbus_unregister_driver(&pvcalls_back_driver);
> > + memset(&pvcalls_back_global, 0, sizeof(pvcalls_back_global));
>
> Why?

legacy code, I'll remove it


> Juergen
>
> > +}
> > +
> > +module_exit(pvcalls_back_fin);
> >
>

2017-06-14 00:56:32

by Stefano Stabellini

Subject: Re: [PATCH v3 15/18] xen/pvcalls: implement the ioworker functions

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > We have one ioworker per socket. Each ioworker goes through the list of
> > outstanding read/write requests. Once all requests have been dealt with,
> > it returns.
> >
> > We use one atomic counter per socket for "read" operations and one
> > for "write" operations to keep track of the reads/writes to do.
> >
> > We also use one atomic counter ("io") per ioworker to keep track of how
> > many outstanding requests we have in total assigned to the ioworker. The
> > ioworker finishes when there are none.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 27 +++++++++++++++++++++++++++
> > 1 file changed, 27 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 6afe7a0..0283d49 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -99,8 +99,35 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
> > struct pvcalls_fedata *priv,
> > struct sock_mapping *map);
> >
> > +static void pvcalls_conn_back_read(unsigned long opaque)
>
> Why not void *opaque? You could drop the cast below then.

Good idea


> > +{
> > +}
> > +
> > +static int pvcalls_conn_back_write(struct sock_mapping *map)
> > +{
> > + return 0;
> > +}
> > +
> > static void pvcalls_back_ioworker(struct work_struct *work)
> > {
> > + struct pvcalls_ioworker *ioworker = container_of(work,
> > + struct pvcalls_ioworker, register_work);
> > + struct sock_mapping *map = container_of(ioworker, struct sock_mapping,
> > + ioworker);
> > +
> > + while (atomic_read(&map->io) > 0) {
> > + if (atomic_read(&map->release) > 0) {
> > + atomic_set(&map->release, 0);
> > + return;
> > + }
> > +
> > + if (atomic_read(&map->read) > 0)
> > + pvcalls_conn_back_read((unsigned long)map);
> > + if (atomic_read(&map->write) > 0)
> > + pvcalls_conn_back_write(map);
> > +
> > + atomic_dec(&map->io);
> > + }
> > }
> >
> > static int pvcalls_back_socket(struct xenbus_device *dev,
> >
>

2017-06-14 00:58:01

by Stefano Stabellini

Subject: Re: [PATCH v3 16/18] xen/pvcalls: implement read

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > When an active socket has data available, increment the io and read
> > counters, and schedule the ioworker.
> >
> > Implement the read function by reading from the socket, writing the data
> > to the data ring.
> >
> > Set in_error on error.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++
> > 1 file changed, 85 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index 0283d49..e7d2b85 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -101,6 +101,81 @@ static int pvcalls_back_release_active(struct xenbus_device *dev,
> >
> > static void pvcalls_conn_back_read(unsigned long opaque)
> > {
> > + struct sock_mapping *map = (struct sock_mapping *)opaque;
> > + struct msghdr msg;
> > + struct kvec vec[2];
> > + RING_IDX cons, prod, size, wanted, array_size, masked_prod, masked_cons;
> > + int32_t error;
> > + struct pvcalls_data_intf *intf = map->ring;
> > + struct pvcalls_data *data = &map->data;
> > + unsigned long flags;
> > + int ret;
> > +
> > + array_size = XEN_FLEX_RING_SIZE(map->ring_order);
> > + cons = intf->in_cons;
> > + prod = intf->in_prod;
> > + error = intf->in_error;
> > + /* read the indexes first, then deal with the data */
> > + virt_mb();
> > +
> > + if (error)
> > + return;
> > +
> > + size = pvcalls_queued(prod, cons, array_size);
> > + if (size >= array_size)
> > + return;
> > + spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
> > + if (skb_queue_empty(&map->sock->sk->sk_receive_queue)) {
> > + atomic_set(&map->read, 0);
> > + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock,
> > + flags);
> > + return;
> > + }
> > + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
> > + wanted = array_size - size;
> > + masked_prod = pvcalls_mask(prod, array_size);
> > + masked_cons = pvcalls_mask(cons, array_size);
> > +
> > + memset(&msg, 0, sizeof(msg));
> > + msg.msg_iter.type = ITER_KVEC|WRITE;
> > + msg.msg_iter.count = wanted;
> > + if (masked_prod < masked_cons) {
> > + vec[0].iov_base = data->in + masked_prod;
> > + vec[0].iov_len = wanted;
> > + msg.msg_iter.kvec = vec;
> > + msg.msg_iter.nr_segs = 1;
> > + } else {
> > + vec[0].iov_base = data->in + masked_prod;
> > + vec[0].iov_len = array_size - masked_prod;
> > + vec[1].iov_base = data->in;
> > + vec[1].iov_len = wanted - vec[0].iov_len;
> > + msg.msg_iter.kvec = vec;
> > + msg.msg_iter.nr_segs = 2;
> > + }
> > +
> > + atomic_set(&map->read, 0);
> > + ret = inet_recvmsg(map->sock, &msg, wanted, MSG_DONTWAIT);
> > + WARN_ON(ret > 0 && ret > wanted);
>
> wanted is always > 0, so you can omit the ret > 0 test.

Good point


>
> > + if (ret == -EAGAIN) /* shouldn't happen */
> > + return;
> > + if (!ret)
> > + ret = -ENOTCONN;
> > + spin_lock_irqsave(&map->sock->sk->sk_receive_queue.lock, flags);
> > + if (ret > 0 && !skb_queue_empty(&map->sock->sk->sk_receive_queue))
> > + atomic_inc(&map->read);
> > + spin_unlock_irqrestore(&map->sock->sk->sk_receive_queue.lock, flags);
> > +
> > + /* write the data, then modify the indexes */
> > + virt_wmb();
> > + if (ret < 0)
> > + intf->in_error = ret;
> > + else
> > + intf->in_prod = prod + ret;
> > + /* update the indexes, then notify the other end */
> > + virt_wmb();
> > + notify_remote_via_irq(map->irq);
> > +
> > + return;
> > }
> >
> > static int pvcalls_conn_back_write(struct sock_mapping *map)
> > @@ -173,6 +248,16 @@ static void pvcalls_sk_state_change(struct sock *sock)
> >
> > static void pvcalls_sk_data_ready(struct sock *sock)
> > {
> > + struct sock_mapping *map = sock->sk_user_data;
> > + struct pvcalls_ioworker *iow;
> > +
> > + if (map == NULL)
> > + return;
> > +
> > + iow = &map->ioworker;
> > + atomic_inc(&map->read);
> > + atomic_inc(&map->io);
> > + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> > }
> >
> > static struct sock_mapping *pvcalls_new_active_socket(
> >
>

2017-06-14 01:00:20

by Stefano Stabellini

Subject: Re: [PATCH v3 17/18] xen/pvcalls: implement write

On Tue, 13 Jun 2017, Juergen Gross wrote:
> On 02/06/17 21:31, Stefano Stabellini wrote:
> > When the other end notifies us that there is data to be written
> > (pvcalls_back_conn_event), increment the io and write counters, and
> > schedule the ioworker.
> >
> > Implement the write function called by ioworker by reading the data from
> > the data ring, writing it to the socket by calling inet_sendmsg.
> >
> > Set out_error on error.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-back.c | 74 +++++++++++++++++++++++++++++++++++++++++++++-
> > 1 file changed, 73 insertions(+), 1 deletion(-)
> >
> > diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> > index e7d2b85..fe3e70f 100644
> > --- a/drivers/xen/pvcalls-back.c
> > +++ b/drivers/xen/pvcalls-back.c
> > @@ -180,7 +180,66 @@ static void pvcalls_conn_back_read(unsigned long opaque)
> >
> > static int pvcalls_conn_back_write(struct sock_mapping *map)
> > {
> > - return 0;
> > + struct pvcalls_data_intf *intf = map->ring;
> > + struct pvcalls_data *data = &map->data;
> > + struct msghdr msg;
> > + struct kvec vec[2];
> > + RING_IDX cons, prod, size, ring_size;
> > + int ret;
> > +
> > + cons = intf->out_cons;
> > + prod = intf->out_prod;
> > + /* read the indexes before dealing with the data */
> > + virt_mb();
> > +
> > + ring_size = XEN_FLEX_RING_SIZE(map->ring_order);
> > + size = pvcalls_queued(prod, cons, ring_size);
> > + if (size == 0)
> > + return 0;
> > +
> > + memset(&msg, 0, sizeof(msg));
> > + msg.msg_flags |= MSG_DONTWAIT;
> > + msg.msg_iter.type = ITER_KVEC|READ;
> > + msg.msg_iter.count = size;
> > + if (pvcalls_mask(prod, ring_size) > pvcalls_mask(cons, ring_size)) {
> > + vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
> > + vec[0].iov_len = size;
> > + msg.msg_iter.kvec = vec;
> > + msg.msg_iter.nr_segs = 1;
> > + } else {
> > + vec[0].iov_base = data->out + pvcalls_mask(cons, ring_size);
> > + vec[0].iov_len = ring_size - pvcalls_mask(cons, ring_size);
> > + vec[1].iov_base = data->out;
> > + vec[1].iov_len = size - vec[0].iov_len;
> > + msg.msg_iter.kvec = vec;
> > + msg.msg_iter.nr_segs = 2;
> > + }
> > +
> > + atomic_set(&map->write, 0);
> > + ret = inet_sendmsg(map->sock, &msg, size);
> > + if (ret == -EAGAIN || ret < size) {
>
> Do you really want to do this for all errors?
> Or did you mean:
> if ((ret >= 0 && ret < size) || ret == -EAGAIN)

Yes, that's what I meant, thanks!


> > + atomic_inc(&map->write);
> > + atomic_inc(&map->io);
> > + }
> > + if (ret == -EAGAIN)
> > + return ret;
> > +
> > + /* write the data, then update the indexes */
> > + virt_wmb();
> > + if (ret < 0) {
> > + intf->out_error = ret;
> > + } else {
> > + intf->out_error = 0;
> > + intf->out_cons = cons + ret;
> > + prod = intf->out_prod;
> > + }
> > + /* update the indexes, then notify the other end */
> > + virt_wmb();
> > + if (prod != cons + ret)
> > + atomic_inc(&map->write);
> > + notify_remote_via_irq(map->irq);
> > +
> > + return ret;
> > }
> >
> > static void pvcalls_back_ioworker(struct work_struct *work)
> > @@ -837,6 +896,19 @@ static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> >
> > static irqreturn_t pvcalls_back_conn_event(int irq, void *sock_map)
> > {
> > + struct sock_mapping *map = sock_map;
> > + struct pvcalls_ioworker *iow;
> > +
> > + if (map == NULL || map->sock == NULL || map->sock->sk == NULL ||
> > + map->sock->sk->sk_user_data != map)
> > + return IRQ_HANDLED;
> > +
> > + iow = &map->ioworker;
> > +
> > + atomic_inc(&map->write);
> > + atomic_inc(&map->io);
> > + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> > +
> > return IRQ_HANDLED;
> > }
> >
> >
>

2017-06-14 06:32:12

by Jürgen Groß

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On 14/06/17 02:47, Stefano Stabellini wrote:
> On Tue, 13 Jun 2017, Juergen Gross wrote:
>> On 02/06/17 21:31, Stefano Stabellini wrote:
>>> Implement the accept command by calling inet_accept. To avoid blocking
>>> in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which get
>>> scheduled on sk_data_ready (for a passive socket, it means that there
>>> are connections to accept).
>>>
>>> Use the reqcopy field to store the request. Accept the new socket from
>>> the delayed work function, create a new sock_mapping for it, map
>>> the indexes page and data ring, and reply to the other end. Allocate an
>>> ioworker for the socket.
>>>
>>> Only support one outstanding blocking accept request for every socket at
>>> any time.
>>>
>>> Add a field to sock_mapping to remember the passive socket from which an
>>> active socket was created.
>>>
>>> Signed-off-by: Stefano Stabellini <[email protected]>
>>> CC: [email protected]
>>> CC: [email protected]
>>> ---
>>> drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
>>> 1 file changed, 108 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
>>> index a75586e..f1173f4 100644
>>> --- a/drivers/xen/pvcalls-back.c
>>> +++ b/drivers/xen/pvcalls-back.c
>>> @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
>>> struct sock_mapping {
>>> struct list_head list;
>>> struct pvcalls_fedata *priv;
>>> + struct sockpass_mapping *sockpass;
>>> struct socket *sock;
>>> uint64_t id;
>>> grant_ref_t ref;
>>> @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
>>>
>>> static void __pvcalls_back_accept(struct work_struct *work)
>>> {
>>> + struct sockpass_mapping *mappass = container_of(
>>> + work, struct sockpass_mapping, register_work);
>>> + struct sock_mapping *map;
>>> + struct pvcalls_ioworker *iow;
>>> + struct pvcalls_fedata *priv;
>>> + struct socket *sock;
>>> + struct xen_pvcalls_response *rsp;
>>> + struct xen_pvcalls_request *req;
>>> + int notify;
>>> + int ret = -EINVAL;
>>> + unsigned long flags;
>>> +
>>> + priv = mappass->priv;
>>> + /* We only need to check the value of "cmd" atomically on read. */
>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>> + req = &mappass->reqcopy;
>>> + if (req->cmd != PVCALLS_ACCEPT) {
>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>> + return;
>>> + }
>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>
>> What about:
>> req = &mappass->reqcopy;
>> if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
>> return;
>>
>> I can't see the need for taking a lock here.
>
> Sure, good idea
>
>
>>> +
>>> + sock = sock_alloc();
>>> + if (sock == NULL)
>>> + goto out_error;
>>> + sock->type = mappass->sock->type;
>>> + sock->ops = mappass->sock->ops;
>>> +
>>> + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
>>> + if (ret == -EAGAIN) {
>>> + sock_release(sock);
>>> + goto out_error;
>>> + }
>>> +
>>> + map = pvcalls_new_active_socket(priv,
>>> + req->u.accept.id_new,
>>> + req->u.accept.ref,
>>> + req->u.accept.evtchn,
>>> + sock);
>>> + if (!map) {
>>> + sock_release(sock);
>>> + goto out_error;
>>> + }
>>> +
>>> + map->sockpass = mappass;
>>> + iow = &map->ioworker;
>>> + atomic_inc(&map->read);
>>> + atomic_inc(&map->io);
>>> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
>>> +
>>> +out_error:
>>> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
>>> + rsp->req_id = req->req_id;
>>> + rsp->cmd = req->cmd;
>>> + rsp->u.accept.id = req->u.accept.id;
>>> + rsp->ret = ret;
>>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
>>> + if (notify)
>>> + notify_remote_via_irq(priv->irq);
>>> +
>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>> + mappass->reqcopy.cmd = 0;
>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>
>> ACCESS_ONCE(mappass->reqcopy.cmd) = 0;
>
> OK
>
>
>>> }
>>>
>>> static void pvcalls_pass_sk_data_ready(struct sock *sock)
>>> {
>>> + struct sockpass_mapping *mappass = sock->sk_user_data;
>>> +
>>> + if (mappass == NULL)
>>> + return;
>>> +
>>> + queue_work(mappass->wq, &mappass->register_work);
>>> }
>>>
>>> static int pvcalls_back_bind(struct xenbus_device *dev,
>>> @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
>>> static int pvcalls_back_accept(struct xenbus_device *dev,
>>> struct xen_pvcalls_request *req)
>>> {
>>> - return 0;
>>> + struct pvcalls_fedata *priv;
>>> + struct sockpass_mapping *mappass;
>>> + int ret = -EINVAL;
>>> + struct xen_pvcalls_response *rsp;
>>> + unsigned long flags;
>>> +
>>> + priv = dev_get_drvdata(&dev->dev);
>>> +
>>> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
>>> + req->u.accept.id);
>>> + if (mappass == NULL)
>>> + goto out_error;
>>> +
>>> + /*
>>> + * Limitation of the current implementation: only support one
>>> + * concurrent accept or poll call on one socket.
>>> + */
>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>> + if (mappass->reqcopy.cmd != 0) {
>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>> + ret = -EINTR;
>>> + goto out_error;
>>> + }
>>> +
>>> + mappass->reqcopy = *req;
>>
>> This time you need the lock, however you should use:
>>
>> ACCESS_ONCE(mappass->reqcopy) = *req;
>
> I don't think that guarantees atomic accesses to the cmd field of the
> struct. Shouldn't this be:
>
> ACCESS_ONCE(mappass->reqcopy.cmd) = req->cmd;
> mappass->reqcopy = *req;

Hmm, what if the frontend changes cmd between those two accesses?
You either need another local buffer or you have to copy cmd via
ACCESS_ONCE() and the rest of *req separately (seems not to be
that hard: it's just cmd, req_id and u).

BTW: Maybe you should use READ_ONCE() and WRITE_ONCE() instead of
ACCESS_ONCE(), as those seem to be preferred nowadays.


Juergen

2017-06-14 13:51:48

by Boris Ostrovsky

Subject: Re: [PATCH v3 14/18] xen/pvcalls: disconnect and module_exit


>>> static int backend_disconnect(struct xenbus_device *dev)
>>> {
>>> + struct pvcalls_fedata *priv;
>>> + struct sock_mapping *map, *n;
>>> + struct sockpass_mapping *mappass;
>>> + struct radix_tree_iter iter;
>>> + void **slot;
>>> +
>>> +
>>> + priv = dev_get_drvdata(&dev->dev);

Can you also rename priv to something else (like fedata)? And in other
routines too.

-boris

2017-06-14 19:15:36

by Stefano Stabellini

Subject: Re: [PATCH v3 14/18] xen/pvcalls: disconnect and module_exit

On Wed, 14 Jun 2017, Boris Ostrovsky wrote:
> >>> static int backend_disconnect(struct xenbus_device *dev)
> >>> {
> >>> + struct pvcalls_fedata *priv;
> >>> + struct sock_mapping *map, *n;
> >>> + struct sockpass_mapping *mappass;
> >>> + struct radix_tree_iter iter;
> >>> + void **slot;
> >>> +
> >>> +
> >>> + priv = dev_get_drvdata(&dev->dev);
>
> Can you also rename priv to something else (like fedata)? And in other
> routines too.

Yes, done

2017-06-14 19:27:17

by Stefano Stabellini

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On Wed, 14 Jun 2017, Juergen Gross wrote:
> On 14/06/17 02:47, Stefano Stabellini wrote:
> > On Tue, 13 Jun 2017, Juergen Gross wrote:
> >> On 02/06/17 21:31, Stefano Stabellini wrote:
> >>> Implement the accept command by calling inet_accept. To avoid blocking
> >>> in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which get
> >>> scheduled on sk_data_ready (for a passive socket, it means that there
> >>> are connections to accept).
> >>>
> >>> Use the reqcopy field to store the request. Accept the new socket from
> >>> the delayed work function, create a new sock_mapping for it, map
> >>> the indexes page and data ring, and reply to the other end. Allocate an
> >>> ioworker for the socket.
> >>>
> >>> Only support one outstanding blocking accept request for every socket at
> >>> any time.
> >>>
> >>> Add a field to sock_mapping to remember the passive socket from which an
> >>> active socket was created.
> >>>
> >>> Signed-off-by: Stefano Stabellini <[email protected]>
> >>> CC: [email protected]
> >>> CC: [email protected]
> >>> ---
> >>> drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
> >>> 1 file changed, 108 insertions(+), 1 deletion(-)
> >>>
> >>> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> >>> index a75586e..f1173f4 100644
> >>> --- a/drivers/xen/pvcalls-back.c
> >>> +++ b/drivers/xen/pvcalls-back.c
> >>> @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
> >>> struct sock_mapping {
> >>> struct list_head list;
> >>> struct pvcalls_fedata *priv;
> >>> + struct sockpass_mapping *sockpass;
> >>> struct socket *sock;
> >>> uint64_t id;
> >>> grant_ref_t ref;
> >>> @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
> >>>
> >>> static void __pvcalls_back_accept(struct work_struct *work)
> >>> {
> >>> + struct sockpass_mapping *mappass = container_of(
> >>> + work, struct sockpass_mapping, register_work);
> >>> + struct sock_mapping *map;
> >>> + struct pvcalls_ioworker *iow;
> >>> + struct pvcalls_fedata *priv;
> >>> + struct socket *sock;
> >>> + struct xen_pvcalls_response *rsp;
> >>> + struct xen_pvcalls_request *req;
> >>> + int notify;
> >>> + int ret = -EINVAL;
> >>> + unsigned long flags;
> >>> +
> >>> + priv = mappass->priv;
> >>> + /* We only need to check the value of "cmd" atomically on read. */
> >>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>> + req = &mappass->reqcopy;
> >>> + if (req->cmd != PVCALLS_ACCEPT) {
> >>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>> + return;
> >>> + }
> >>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>
> >> What about:
> >> req = &mappass->reqcopy;
> >> if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
> >> return;
> >>
> >> I can't see the need for taking a lock here.
> >
> > Sure, good idea
> >
> >
> >>> +
> >>> + sock = sock_alloc();
> >>> + if (sock == NULL)
> >>> + goto out_error;
> >>> + sock->type = mappass->sock->type;
> >>> + sock->ops = mappass->sock->ops;
> >>> +
> >>> + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
> >>> + if (ret == -EAGAIN) {
> >>> + sock_release(sock);
> >>> + goto out_error;
> >>> + }
> >>> +
> >>> + map = pvcalls_new_active_socket(priv,
> >>> + req->u.accept.id_new,
> >>> + req->u.accept.ref,
> >>> + req->u.accept.evtchn,
> >>> + sock);
> >>> + if (!map) {
> >>> + sock_release(sock);
> >>> + goto out_error;
> >>> + }
> >>> +
> >>> + map->sockpass = mappass;
> >>> + iow = &map->ioworker;
> >>> + atomic_inc(&map->read);
> >>> + atomic_inc(&map->io);
> >>> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> >>> +
> >>> +out_error:
> >>> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> >>> + rsp->req_id = req->req_id;
> >>> + rsp->cmd = req->cmd;
> >>> + rsp->u.accept.id = req->u.accept.id;
> >>> + rsp->ret = ret;
> >>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> >>> + if (notify)
> >>> + notify_remote_via_irq(priv->irq);
> >>> +
> >>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>> + mappass->reqcopy.cmd = 0;
> >>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>
> >> ACCESS_ONCE(mappass->reqcopy.cmd) = 0;
> >
> > OK
> >
> >
> >>> }
> >>>
> >>> static void pvcalls_pass_sk_data_ready(struct sock *sock)
> >>> {
> >>> + struct sockpass_mapping *mappass = sock->sk_user_data;
> >>> +
> >>> + if (mappass == NULL)
> >>> + return;
> >>> +
> >>> + queue_work(mappass->wq, &mappass->register_work);
> >>> }
> >>>
> >>> static int pvcalls_back_bind(struct xenbus_device *dev,
> >>> @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
> >>> static int pvcalls_back_accept(struct xenbus_device *dev,
> >>> struct xen_pvcalls_request *req)
> >>> {
> >>> - return 0;
> >>> + struct pvcalls_fedata *priv;
> >>> + struct sockpass_mapping *mappass;
> >>> + int ret = -EINVAL;
> >>> + struct xen_pvcalls_response *rsp;
> >>> + unsigned long flags;
> >>> +
> >>> + priv = dev_get_drvdata(&dev->dev);
> >>> +
> >>> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> >>> + req->u.accept.id);
> >>> + if (mappass == NULL)
> >>> + goto out_error;
> >>> +
> >>> + /*
> >>> + * Limitation of the current implementation: only support one
> >>> + * concurrent accept or poll call on one socket.
> >>> + */
> >>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>> + if (mappass->reqcopy.cmd != 0) {
> >>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>> + ret = -EINTR;
> >>> + goto out_error;
> >>> + }
> >>> +
> >>> + mappass->reqcopy = *req;
> >>
> >> This time you need the lock, however you should use:
> >>
> >> ACCESS_ONCE(mappass->reqcopy) = *req;
> >
> > I don't think that guarantees atomic accesses to the cmd field of the
> > struct. Shouldn't this be:
> >
> > ACCESS_ONCE(mappass->reqcopy.cmd) = req->cmd;
> > mappass->reqcopy = *req;
>
> Hmm, what if the frontend changes cmd between those two accesses?

This cannot happen because req is a copy of the guest request here.
However, it is possible that __pvcalls_back_accept is racing against
pvcalls_back_accept. In that case, I would need to make sure not only
that cmd is written atomically, but now that I am thinking about this,
that cmd is written *after* the rest of reqcopy: otherwise
__pvcalls_back_accept could see a partially updated reqcopy.

It would be possible to do this with atomic accesses and barriers, but
I am thinking that it is not worth the effort. I am tempted to roll back
to the previous version with spinlocks.


> You either need another local buffer or you have to copy cmd via
> ACCESS_ONCE() and the rest of *req separately (seems not to be
> that hard: its just cmd, req_id and u).
>
> BTW: Maybe you should use READ_ONCE() and WRITE_ONCE() instead of
> ACCESS_ONCE(), as those seem to be preferred nowadays.

2017-06-14 21:03:12

by Stefano Stabellini

Subject: Re: [PATCH v3 06/18] xen/pvcalls: handle commands from the frontend

On Mon, 12 Jun 2017, Boris Ostrovsky wrote:
> > +
> > static void pvcalls_back_work(struct work_struct *work)
> > {
> > + struct pvcalls_fedata *priv = container_of(work,
> > + struct pvcalls_fedata, register_work);
> > + int notify, notify_all = 0, more = 1;
> > + struct xen_pvcalls_request req;
> > + struct xenbus_device *dev = priv->dev;
> > +
> > + while (more) {
> > + while (RING_HAS_UNCONSUMED_REQUESTS(&priv->ring)) {
> > + RING_COPY_REQUEST(&priv->ring,
> > + priv->ring.req_cons++,
> > + &req);
> > +
> > + if (!pvcalls_back_handle_cmd(dev, &req)) {
> > + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(
> > + &priv->ring, notify);
> > + notify_all += notify;
> > + }
> > + }
> > +
> > + if (notify_all)
> > + notify_remote_via_irq(priv->irq);
> > +
> > + RING_FINAL_CHECK_FOR_REQUESTS(&priv->ring, more);
> > + }
> > }
> >
> > static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> > {
> > + struct xenbus_device *dev = dev_id;
> > + struct pvcalls_fedata *priv = NULL;
> > +
> > + if (dev == NULL)
> > + return IRQ_HANDLED;
> > +
> > + priv = dev_get_drvdata(&dev->dev);
> > + if (priv == NULL)
> > + return IRQ_HANDLED;
> > +
> > + /*
> > + * TODO: a small theoretical race exists if we try to queue work
> > + * after pvcalls_back_work checked for final requests and before
> > + * it returns. The queuing will fail, and pvcalls_back_work
> > + * won't do the work because it is about to return. In that
> > + * case, we lose the notification.
> > + */
> > + queue_work(priv->wq, &priv->register_work);
>
> Would queuing delayed work (if queue_work() failed) help? And canceling
> it on next invocation of pvcalls_back_event()?

Looking at the implementation of queue_delayed_work_on and
queue_work_on, it looks like if queue_work fails then
queue_delayed_work would also fail: they both test
WORK_STRUCT_PENDING_BIT.

2017-06-15 06:58:10

by Jürgen Groß

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On 14/06/17 21:27, Stefano Stabellini wrote:
> On Wed, 14 Jun 2017, Juergen Gross wrote:
>> On 14/06/17 02:47, Stefano Stabellini wrote:
>>> On Tue, 13 Jun 2017, Juergen Gross wrote:
>>>> On 02/06/17 21:31, Stefano Stabellini wrote:
>>>>> Implement the accept command by calling inet_accept. To avoid blocking
>>>>> in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which get
>>>>> scheduled on sk_data_ready (for a passive socket, it means that there
>>>>> are connections to accept).
>>>>>
>>>>> Use the reqcopy field to store the request. Accept the new socket from
>>>>> the delayed work function, create a new sock_mapping for it, map
>>>>> the indexes page and data ring, and reply to the other end. Allocate an
>>>>> ioworker for the socket.
>>>>>
>>>>> Only support one outstanding blocking accept request for every socket at
>>>>> any time.
>>>>>
>>>>> Add a field to sock_mapping to remember the passive socket from which an
>>>>> active socket was created.
>>>>>
>>>>> Signed-off-by: Stefano Stabellini <[email protected]>
>>>>> CC: [email protected]
>>>>> CC: [email protected]
>>>>> ---
>>>>> drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
>>>>> 1 file changed, 108 insertions(+), 1 deletion(-)
>>>>>
>>>>> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
>>>>> index a75586e..f1173f4 100644
>>>>> --- a/drivers/xen/pvcalls-back.c
>>>>> +++ b/drivers/xen/pvcalls-back.c
>>>>> @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
>>>>> struct sock_mapping {
>>>>> struct list_head list;
>>>>> struct pvcalls_fedata *priv;
>>>>> + struct sockpass_mapping *sockpass;
>>>>> struct socket *sock;
>>>>> uint64_t id;
>>>>> grant_ref_t ref;
>>>>> @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
>>>>>
>>>>> static void __pvcalls_back_accept(struct work_struct *work)
>>>>> {
>>>>> + struct sockpass_mapping *mappass = container_of(
>>>>> + work, struct sockpass_mapping, register_work);
>>>>> + struct sock_mapping *map;
>>>>> + struct pvcalls_ioworker *iow;
>>>>> + struct pvcalls_fedata *priv;
>>>>> + struct socket *sock;
>>>>> + struct xen_pvcalls_response *rsp;
>>>>> + struct xen_pvcalls_request *req;
>>>>> + int notify;
>>>>> + int ret = -EINVAL;
>>>>> + unsigned long flags;
>>>>> +
>>>>> + priv = mappass->priv;
>>>>> + /* We only need to check the value of "cmd" atomically on read. */
>>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>>>> + req = &mappass->reqcopy;
>>>>> + if (req->cmd != PVCALLS_ACCEPT) {
>>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>>>> + return;
>>>>> + }
>>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>>>
>>>> What about:
>>>> req = &mappass->reqcopy;
>>>> if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
>>>> return;
>>>>
>>>> I can't see the need for taking a lock here.
>>>
>>> Sure, good idea
>>>
>>>
>>>>> +
>>>>> + sock = sock_alloc();
>>>>> + if (sock == NULL)
>>>>> + goto out_error;
>>>>> + sock->type = mappass->sock->type;
>>>>> + sock->ops = mappass->sock->ops;
>>>>> +
>>>>> + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
>>>>> + if (ret == -EAGAIN) {
>>>>> + sock_release(sock);
>>>>> + goto out_error;
>>>>> + }
>>>>> +
>>>>> + map = pvcalls_new_active_socket(priv,
>>>>> + req->u.accept.id_new,
>>>>> + req->u.accept.ref,
>>>>> + req->u.accept.evtchn,
>>>>> + sock);
>>>>> + if (!map) {
>>>>> + sock_release(sock);
>>>>> + goto out_error;
>>>>> + }
>>>>> +
>>>>> + map->sockpass = mappass;
>>>>> + iow = &map->ioworker;
>>>>> + atomic_inc(&map->read);
>>>>> + atomic_inc(&map->io);
>>>>> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
>>>>> +
>>>>> +out_error:
>>>>> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
>>>>> + rsp->req_id = req->req_id;
>>>>> + rsp->cmd = req->cmd;
>>>>> + rsp->u.accept.id = req->u.accept.id;
>>>>> + rsp->ret = ret;
>>>>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
>>>>> + if (notify)
>>>>> + notify_remote_via_irq(priv->irq);
>>>>> +
>>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>>>> + mappass->reqcopy.cmd = 0;
>>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>>>
>>>> ACCESS_ONCE(mappass->reqcopy.cmd) = 0;
>>>
>>> OK
>>>
>>>
>>>>> }
>>>>>
>>>>> static void pvcalls_pass_sk_data_ready(struct sock *sock)
>>>>> {
>>>>> + struct sockpass_mapping *mappass = sock->sk_user_data;
>>>>> +
>>>>> + if (mappass == NULL)
>>>>> + return;
>>>>> +
>>>>> + queue_work(mappass->wq, &mappass->register_work);
>>>>> }
>>>>>
>>>>> static int pvcalls_back_bind(struct xenbus_device *dev,
>>>>> @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
>>>>> static int pvcalls_back_accept(struct xenbus_device *dev,
>>>>> struct xen_pvcalls_request *req)
>>>>> {
>>>>> - return 0;
>>>>> + struct pvcalls_fedata *priv;
>>>>> + struct sockpass_mapping *mappass;
>>>>> + int ret = -EINVAL;
>>>>> + struct xen_pvcalls_response *rsp;
>>>>> + unsigned long flags;
>>>>> +
>>>>> + priv = dev_get_drvdata(&dev->dev);
>>>>> +
>>>>> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
>>>>> + req->u.accept.id);
>>>>> + if (mappass == NULL)
>>>>> + goto out_error;
>>>>> +
>>>>> + /*
>>>>> + * Limitation of the current implementation: only support one
>>>>> + * concurrent accept or poll call on one socket.
>>>>> + */
>>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
>>>>> + if (mappass->reqcopy.cmd != 0) {
>>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
>>>>> + ret = -EINTR;
>>>>> + goto out_error;
>>>>> + }
>>>>> +
>>>>> + mappass->reqcopy = *req;
>>>>
>>>> This time you need the lock, however you should use:
>>>>
>>>> ACCESS_ONCE(mappass->reqcopy) = *req;
>>>
>>> I don't think that guarantees atomic accesses to the cmd field of the
>>> struct. Shouldn't this be:
>>>
>>> ACCESS_ONCE(mappass->reqcopy.cmd) = req->cmd;
>>> mappass->reqcopy = *req;
>>
>> Hmm, what if the frontend changes cmd between those two accesses?
>
> This cannot happen because req is a copy of the guest request here.
> However, it is possible that __pvcalls_back_accept is racing against
> pvcalls_back_accept. In that case, I would need to make sure not only
> that cmd is written atomically, but now that I am thinking about this,
> that cmd is written *after* the rest of reqcopy: otherwise
> __pvcalls_back_accept could see a partially updated reqcopy.
>
> It would be possible to do this with atomic accesses and barriers, but
> I am thinking that it is not worth the effort. I am tempted to roll back
> to the previous version with spinlocks.

Okay. Maybe add a comment mentioning this possible race.


Juergen

>
>
>> You either need another local buffer or you have to copy cmd via
>> ACCESS_ONCE() and the rest of *req separately (seems not to be
>> that hard: its just cmd, req_id and u).
>>
>> BTW: Maybe you should use READ_ONCE() and WRITE_ONCE() instead of
>> ACCESS_ONCE(), as those seem to be preferred nowadays.
>

2017-06-15 14:33:56

by Boris Ostrovsky

Subject: Re: [PATCH v3 06/18] xen/pvcalls: handle commands from the frontend

On 06/14/2017 05:03 PM, Stefano Stabellini wrote:
> On Mon, 12 Jun 2017, Boris Ostrovsky wrote:
>>> +
>>> static void pvcalls_back_work(struct work_struct *work)
>>> {
>>> + struct pvcalls_fedata *priv = container_of(work,
>>> + struct pvcalls_fedata, register_work);
>>> + int notify, notify_all = 0, more = 1;
>>> + struct xen_pvcalls_request req;
>>> + struct xenbus_device *dev = priv->dev;
>>> +
>>> + while (more) {
>>> + while (RING_HAS_UNCONSUMED_REQUESTS(&priv->ring)) {
>>> + RING_COPY_REQUEST(&priv->ring,
>>> + priv->ring.req_cons++,
>>> + &req);
>>> +
>>> + if (!pvcalls_back_handle_cmd(dev, &req)) {
>>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(
>>> + &priv->ring, notify);
>>> + notify_all += notify;
>>> + }
>>> + }
>>> +
>>> + if (notify_all)
>>> + notify_remote_via_irq(priv->irq);
>>> +
>>> + RING_FINAL_CHECK_FOR_REQUESTS(&priv->ring, more);
>>> + }
>>> }
>>>
>>> static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
>>> {
>>> + struct xenbus_device *dev = dev_id;
>>> + struct pvcalls_fedata *priv = NULL;
>>> +
>>> + if (dev == NULL)
>>> + return IRQ_HANDLED;
>>> +
>>> + priv = dev_get_drvdata(&dev->dev);
>>> + if (priv == NULL)
>>> + return IRQ_HANDLED;
>>> +
>>> + /*
>>> + * TODO: a small theoretical race exists if we try to queue work
>>> + * after pvcalls_back_work checked for final requests and before
>>> + * it returns. The queuing will fail, and pvcalls_back_work
>>> + * won't do the work because it is about to return. In that
>>> + * case, we lose the notification.
>>> + */
>>> + queue_work(priv->wq, &priv->register_work);
>> Would queuing delayed work (if queue_work() failed) help? And canceling
>> it on next invocation of pvcalls_back_event()?
> Looking at the implementation of queue_delayed_work_on and
> queue_work_on, it looks like that if queue_work fails then also
> queue_delayed_work would fail: they both test on
> WORK_STRUCT_PENDING_BIT.

Right, I should have looked at this myself. And flush_work() I suppose
cannot be used here since it may sleep?

Then I also can't think of anything else.

-boris

2017-06-15 17:54:34

by Stefano Stabellini

Subject: Re: [PATCH v3 11/18] xen/pvcalls: implement accept command

On Thu, 15 Jun 2017, Juergen Gross wrote:
> On 14/06/17 21:27, Stefano Stabellini wrote:
> > On Wed, 14 Jun 2017, Juergen Gross wrote:
> >> On 14/06/17 02:47, Stefano Stabellini wrote:
> >>> On Tue, 13 Jun 2017, Juergen Gross wrote:
> >>>> On 02/06/17 21:31, Stefano Stabellini wrote:
> >>>>> Implement the accept command by calling inet_accept. To avoid blocking
> >>>>> in the kernel, call inet_accept(O_NONBLOCK) from a workqueue, which get
> >>>>> scheduled on sk_data_ready (for a passive socket, it means that there
> >>>>> are connections to accept).
> >>>>>
> >>>>> Use the reqcopy field to store the request. Accept the new socket from
> >>>>> the delayed work function, create a new sock_mapping for it, map
> >>>>> the indexes page and data ring, and reply to the other end. Allocate an
> >>>>> ioworker for the socket.
> >>>>>
> >>>>> Only support one outstanding blocking accept request for every socket at
> >>>>> any time.
> >>>>>
> >>>>> Add a field to sock_mapping to remember the passive socket from which an
> >>>>> active socket was created.
> >>>>>
> >>>>> Signed-off-by: Stefano Stabellini <[email protected]>
> >>>>> CC: [email protected]
> >>>>> CC: [email protected]
> >>>>> ---
> >>>>> drivers/xen/pvcalls-back.c | 109 ++++++++++++++++++++++++++++++++++++++++++++-
> >>>>> 1 file changed, 108 insertions(+), 1 deletion(-)
> >>>>>
> >>>>> diff --git a/drivers/xen/pvcalls-back.c b/drivers/xen/pvcalls-back.c
> >>>>> index a75586e..f1173f4 100644
> >>>>> --- a/drivers/xen/pvcalls-back.c
> >>>>> +++ b/drivers/xen/pvcalls-back.c
> >>>>> @@ -65,6 +65,7 @@ struct pvcalls_ioworker {
> >>>>> struct sock_mapping {
> >>>>> struct list_head list;
> >>>>> struct pvcalls_fedata *priv;
> >>>>> + struct sockpass_mapping *sockpass;
> >>>>> struct socket *sock;
> >>>>> uint64_t id;
> >>>>> grant_ref_t ref;
> >>>>> @@ -275,10 +276,79 @@ static int pvcalls_back_release(struct xenbus_device *dev,
> >>>>>
> >>>>> static void __pvcalls_back_accept(struct work_struct *work)
> >>>>> {
> >>>>> + struct sockpass_mapping *mappass = container_of(
> >>>>> + work, struct sockpass_mapping, register_work);
> >>>>> + struct sock_mapping *map;
> >>>>> + struct pvcalls_ioworker *iow;
> >>>>> + struct pvcalls_fedata *priv;
> >>>>> + struct socket *sock;
> >>>>> + struct xen_pvcalls_response *rsp;
> >>>>> + struct xen_pvcalls_request *req;
> >>>>> + int notify;
> >>>>> + int ret = -EINVAL;
> >>>>> + unsigned long flags;
> >>>>> +
> >>>>> + priv = mappass->priv;
> >>>>> + /* We only need to check the value of "cmd" atomically on read. */
> >>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>>>> + req = &mappass->reqcopy;
> >>>>> + if (req->cmd != PVCALLS_ACCEPT) {
> >>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>>>> + return;
> >>>>> + }
> >>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>>>
> >>>> What about:
> >>>> req = &mappass->reqcopy;
> >>>> if (ACCESS_ONCE(req->cmd) != PVCALLS_ACCEPT)
> >>>> return;
> >>>>
> >>>> I can't see the need for taking a lock here.
> >>>
> >>> Sure, good idea
> >>>
> >>>
> >>>>> +
> >>>>> + sock = sock_alloc();
> >>>>> + if (sock == NULL)
> >>>>> + goto out_error;
> >>>>> + sock->type = mappass->sock->type;
> >>>>> + sock->ops = mappass->sock->ops;
> >>>>> +
> >>>>> + ret = inet_accept(mappass->sock, sock, O_NONBLOCK, true);
> >>>>> + if (ret == -EAGAIN) {
> >>>>> + sock_release(sock);
> >>>>> + goto out_error;
> >>>>> + }
> >>>>> +
> >>>>> + map = pvcalls_new_active_socket(priv,
> >>>>> + req->u.accept.id_new,
> >>>>> + req->u.accept.ref,
> >>>>> + req->u.accept.evtchn,
> >>>>> + sock);
> >>>>> + if (!map) {
> >>>>> + sock_release(sock);
> >>>>> + goto out_error;
> >>>>> + }
> >>>>> +
> >>>>> + map->sockpass = mappass;
> >>>>> + iow = &map->ioworker;
> >>>>> + atomic_inc(&map->read);
> >>>>> + atomic_inc(&map->io);
> >>>>> + queue_work_on(iow->cpu, iow->wq, &iow->register_work);
> >>>>> +
> >>>>> +out_error:
> >>>>> + rsp = RING_GET_RESPONSE(&priv->ring, priv->ring.rsp_prod_pvt++);
> >>>>> + rsp->req_id = req->req_id;
> >>>>> + rsp->cmd = req->cmd;
> >>>>> + rsp->u.accept.id = req->u.accept.id;
> >>>>> + rsp->ret = ret;
> >>>>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&priv->ring, notify);
> >>>>> + if (notify)
> >>>>> + notify_remote_via_irq(priv->irq);
> >>>>> +
> >>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>>>> + mappass->reqcopy.cmd = 0;
> >>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>>>
> >>>> ACCESS_ONCE(mappass->reqcopy.cmd) = 0;
> >>>
> >>> OK
> >>>
> >>>
> >>>>> }
> >>>>>
> >>>>> static void pvcalls_pass_sk_data_ready(struct sock *sock)
> >>>>> {
> >>>>> + struct sockpass_mapping *mappass = sock->sk_user_data;
> >>>>> +
> >>>>> + if (mappass == NULL)
> >>>>> + return;
> >>>>> +
> >>>>> + queue_work(mappass->wq, &mappass->register_work);
> >>>>> }
> >>>>>
> >>>>> static int pvcalls_back_bind(struct xenbus_device *dev,
> >>>>> @@ -380,7 +450,44 @@ static int pvcalls_back_listen(struct xenbus_device *dev,
> >>>>> static int pvcalls_back_accept(struct xenbus_device *dev,
> >>>>> struct xen_pvcalls_request *req)
> >>>>> {
> >>>>> - return 0;
> >>>>> + struct pvcalls_fedata *priv;
> >>>>> + struct sockpass_mapping *mappass;
> >>>>> + int ret = -EINVAL;
> >>>>> + struct xen_pvcalls_response *rsp;
> >>>>> + unsigned long flags;
> >>>>> +
> >>>>> + priv = dev_get_drvdata(&dev->dev);
> >>>>> +
> >>>>> + mappass = radix_tree_lookup(&priv->socketpass_mappings,
> >>>>> + req->u.accept.id);
> >>>>> + if (mappass == NULL)
> >>>>> + goto out_error;
> >>>>> +
> >>>>> + /*
> >>>>> + * Limitation of the current implementation: only support one
> >>>>> + * concurrent accept or poll call on one socket.
> >>>>> + */
> >>>>> + spin_lock_irqsave(&mappass->copy_lock, flags);
> >>>>> + if (mappass->reqcopy.cmd != 0) {
> >>>>> + spin_unlock_irqrestore(&mappass->copy_lock, flags);
> >>>>> + ret = -EINTR;
> >>>>> + goto out_error;
> >>>>> + }
> >>>>> +
> >>>>> + mappass->reqcopy = *req;
> >>>>
> >>>> This time you need the lock, however you should use:
> >>>>
> >>>> ACCESS_ONCE(mappass->reqcopy) = *req;
> >>>
> >>> I don't think that guarantees atomic accesses to the cmd field of the
> >>> struct. Shouldn't this be:
> >>>
> >>> ACCESS_ONCE(mappass->reqcopy.cmd) = req->cmd;
> >>> mappass->reqcopy = *req;
> >>
> >> Hmm, what if the frontend changes cmd between those two accesses?
> >
> > This cannot happen because req is a copy of the guest request here.
> > However, it is possible that __pvcalls_back_accept is racing against
> > pvcalls_back_accept. In that case, I would need to make sure not only
> > that cmd is written atomically, but now that I am thinking about this,
> > that cmd is written *after* the rest of reqcopy: otherwise
> > __pvcalls_back_accept could see a partially updated reqcopy.
> >
> > It would be possible to do this with atomic accesses and barriers, but
> > I am thinking that it is not worth the effort. I am tempted to roll back
> > to the previous version with spinlocks.
>
> Okay. Maybe add a comment mentioning this possible race.

I'll do that


> >
> >
> >> You either need another local buffer or you have to copy cmd via
> >> ACCESS_ONCE() and the rest of *req separately (seems not to be
> >> that hard: its just cmd, req_id and u).
> >>
> >> BTW: Maybe you should use READ_ONCE() and WRITE_ONCE() instead of
> >> ACCESS_ONCE(), as those seem to be preferred nowadays.

2017-06-15 18:56:21

by Stefano Stabellini

Subject: Re: [PATCH v3 06/18] xen/pvcalls: handle commands from the frontend

On Thu, 15 Jun 2017, Boris Ostrovsky wrote:
> On 06/14/2017 05:03 PM, Stefano Stabellini wrote:
> > On Mon, 12 Jun 2017, Boris Ostrovsky wrote:
> >>> +
> >>> static void pvcalls_back_work(struct work_struct *work)
> >>> {
> >>> + struct pvcalls_fedata *priv = container_of(work,
> >>> + struct pvcalls_fedata, register_work);
> >>> + int notify, notify_all = 0, more = 1;
> >>> + struct xen_pvcalls_request req;
> >>> + struct xenbus_device *dev = priv->dev;
> >>> +
> >>> + while (more) {
> >>> + while (RING_HAS_UNCONSUMED_REQUESTS(&priv->ring)) {
> >>> + RING_COPY_REQUEST(&priv->ring,
> >>> + priv->ring.req_cons++,
> >>> + &req);
> >>> +
> >>> + if (!pvcalls_back_handle_cmd(dev, &req)) {
> >>> + RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(
> >>> + &priv->ring, notify);
> >>> + notify_all += notify;
> >>> + }
> >>> + }
> >>> +
> >>> + if (notify_all)
> >>> + notify_remote_via_irq(priv->irq);
> >>> +
> >>> + RING_FINAL_CHECK_FOR_REQUESTS(&priv->ring, more);
> >>> + }
> >>> }
> >>>
> >>> static irqreturn_t pvcalls_back_event(int irq, void *dev_id)
> >>> {
> >>> + struct xenbus_device *dev = dev_id;
> >>> + struct pvcalls_fedata *priv = NULL;
> >>> +
> >>> + if (dev == NULL)
> >>> + return IRQ_HANDLED;
> >>> +
> >>> + priv = dev_get_drvdata(&dev->dev);
> >>> + if (priv == NULL)
> >>> + return IRQ_HANDLED;
> >>> +
> >>> + /*
> >>> + * TODO: a small theoretical race exists if we try to queue work
> >>> + * after pvcalls_back_work checked for final requests and before
> >>> + * it returns. The queuing will fail, and pvcalls_back_work
> >>> + * won't do the work because it is about to return. In that
> >>> + * case, we lose the notification.
> >>> + */
> >>> + queue_work(priv->wq, &priv->register_work);
> >> Would queuing delayed work (if queue_work() failed) help? And canceling
> >> it on next invocation of pvcalls_back_event()?
> > Looking at the implementation of queue_delayed_work_on and
> > queue_work_on, it looks like that if queue_work fails then also
> > queue_delayed_work would fail: they both test on
> > WORK_STRUCT_PENDING_BIT.
>
> Right, I should have looked at this myself. And flush_work() I suppose
> cannot be used here since it may sleep?
>
> Then I also can't think of anything else.

I guess one way to work around the issue would be to use multiple work
items, and queue a new (different) work item at each pvcalls_back_event.
But that approach would use more memory and would need a new lock
in pvcalls_back_work.

Given that the race is only theoretical (I am running nginx inside a
VM and hitting it with as many concurrent requests as I can, and still
I cannot reproduce it), I am tempted to leave it as-is with a comment.
We can revisit it in the future if we find any real issues.