Hi all,
this series introduces the frontend for the newly introduced PV Calls
procotol.
PV Calls is a paravirtualized protocol that allows the implementation of
a set of POSIX functions in a different domain. The PV Calls frontend
sends POSIX function calls to the backend, which implements them and
returns a value to the frontend and acts on the function call.
For more information about PV Calls, please read:
https://xenbits.xen.org/docs/unstable/misc/pvcalls.html
This patch series only implements the frontend driver. It doesn't
attempt to redirect POSIX calls to it. The functions exported in
pvcalls-front.h are meant to be used for that. A separate patch series
will be sent to use them and hook them into the system.
Changes in v2:
- use xenbus_read_unsigned when possible
- call dev_set_drvdata earlier in pvcalls_front_probe not to dereference
a NULL pointer in the error path
- set ret appropriately in pvcalls_front_probe
- include pvcalls-front.h in pvcalls-front.c
- call wake_up only once after the consuming loop in pvcalls_front_event_handler
- don't leak "bytes" in case of errors in create_active
- call spin_unlock appropriately in case of errors in create_active
- remove all BUG_ONs
- don't leak newsock->sk in pvcalls_front_accept in case of errors
- rename PVCALLS_FRON_MAX_SPIN to PVCALLS_FRONT_MAX_SPIN
- return bool from pvcalls_front_read_todo
- add a barrier after setting PVCALLS_FLAG_POLL_RET in
pvcalls_front_event_handler
- remove outdated comment in pvcalls_front_free_map
- clear sock->sk->sk_send_head later in pvcalls_front_release
- make XEN_PVCALLS_FRONTEND tristate
- don't add an empty resume function
Stefano Stabellini (13):
xen/pvcalls: introduce the pvcalls xenbus frontend
xen/pvcalls: connect to the backend
xen/pvcalls: implement socket command and handle events
xen/pvcalls: implement connect command
xen/pvcalls: implement bind command
xen/pvcalls: implement listen command
xen/pvcalls: implement accept command
xen/pvcalls: implement sendmsg
xen/pvcalls: implement recvmsg
xen/pvcalls: implement poll command
xen/pvcalls: implement release command
xen/pvcalls: implement frontend disconnect
xen: introduce a Kconfig option to enable the pvcalls frontend
drivers/xen/Kconfig | 9 +
drivers/xen/Makefile | 1 +
drivers/xen/pvcalls-front.c | 1103 +++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 28 ++
4 files changed, 1141 insertions(+)
create mode 100644 drivers/xen/pvcalls-front.c
create mode 100644 drivers/xen/pvcalls-front.h
Introduce a xenbus frontend for the pvcalls protocol, as defined by
https://xenbits.xen.org/docs/unstable/misc/pvcalls.html.
This patch only adds the stubs, the code will be added by the following
patches.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 61 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 61 insertions(+)
create mode 100644 drivers/xen/pvcalls-front.c
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
new file mode 100644
index 0000000..a8d38c2
--- /dev/null
+++ b/drivers/xen/pvcalls-front.c
@@ -0,0 +1,61 @@
+/*
+ * (c) 2017 Stefano Stabellini <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+
+#include <xen/events.h>
+#include <xen/grant_table.h>
+#include <xen/xen.h>
+#include <xen/xenbus.h>
+#include <xen/interface/io/pvcalls.h>
+
+static const struct xenbus_device_id pvcalls_front_ids[] = {
+ { "pvcalls" },
+ { "" }
+};
+
+static int pvcalls_front_remove(struct xenbus_device *dev)
+{
+ return 0;
+}
+
+static int pvcalls_front_probe(struct xenbus_device *dev,
+ const struct xenbus_device_id *id)
+{
+ return 0;
+}
+
+static void pvcalls_front_changed(struct xenbus_device *dev,
+ enum xenbus_state backend_state)
+{
+}
+
+static struct xenbus_driver pvcalls_front_driver = {
+ .ids = pvcalls_front_ids,
+ .probe = pvcalls_front_probe,
+ .remove = pvcalls_front_remove,
+ .otherend_changed = pvcalls_front_changed,
+};
+
+static int __init pvcalls_frontend_init(void)
+{
+ if (!xen_domain())
+ return -ENODEV;
+
+ pr_info("Initialising Xen pvcalls frontend driver\n");
+
+ return xenbus_register_frontend(&pvcalls_front_driver);
+}
+
+module_init(pvcalls_frontend_init);
--
1.9.1
Also add pvcalls-front to the Makefile.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/Kconfig | 9 +++++++++
drivers/xen/Makefile | 1 +
2 files changed, 10 insertions(+)
diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
index 4545561..0b2c828 100644
--- a/drivers/xen/Kconfig
+++ b/drivers/xen/Kconfig
@@ -196,6 +196,15 @@ config XEN_PCIDEV_BACKEND
If in doubt, say m.
+config XEN_PVCALLS_FRONTEND
+ tristate "XEN PV Calls frontend driver"
+ depends on INET && XEN
+ help
+ Experimental frontend for the Xen PV Calls protocol
+ (https://xenbits.xen.org/docs/unstable/misc/pvcalls.html). It
+ sends a small set of POSIX calls to the backend, which
+ implements them.
+
config XEN_PVCALLS_BACKEND
bool "XEN PV Calls backend driver"
depends on INET && XEN && XEN_BACKEND
diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
index 480b928..afb9e03 100644
--- a/drivers/xen/Makefile
+++ b/drivers/xen/Makefile
@@ -39,6 +39,7 @@ obj-$(CONFIG_XEN_EFI) += efi.o
obj-$(CONFIG_XEN_SCSI_BACKEND) += xen-scsiback.o
obj-$(CONFIG_XEN_AUTO_XLATE) += xlate_mmu.o
obj-$(CONFIG_XEN_PVCALLS_BACKEND) += pvcalls-back.o
+obj-$(CONFIG_XEN_PVCALLS_FRONTEND) += pvcalls-front.o
xen-evtchn-y := evtchn.o
xen-gntdev-y := gntdev.o
xen-gntalloc-y := gntalloc.o
--
1.9.1
Implement pvcalls frontend removal function. Go through the list of
active and passive sockets and free them all, one at a time.
Signed-off-by: Stefano Stabellini <[email protected]>
Reviewed-by: Juergen Gross <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 28 ++++++++++++++++++++++++++++
1 file changed, 28 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 5a4040e..b3d4675 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -911,6 +911,34 @@ int pvcalls_front_release(struct socket *sock)
static int pvcalls_front_remove(struct xenbus_device *dev)
{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map = NULL, *n;
+
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ list_for_each_entry_safe(map, n, &bedata->socket_mappings, list) {
+ mutex_lock(&map->active.in_mutex);
+ mutex_lock(&map->active.out_mutex);
+ pvcalls_front_free_map(bedata, map);
+ mutex_unlock(&map->active.out_mutex);
+ mutex_unlock(&map->active.in_mutex);
+ kfree(map);
+ }
+ list_for_each_entry_safe(map, n, &bedata->socketpass_mappings, list) {
+ spin_lock(&bedata->pvcallss_lock);
+ list_del_init(&map->list);
+ spin_unlock(&bedata->pvcallss_lock);
+ kfree(map);
+ }
+ if (bedata->irq > 0)
+ unbind_from_irqhandler(bedata->irq, dev);
+ if (bedata->ref >= 0)
+ gnttab_end_foreign_access(bedata->ref, 0, 0);
+ kfree(bedata->ring.sring);
+ kfree(bedata);
+ dev_set_drvdata(&dev->dev, NULL);
+ xenbus_switch_state(dev, XenbusStateClosed);
+ pvcalls_front_dev = NULL;
return 0;
}
--
1.9.1
Send PVCALLS_ACCEPT to the backend. Allocate a new active socket. Make
sure that only one accept command is executed at any given time by
setting PVCALLS_FLAG_ACCEPT_INFLIGHT and waiting on the
inflight_accept_req waitqueue.
sock->sk->sk_send_head is not used for ip sockets: reuse the field to
store a pointer to the struct sock_mapping corresponding to the socket.
Convert the new struct socket pointer into an uint64_t and use it as id
for the new socket to pass to the backend.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 79 +++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 3 ++
2 files changed, 82 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 3b5d50e..b8c4538 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -413,6 +413,85 @@ int pvcalls_front_listen(struct socket *sock, int backlog)
return ret;
}
+int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map;
+ struct sock_mapping *map2 = NULL;
+ struct xen_pvcalls_request *req;
+ int notify, req_id, ret, evtchn;
+
+ if (!pvcalls_front_dev)
+ return -ENOTCONN;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (!map)
+ return -ENOTSOCK;
+
+ if (map->passive.status != PVCALLS_STATUS_LISTEN)
+ return -EINVAL;
+
+ /*
+ * Backend only supports 1 inflight accept request, will return
+ * errors for the others
+ */
+ if (test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
+ (void *)&map->passive.flags)) {
+ if (wait_event_interruptible(map->passive.inflight_accept_req,
+ !test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
+ (void *)&map->passive.flags))
+ != 0)
+ return -EINTR;
+ }
+
+
+ newsock->sk = kzalloc(sizeof(*newsock->sk), GFP_KERNEL);
+ if (newsock->sk == NULL)
+ return -ENOMEM;
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ kfree(newsock->sk);
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+
+ map2 = create_active(&evtchn);
+
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_ACCEPT;
+ req->u.accept.id = (uint64_t) sock;
+ req->u.accept.ref = map2->active.ref;
+ req->u.accept.id_new = (uint64_t) newsock;
+ req->u.accept.evtchn = evtchn;
+
+ list_add_tail(&map2->list, &bedata->socket_mappings);
+ WRITE_ONCE(newsock->sk->sk_send_head, (void *)map2);
+ map2->sock = newsock;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ wait_event(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
+
+ clear_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT, (void *)&map->passive.flags);
+ wake_up(&map->passive.inflight_accept_req);
+
+ ret = bedata->rsp[req_id].ret;
+ /* read ret, then set this rsp slot to be reused */
+ smp_mb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+ return ret;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index aa8fe10..ab4f1da 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -10,5 +10,8 @@ int pvcalls_front_bind(struct socket *sock,
struct sockaddr *addr,
int addr_len);
int pvcalls_front_listen(struct socket *sock, int backlog);
+int pvcalls_front_accept(struct socket *sock,
+ struct socket *newsock,
+ int flags);
#endif
--
1.9.1
Send data to an active socket by copying data to the "out" ring. Take
the active socket out_mutex so that only one function can access the
ring at any given time.
If not enough room is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance significantly.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 109 ++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 3 ++
2 files changed, 112 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index b8c4538..d8ed280 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -29,6 +29,7 @@
#define PVCALLS_INVALID_ID (UINT_MAX)
#define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
#define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
+#define PVCALLS_FRONT_MAX_SPIN 5000
struct pvcalls_bedata {
struct xen_pvcalls_front_ring ring;
@@ -79,6 +80,22 @@ struct sock_mapping {
};
};
+static int pvcalls_front_write_todo(struct sock_mapping *map)
+{
+ struct pvcalls_data_intf *intf = map->active.ring;
+ RING_IDX cons, prod, size = XEN_FLEX_RING_SIZE(intf->ring_order);
+ int32_t error;
+
+ cons = intf->out_cons;
+ prod = intf->out_prod;
+ error = intf->out_error;
+ if (error == -ENOTCONN)
+ return 0;
+ if (error != 0)
+ return error;
+ return size - pvcalls_queued(prod, cons, size);
+}
+
static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
{
struct xenbus_device *dev = dev_id;
@@ -309,6 +326,98 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
return ret;
}
+static int __write_ring(struct pvcalls_data_intf *intf,
+ struct pvcalls_data *data,
+ struct iov_iter *msg_iter,
+ size_t len)
+{
+ RING_IDX cons, prod, size, masked_prod, masked_cons;
+ RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
+ int32_t error;
+
+ cons = intf->out_cons;
+ prod = intf->out_prod;
+ error = intf->out_error;
+ /* read indexes before continuing */
+ virt_mb();
+
+ if (error < 0)
+ return error;
+
+ size = pvcalls_queued(prod, cons, array_size);
+ if (size >= array_size)
+ return 0;
+ if (len > array_size - size)
+ len = array_size - size;
+
+ masked_prod = pvcalls_mask(prod, array_size);
+ masked_cons = pvcalls_mask(cons, array_size);
+
+ if (masked_prod < masked_cons) {
+ copy_from_iter(data->out + masked_prod, len, msg_iter);
+ } else {
+ if (len > array_size - masked_prod) {
+ copy_from_iter(data->out + masked_prod,
+ array_size - masked_prod, msg_iter);
+ copy_from_iter(data->out,
+ len - (array_size - masked_prod),
+ msg_iter);
+ } else {
+ copy_from_iter(data->out + masked_prod, len, msg_iter);
+ }
+ }
+ /* write to ring before updating pointer */
+ virt_wmb();
+ intf->out_prod += len;
+
+ return len;
+}
+
+int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map;
+ int sent = 0, tot_sent = 0;
+ int count = 0, flags;
+
+ if (!pvcalls_front_dev)
+ return -ENOTCONN;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (!map)
+ return -ENOTSOCK;
+
+ flags = msg->msg_flags;
+ if (flags & (MSG_CONFIRM|MSG_DONTROUTE|MSG_EOR|MSG_OOB))
+ return -EOPNOTSUPP;
+
+ mutex_lock(&map->active.out_mutex);
+ if ((flags & MSG_DONTWAIT) && !pvcalls_front_write_todo(map)) {
+ mutex_unlock(&map->active.out_mutex);
+ return -EAGAIN;
+ }
+
+again:
+ count++;
+ sent = __write_ring(map->active.ring,
+ &map->active.data, &msg->msg_iter,
+ len);
+ if (sent > 0) {
+ len -= sent;
+ tot_sent += sent;
+ notify_remote_via_irq(map->active.irq);
+ }
+ if (sent >= 0 && len > 0 && count < PVCALLS_FRONT_MAX_SPIN)
+ goto again;
+ if (sent < 0)
+ tot_sent = sent;
+
+ mutex_unlock(&map->active.out_mutex);
+ return tot_sent;
+}
+
int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
{
struct pvcalls_bedata *bedata;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index ab4f1da..d937c24 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -13,5 +13,8 @@ int pvcalls_front_bind(struct socket *sock,
int pvcalls_front_accept(struct socket *sock,
struct socket *newsock,
int flags);
+int pvcalls_front_sendmsg(struct socket *sock,
+ struct msghdr *msg,
+ size_t len);
#endif
--
1.9.1
Send PVCALLS_RELEASE to the backend and wait for a reply. Take both
in_mutex and out_mutex to avoid concurrent accesses. Then, free the
socket.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 85 +++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 1 +
2 files changed, 86 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 833b717..5a4040e 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -184,6 +184,23 @@ static irqreturn_t pvcalls_front_conn_handler(int irq, void *sock_map)
return IRQ_HANDLED;
}
+static void pvcalls_front_free_map(struct pvcalls_bedata *bedata,
+ struct sock_mapping *map)
+{
+ int i;
+
+ spin_lock(&bedata->pvcallss_lock);
+ if (!list_empty(&map->list))
+ list_del_init(&map->list);
+ spin_unlock(&bedata->pvcallss_lock);
+
+ for (i = 0; i < (1 << map->active.ring->ring_order); i++)
+ gnttab_end_foreign_access(map->active.ring->ref[i], 0, 0);
+ gnttab_end_foreign_access(map->active.ref, 0, 0);
+ free_page((unsigned long)map->active.ring);
+ unbind_from_irqhandler(map->active.irq, map);
+}
+
int pvcalls_front_socket(struct socket *sock)
{
struct pvcalls_bedata *bedata;
@@ -819,6 +836,74 @@ unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
return pvcalls_front_poll_passive(file, bedata, map, wait);
}
+int pvcalls_front_release(struct socket *sock)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map;
+ int req_id, notify;
+ struct xen_pvcalls_request *req;
+
+ if (!pvcalls_front_dev)
+ return -EIO;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+ if (!bedata)
+ return -EIO;
+
+ if (sock->sk == NULL)
+ return 0;
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (map == NULL)
+ return 0;
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+ WRITE_ONCE(sock->sk->sk_send_head, NULL);
+
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_RELEASE;
+ req->u.release.id = (uint64_t)sock;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ wait_event(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
+
+ if (map->active_socket) {
+ /*
+ * Set in_error and wake up inflight_conn_req to force
+ * recvmsg waiters to exit.
+ */
+ map->active.ring->in_error = -EBADF;
+ wake_up_interruptible(&map->active.inflight_conn_req);
+
+ mutex_lock(&map->active.in_mutex);
+ mutex_lock(&map->active.out_mutex);
+ pvcalls_front_free_map(bedata, map);
+ mutex_unlock(&map->active.out_mutex);
+ mutex_unlock(&map->active.in_mutex);
+ kfree(map);
+ } else {
+ spin_lock(&bedata->pvcallss_lock);
+ list_del_init(&map->list);
+ kfree(map);
+ spin_unlock(&bedata->pvcallss_lock);
+ }
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+
+ return 0;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index 25e05b8..3332978 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -23,5 +23,6 @@ int pvcalls_front_recvmsg(struct socket *sock,
unsigned int pvcalls_front_poll(struct file *file,
struct socket *sock,
poll_table *wait);
+int pvcalls_front_release(struct socket *sock);
#endif
--
1.9.1
Send PVCALLS_LISTEN to the backend.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 48 +++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 1 +
2 files changed, 49 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index af2ce20..3b5d50e 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -365,6 +365,54 @@ int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
return 0;
}
+int pvcalls_front_listen(struct socket *sock, int backlog)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map;
+ struct xen_pvcalls_request *req;
+ int notify, req_id, ret;
+
+ if (!pvcalls_front_dev)
+ return -ENOTCONN;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (!map)
+ return -ENOTSOCK;
+
+ if (map->passive.status != PVCALLS_STATUS_BIND)
+ return -EOPNOTSUPP;
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ bedata->rsp[req_id].req_id != PVCALLS_INVALID_ID) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_LISTEN;
+ req->u.listen.id = (uint64_t) sock;
+ req->u.listen.backlog = backlog;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ wait_event(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
+
+ map->passive.status = PVCALLS_STATUS_LISTEN;
+ ret = bedata->rsp[req_id].ret;
+ /* read ret, then set this rsp slot to be reused */
+ smp_mb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+ return ret;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index 8b0a274..aa8fe10 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -9,5 +9,6 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
int pvcalls_front_bind(struct socket *sock,
struct sockaddr *addr,
int addr_len);
+int pvcalls_front_listen(struct socket *sock, int backlog);
#endif
--
1.9.1
Implement recvmsg by copying data from the "in" ring. If not enough data
is available and the recvmsg call is blocking, then wait on the
inflight_conn_req waitqueue. Take the active socket in_mutex so that
only one function can access the ring at any given time.
If not enough data is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance and latency significantly.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 106 ++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 4 ++
2 files changed, 110 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index d8ed280..b4ca569 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -96,6 +96,20 @@ static int pvcalls_front_write_todo(struct sock_mapping *map)
return size - pvcalls_queued(prod, cons, size);
}
+static bool pvcalls_front_read_todo(struct sock_mapping *map)
+{
+ struct pvcalls_data_intf *intf = map->active.ring;
+ RING_IDX cons, prod;
+ int32_t error;
+
+ cons = intf->in_cons;
+ prod = intf->in_prod;
+ error = intf->in_error;
+ return (error != 0 ||
+ pvcalls_queued(prod, cons,
+ XEN_FLEX_RING_SIZE(intf->ring_order))) != 0;
+}
+
static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
{
struct xenbus_device *dev = dev_id;
@@ -418,6 +432,98 @@ int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
return tot_sent;
}
+static int __read_ring(struct pvcalls_data_intf *intf,
+ struct pvcalls_data *data,
+ struct iov_iter *msg_iter,
+ size_t len, int flags)
+{
+ RING_IDX cons, prod, size, masked_prod, masked_cons;
+ RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
+ int32_t error;
+
+ cons = intf->in_cons;
+ prod = intf->in_prod;
+ error = intf->in_error;
+ /* get pointers before reading from the ring */
+ virt_rmb();
+ if (error < 0)
+ return error;
+
+ size = pvcalls_queued(prod, cons, array_size);
+ masked_prod = pvcalls_mask(prod, array_size);
+ masked_cons = pvcalls_mask(cons, array_size);
+
+ if (size == 0)
+ return 0;
+
+ if (len > size)
+ len = size;
+
+ if (masked_prod > masked_cons) {
+ copy_to_iter(data->in + masked_cons, len, msg_iter);
+ } else {
+ if (len > (array_size - masked_cons)) {
+ copy_to_iter(data->in + masked_cons,
+ array_size - masked_cons, msg_iter);
+ copy_to_iter(data->in,
+ len - (array_size - masked_cons),
+ msg_iter);
+ } else {
+ copy_to_iter(data->in + masked_cons, len, msg_iter);
+ }
+ }
+ /* read data from the ring before increasing the index */
+ virt_mb();
+ if (!(flags & MSG_PEEK))
+ intf->in_cons += len;
+
+ return len;
+}
+
+int pvcalls_front_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ struct pvcalls_bedata *bedata;
+ int ret = -EAGAIN;
+ struct sock_mapping *map;
+ int count = 0;
+
+ if (!pvcalls_front_dev)
+ return -ENOTCONN;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (!map)
+ return -ENOTSOCK;
+
+ if (flags & (MSG_CMSG_CLOEXEC|MSG_ERRQUEUE|MSG_OOB|MSG_TRUNC))
+ return -EOPNOTSUPP;
+
+ mutex_lock(&map->active.in_mutex);
+ if (len > XEN_FLEX_RING_SIZE(map->active.ring->ring_order))
+ len = XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
+
+ while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
+ if (count < PVCALLS_FRONT_MAX_SPIN)
+ count++;
+ else
+ wait_event_interruptible(map->active.inflight_conn_req,
+ pvcalls_front_read_todo(map));
+ }
+ ret = __read_ring(map->active.ring, &map->active.data,
+ &msg->msg_iter, len, flags);
+
+ if (ret > 0)
+ notify_remote_via_irq(map->active.irq);
+ if (ret == 0)
+ ret = -EAGAIN;
+ if (ret == -ENOTCONN)
+ ret = 0;
+
+ mutex_unlock(&map->active.in_mutex);
+ return ret;
+}
+
int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
{
struct pvcalls_bedata *bedata;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index d937c24..de24041 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -16,5 +16,9 @@ int pvcalls_front_accept(struct socket *sock,
int pvcalls_front_sendmsg(struct socket *sock,
struct msghdr *msg,
size_t len);
+int pvcalls_front_recvmsg(struct socket *sock,
+ struct msghdr *msg,
+ size_t len,
+ int flags);
#endif
--
1.9.1
For active sockets, check the indexes and use the inflight_conn_req
waitqueue to wait.
For passive sockets, send PVCALLS_POLL to the backend. Use the
inflight_accept_req waitqueue if an accept is outstanding. Otherwise use
the inflight_req waitqueue: inflight_req is awaken when a new response
is received; on wakeup we check whether the POLL response is arrived by
looking at the PVCALLS_FLAG_POLL_RET flag. We set the flag from
pvcalls_front_event_handler, if the response was for a POLL command.
In pvcalls_front_event_handler, get the struct socket pointer from the
poll id (we previously converted struct socket* to uint64_t and used it
as id).
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 134 ++++++++++++++++++++++++++++++++++++++++----
drivers/xen/pvcalls-front.h | 3 +
2 files changed, 126 insertions(+), 11 deletions(-)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index b4ca569..833b717 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -130,17 +130,35 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
req_id = rsp->req_id;
- src = (uint8_t *)&bedata->rsp[req_id];
- src += sizeof(rsp->req_id);
- dst = (uint8_t *)rsp;
- dst += sizeof(rsp->req_id);
- memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
- /*
- * First copy the rest of the data, then req_id. It is
- * paired with the barrier when accessing bedata->rsp.
- */
- smp_wmb();
- WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
+ if (rsp->cmd == PVCALLS_POLL) {
+ struct socket *sock = (struct socket *) rsp->u.poll.id;
+ struct sock_mapping *map =
+ (struct sock_mapping *)
+ READ_ONCE(sock->sk->sk_send_head);
+
+ set_bit(PVCALLS_FLAG_POLL_RET,
+ (void *)&map->passive.flags);
+ /*
+ * Set RET, then clear INFLIGHT. It pairs with
+ * the checks at the beginning of
+ * pvcalls_front_poll_passive.
+ */
+ smp_wmb();
+ clear_bit(PVCALLS_FLAG_POLL_INFLIGHT,
+ (void *)&map->passive.flags);
+ } else {
+ src = (uint8_t *)&bedata->rsp[req_id];
+ src += sizeof(rsp->req_id);
+ dst = (uint8_t *)rsp;
+ dst += sizeof(rsp->req_id);
+ memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
+ /*
+ * First copy the rest of the data, then req_id. It is
+ * paired with the barrier when accessing bedata->rsp.
+ */
+ smp_wmb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
+ }
done = 1;
bedata->ring.rsp_cons++;
@@ -707,6 +725,100 @@ int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
return ret;
}
+static unsigned int pvcalls_front_poll_passive(struct file *file,
+ struct pvcalls_bedata *bedata,
+ struct sock_mapping *map,
+ poll_table *wait)
+{
+ int notify, req_id;
+ struct xen_pvcalls_request *req;
+
+ if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
+ (void *)&map->passive.flags)) {
+ poll_wait(file, &map->passive.inflight_accept_req, wait);
+ return 0;
+ }
+
+ if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
+ (void *)&map->passive.flags))
+ return POLLIN;
+
+ /*
+ * First check RET, then INFLIGHT. No barriers necessary to
+ * ensure execution ordering because of the conditional
+ * instructions creating control dependencies.
+ */
+
+ if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
+ (void *)&map->passive.flags)) {
+ poll_wait(file, &bedata->inflight_req, wait);
+ return 0;
+ }
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_POLL;
+ req->u.poll.id = (uint64_t) map->sock;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ poll_wait(file, &bedata->inflight_req, wait);
+ return 0;
+}
+
+static unsigned int pvcalls_front_poll_active(struct file *file,
+ struct pvcalls_bedata *bedata,
+ struct sock_mapping *map,
+ poll_table *wait)
+{
+ unsigned int mask = 0;
+ int32_t in_error, out_error;
+ struct pvcalls_data_intf *intf = map->active.ring;
+
+ out_error = intf->out_error;
+ in_error = intf->in_error;
+
+ poll_wait(file, &map->active.inflight_conn_req, wait);
+ if (pvcalls_front_write_todo(map))
+ mask |= POLLOUT | POLLWRNORM;
+ if (pvcalls_front_read_todo(map))
+ mask |= POLLIN | POLLRDNORM;
+ if (in_error != 0 || out_error != 0)
+ mask |= POLLERR;
+
+ return mask;
+}
+
+unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
+ poll_table *wait)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map;
+
+ if (!pvcalls_front_dev)
+ return POLLNVAL;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+ if (!map)
+ return POLLNVAL;
+ if (map->active_socket)
+ return pvcalls_front_poll_active(file, bedata, map, wait);
+ else
+ return pvcalls_front_poll_passive(file, bedata, map, wait);
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index de24041..25e05b8 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -20,5 +20,8 @@ int pvcalls_front_recvmsg(struct socket *sock,
struct msghdr *msg,
size_t len,
int flags);
+unsigned int pvcalls_front_poll(struct file *file,
+ struct socket *sock,
+ poll_table *wait);
#endif
--
1.9.1
Send PVCALLS_BIND to the backend. Introduce a new structure, part of
struct sock_mapping, to store information specific to passive sockets.
Introduce a status field to keep track of the status of the passive
socket.
Introduce a waitqueue for the "accept" command (see the accept command
implementation): it is used to allow only one outstanding accept
command at any given time and to implement polling on the passive
socket. Introduce a flags field to keep track of in-flight accept and
poll commands.
sock->sk->sk_send_head is not used for ip sockets: reuse the field to
store a pointer to the struct sock_mapping corresponding to the socket.
Convert the struct socket pointer into an uint64_t and use it as id for
the socket to pass to the backend.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 73 +++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 3 ++
2 files changed, 76 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index d0f5f42..af2ce20 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -59,6 +59,23 @@ struct sock_mapping {
wait_queue_head_t inflight_conn_req;
} active;
+ struct {
+ /* Socket status */
+#define PVCALLS_STATUS_UNINITALIZED 0
+#define PVCALLS_STATUS_BIND 1
+#define PVCALLS_STATUS_LISTEN 2
+ uint8_t status;
+ /*
+ * Internal state-machine flags.
+ * Only one accept operation can be inflight for a socket.
+ * Only one poll operation can be inflight for a given socket.
+ */
+#define PVCALLS_FLAG_ACCEPT_INFLIGHT 0
+#define PVCALLS_FLAG_POLL_INFLIGHT 1
+#define PVCALLS_FLAG_POLL_RET 2
+ uint8_t flags;
+ wait_queue_head_t inflight_accept_req;
+ } passive;
};
};
@@ -292,6 +309,62 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
return ret;
}
+int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map = NULL;
+ struct xen_pvcalls_request *req;
+ int notify, req_id, ret;
+
+ if (!pvcalls_front_dev)
+ return -ENOTCONN;
+ if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
+ return -ENOTSUPP;
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ map = kzalloc(sizeof(*map), GFP_KERNEL);
+ if (map == NULL)
+ return -ENOMEM;
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ kfree(map);
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ map->sock = sock;
+ req->cmd = PVCALLS_BIND;
+ req->u.bind.id = (uint64_t) sock;
+ memcpy(req->u.bind.addr, addr, sizeof(*addr));
+ req->u.bind.len = addr_len;
+
+ init_waitqueue_head(&map->passive.inflight_accept_req);
+
+ list_add_tail(&map->list, &bedata->socketpass_mappings);
+ WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
+ map->active_socket = false;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ wait_event(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
+
+ map->passive.status = PVCALLS_STATUS_BIND;
+ ret = bedata->rsp[req_id].ret;
+ /* read ret, then set this rsp slot to be reused */
+ smp_mb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+ return 0;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index 63b0417..8b0a274 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -6,5 +6,8 @@
int pvcalls_front_socket(struct socket *sock);
int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
int addr_len, int flags);
+int pvcalls_front_bind(struct socket *sock,
+ struct sockaddr *addr,
+ int addr_len);
#endif
--
1.9.1
Send PVCALLS_CONNECT to the backend. Allocate a new ring and evtchn for
the active socket.
Introduce a data structure to keep track of sockets. Introduce a
waitqueue to allow the frontend to wait on data coming from the backend
on the active socket (recvmsg command).
Two mutexes (one of reads and one for writes) will be used to protect
the active socket in and out rings from concurrent accesses.
sock->sk->sk_send_head is not used for ip sockets: reuse the field to
store a pointer to the struct sock_mapping corresponding to the socket.
This way, we can easily get the struct sock_mapping from the struct
socket.
Convert the struct socket pointer into an uint64_t and use it as id for
the new socket to pass to the backend.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 177 +++++++++++++++++++++++++++++++++++++++++---
drivers/xen/pvcalls-front.h | 2 +
2 files changed, 168 insertions(+), 11 deletions(-)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index d1dbcf1..d0f5f42 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -13,6 +13,10 @@
*/
#include <linux/module.h>
+#include <linux/net.h>
+#include <linux/socket.h>
+
+#include <net/sock.h>
#include <xen/events.h>
#include <xen/grant_table.h>
@@ -40,6 +44,24 @@ struct pvcalls_bedata {
};
struct xenbus_device *pvcalls_front_dev;
+struct sock_mapping {
+ bool active_socket;
+ struct list_head list;
+ struct socket *sock;
+ union {
+ struct {
+ int irq;
+ grant_ref_t ref;
+ struct pvcalls_data_intf *ring;
+ struct pvcalls_data data;
+ struct mutex in_mutex;
+ struct mutex out_mutex;
+
+ wait_queue_head_t inflight_conn_req;
+ } active;
+ };
+};
+
static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
{
struct xenbus_device *dev = dev_id;
@@ -84,6 +106,18 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
return IRQ_HANDLED;
}
+static irqreturn_t pvcalls_front_conn_handler(int irq, void *sock_map)
+{
+ struct sock_mapping *map = sock_map;
+
+ if (map == NULL)
+ return IRQ_HANDLED;
+
+ wake_up_interruptible(&map->active.inflight_conn_req);
+
+ return IRQ_HANDLED;
+}
+
int pvcalls_front_socket(struct socket *sock)
{
struct pvcalls_bedata *bedata;
@@ -137,6 +171,127 @@ int pvcalls_front_socket(struct socket *sock)
return ret;
}
+static struct sock_mapping *create_active(int *evtchn)
+{
+ struct sock_mapping *map = NULL;
+ void *bytes;
+ int ret, irq = -1, i;
+
+ map = kzalloc(sizeof(*map), GFP_KERNEL);
+ if (map == NULL)
+ return NULL;
+
+ init_waitqueue_head(&map->active.inflight_conn_req);
+
+ map->active.ring = (struct pvcalls_data_intf *)
+ __get_free_page(GFP_KERNEL | __GFP_ZERO);
+ if (map->active.ring == NULL)
+ goto out_error;
+ memset(map->active.ring, 0, XEN_PAGE_SIZE);
+ map->active.ring->ring_order = RING_ORDER;
+ bytes = (void *)__get_free_pages(GFP_KERNEL | __GFP_ZERO,
+ map->active.ring->ring_order);
+ if (bytes == NULL)
+ goto out_error;
+ for (i = 0; i < (1 << map->active.ring->ring_order); i++)
+ map->active.ring->ref[i] = gnttab_grant_foreign_access(
+ pvcalls_front_dev->otherend_id,
+ pfn_to_gfn(virt_to_pfn(bytes) + i), 0);
+
+ map->active.ref = gnttab_grant_foreign_access(
+ pvcalls_front_dev->otherend_id,
+ pfn_to_gfn(virt_to_pfn((void *)map->active.ring)), 0);
+
+ map->active.data.in = bytes;
+ map->active.data.out = bytes +
+ XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
+
+ ret = xenbus_alloc_evtchn(pvcalls_front_dev, evtchn);
+ if (ret)
+ goto out_error;
+ irq = bind_evtchn_to_irqhandler(*evtchn, pvcalls_front_conn_handler,
+ 0, "pvcalls-frontend", map);
+ if (irq < 0)
+ goto out_error;
+
+ map->active.irq = irq;
+ map->active_socket = true;
+ mutex_init(&map->active.in_mutex);
+ mutex_init(&map->active.out_mutex);
+
+ return map;
+
+out_error:
+ if (irq >= 0)
+ unbind_from_irqhandler(irq, map);
+ else if (*evtchn >= 0)
+ xenbus_free_evtchn(pvcalls_front_dev, *evtchn);
+ kfree(map->active.data.in);
+ kfree(map->active.ring);
+ kfree(map);
+ return NULL;
+}
+
+int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ struct pvcalls_bedata *bedata;
+ struct sock_mapping *map = NULL;
+ struct xen_pvcalls_request *req;
+ int notify, req_id, ret, evtchn;
+
+ if (!pvcalls_front_dev)
+ return -ENETUNREACH;
+ if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
+ return -ENOTSUPP;
+
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+
+ map = create_active(&evtchn);
+ if (!map) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -ENOMEM;
+ }
+
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_CONNECT;
+ req->u.connect.id = (uint64_t)sock;
+ memcpy(req->u.connect.addr, addr, sizeof(*addr));
+ req->u.connect.len = addr_len;
+ req->u.connect.flags = flags;
+ req->u.connect.ref = map->active.ref;
+ req->u.connect.evtchn = evtchn;
+
+ list_add_tail(&map->list, &bedata->socket_mappings);
+ map->sock = sock;
+ WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ wait_event(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
+
+ ret = bedata->rsp[req_id].ret;
+ /* read ret, then set this rsp slot to be reused */
+ smp_mb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+ return ret;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
@@ -150,7 +305,7 @@ static int pvcalls_front_remove(struct xenbus_device *dev)
static int pvcalls_front_probe(struct xenbus_device *dev,
const struct xenbus_device_id *id)
{
- int ret = -EFAULT, evtchn, ref = -1, i;
+ int ret = -ENOMEM, evtchn, ref = -1, i;
unsigned int max_page_order, function_calls, len;
char *versions;
grant_ref_t gref_head = 0;
@@ -171,15 +326,13 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
return -EINVAL;
}
kfree(versions);
- ret = xenbus_scanf(XBT_NIL, dev->otherend,
- "max-page-order", "%u", &max_page_order);
- if (ret <= 0)
- return -ENODEV;
+ max_page_order = xenbus_read_unsigned(dev->otherend,
+ "max-page-order", 0);
if (max_page_order < RING_ORDER)
return -ENODEV;
- ret = xenbus_scanf(XBT_NIL, dev->otherend,
- "function-calls", "%u", &function_calls);
- if (ret <= 0 || function_calls != 1)
+ function_calls = xenbus_read_unsigned(dev->otherend,
+ "function-calls", 0);
+ if (function_calls != 1)
return -ENODEV;
pr_info("%s max-page-order is %u\n", __func__, max_page_order);
@@ -187,6 +340,8 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
if (!bedata)
return -ENOMEM;
+ dev_set_drvdata(&dev->dev, bedata);
+ pvcalls_front_dev = dev;
init_waitqueue_head(&bedata->inflight_req);
for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
@@ -214,8 +369,10 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
if (ret < 0)
goto error;
bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
- if (ref < 0)
+ if (ref < 0) {
+ ret = ref;
goto error;
+ }
gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
virt_to_gfn((void *)sring), 0);
@@ -246,8 +403,6 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
INIT_LIST_HEAD(&bedata->socket_mappings);
INIT_LIST_HEAD(&bedata->socketpass_mappings);
spin_lock_init(&bedata->pvcallss_lock);
- dev_set_drvdata(&dev->dev, bedata);
- pvcalls_front_dev = dev;
xenbus_switch_state(dev, XenbusStateInitialised);
return 0;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index b7dabed..63b0417 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -4,5 +4,7 @@
#include <linux/net.h>
int pvcalls_front_socket(struct socket *sock);
+int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags);
#endif
--
1.9.1
Send a PVCALLS_SOCKET command to the backend, use the masked
req_prod_pvt as req_id. This way, req_id is guaranteed to be between 0
and PVCALLS_NR_REQ_PER_RING. We already have a slot in the rsp array
ready for the response, and there cannot be two outstanding responses
with the same req_id.
Wait for the response by waiting on the inflight_req waitqueue and
check for the req_id field in rsp[req_id]. Use atomic accesses to
read the field. Once a response is received, clear the corresponding rsp
slot by setting req_id to PVCALLS_INVALID_ID. Note that
PVCALLS_INVALID_ID is invalid only from the frontend point of view. It
is not part of the PVCalls protocol.
pvcalls_front_event_handler is in charge of copying responses from the
ring to the appropriate rsp slot. It is done by copying the body of the
response first, then by copying req_id atomically. After the copies,
wake up anybody waiting on waitqueue.
pvcallss_lock protects accesses to the ring.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 94 +++++++++++++++++++++++++++++++++++++++++++++
drivers/xen/pvcalls-front.h | 8 ++++
2 files changed, 102 insertions(+)
create mode 100644 drivers/xen/pvcalls-front.h
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 5e0b265..d1dbcf1 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -20,6 +20,8 @@
#include <xen/xenbus.h>
#include <xen/interface/io/pvcalls.h>
+#include "pvcalls-front.h"
+
#define PVCALLS_INVALID_ID (UINT_MAX)
#define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
#define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
@@ -40,9 +42,101 @@ struct pvcalls_bedata {
static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
{
+ struct xenbus_device *dev = dev_id;
+ struct pvcalls_bedata *bedata;
+ struct xen_pvcalls_response *rsp;
+ uint8_t *src, *dst;
+ int req_id = 0, more = 0, done = 0;
+
+ if (dev == NULL)
+ return IRQ_HANDLED;
+
+ bedata = dev_get_drvdata(&dev->dev);
+ if (bedata == NULL)
+ return IRQ_HANDLED;
+
+again:
+ while (RING_HAS_UNCONSUMED_RESPONSES(&bedata->ring)) {
+ rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
+
+ req_id = rsp->req_id;
+ src = (uint8_t *)&bedata->rsp[req_id];
+ src += sizeof(rsp->req_id);
+ dst = (uint8_t *)rsp;
+ dst += sizeof(rsp->req_id);
+ memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
+ /*
+ * First copy the rest of the data, then req_id. It is
+ * paired with the barrier when accessing bedata->rsp.
+ */
+ smp_wmb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
+
+ done = 1;
+ bedata->ring.rsp_cons++;
+ }
+
+ RING_FINAL_CHECK_FOR_RESPONSES(&bedata->ring, more);
+ if (more)
+ goto again;
+ if (done)
+ wake_up(&bedata->inflight_req);
return IRQ_HANDLED;
}
+int pvcalls_front_socket(struct socket *sock)
+{
+ struct pvcalls_bedata *bedata;
+ struct xen_pvcalls_request *req;
+ int notify, req_id, ret;
+
+ if (!pvcalls_front_dev)
+ return -EACCES;
+ /*
+ * PVCalls only supports domain AF_INET,
+ * type SOCK_STREAM and protocol 0 sockets for now.
+ *
+ * Check socket type here, AF_INET and protocol checks are done
+ * by the caller.
+ */
+ if (sock->type != SOCK_STREAM)
+ return -ENOTSUPP;
+
+ bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+ spin_lock(&bedata->pvcallss_lock);
+ req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
+ if (RING_FULL(&bedata->ring) ||
+ READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
+ spin_unlock(&bedata->pvcallss_lock);
+ return -EAGAIN;
+ }
+ req = RING_GET_REQUEST(&bedata->ring, req_id);
+ req->req_id = req_id;
+ req->cmd = PVCALLS_SOCKET;
+ req->u.socket.id = (uint64_t) sock;
+ req->u.socket.domain = AF_INET;
+ req->u.socket.type = SOCK_STREAM;
+ req->u.socket.protocol = 0;
+
+ bedata->ring.req_prod_pvt++;
+ RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+ spin_unlock(&bedata->pvcallss_lock);
+ if (notify)
+ notify_remote_via_irq(bedata->irq);
+
+ if (wait_event_interruptible(bedata->inflight_req,
+ READ_ONCE(bedata->rsp[req_id].req_id) == req_id) != 0)
+ return -EINTR;
+
+ ret = bedata->rsp[req_id].ret;
+ /* read ret, then set this rsp slot to be reused */
+ smp_mb();
+ WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
+
+ return ret;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
new file mode 100644
index 0000000..b7dabed
--- /dev/null
+++ b/drivers/xen/pvcalls-front.h
@@ -0,0 +1,8 @@
+#ifndef __PVCALLS_FRONT_H__
+#define __PVCALLS_FRONT_H__
+
+#include <linux/net.h>
+
+int pvcalls_front_socket(struct socket *sock);
+
+#endif
--
1.9.1
Implement the probe function for the pvcalls frontend. Read the
supported versions, max-page-order and function-calls nodes from
xenstore.
Introduce a data structure named pvcalls_bedata. It contains pointers to
the command ring, the event channel, a list of active sockets and a list
of passive sockets. Lists accesses are protected by a spin_lock.
Introduce a waitqueue to allow waiting for a response on commands sent
to the backend.
Introduce an array of struct xen_pvcalls_response to store commands
responses.
Only one frontend<->backend connection is supported at any given time
for a guest. Store the active frontend device to a static pointer.
Introduce a stub functions for the event handler.
Signed-off-by: Stefano Stabellini <[email protected]>
CC: [email protected]
CC: [email protected]
---
drivers/xen/pvcalls-front.c | 153 ++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 153 insertions(+)
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index a8d38c2..5e0b265 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -20,6 +20,29 @@
#include <xen/xenbus.h>
#include <xen/interface/io/pvcalls.h>
+#define PVCALLS_INVALID_ID (UINT_MAX)
+#define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
+#define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
+
+struct pvcalls_bedata {
+ struct xen_pvcalls_front_ring ring;
+ grant_ref_t ref;
+ int irq;
+
+ struct list_head socket_mappings;
+ struct list_head socketpass_mappings;
+ spinlock_t pvcallss_lock;
+
+ wait_queue_head_t inflight_req;
+ struct xen_pvcalls_response rsp[PVCALLS_NR_REQ_PER_RING];
+};
+struct xenbus_device *pvcalls_front_dev;
+
+static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
+{
+ return IRQ_HANDLED;
+}
+
static const struct xenbus_device_id pvcalls_front_ids[] = {
{ "pvcalls" },
{ "" }
@@ -33,12 +56,142 @@ static int pvcalls_front_remove(struct xenbus_device *dev)
static int pvcalls_front_probe(struct xenbus_device *dev,
const struct xenbus_device_id *id)
{
+ int ret = -EFAULT, evtchn, ref = -1, i;
+ unsigned int max_page_order, function_calls, len;
+ char *versions;
+ grant_ref_t gref_head = 0;
+ struct xenbus_transaction xbt;
+ struct pvcalls_bedata *bedata = NULL;
+ struct xen_pvcalls_sring *sring;
+
+ if (pvcalls_front_dev != NULL) {
+ dev_err(&dev->dev, "only one PV Calls connection supported\n");
+ return -EINVAL;
+ }
+
+ versions = xenbus_read(XBT_NIL, dev->otherend, "versions", &len);
+ if (!len)
+ return -EINVAL;
+ if (strcmp(versions, "1")) {
+ kfree(versions);
+ return -EINVAL;
+ }
+ kfree(versions);
+ ret = xenbus_scanf(XBT_NIL, dev->otherend,
+ "max-page-order", "%u", &max_page_order);
+ if (ret <= 0)
+ return -ENODEV;
+ if (max_page_order < RING_ORDER)
+ return -ENODEV;
+ ret = xenbus_scanf(XBT_NIL, dev->otherend,
+ "function-calls", "%u", &function_calls);
+ if (ret <= 0 || function_calls != 1)
+ return -ENODEV;
+ pr_info("%s max-page-order is %u\n", __func__, max_page_order);
+
+ bedata = kzalloc(sizeof(struct pvcalls_bedata), GFP_KERNEL);
+ if (!bedata)
+ return -ENOMEM;
+
+ init_waitqueue_head(&bedata->inflight_req);
+ for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
+ bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
+
+ sring = (struct xen_pvcalls_sring *) __get_free_page(GFP_KERNEL |
+ __GFP_ZERO);
+ if (!sring)
+ goto error;
+ SHARED_RING_INIT(sring);
+ FRONT_RING_INIT(&bedata->ring, sring, XEN_PAGE_SIZE);
+
+ ret = xenbus_alloc_evtchn(dev, &evtchn);
+ if (ret)
+ goto error;
+
+ bedata->irq = bind_evtchn_to_irqhandler(evtchn,
+ pvcalls_front_event_handler,
+ 0, "pvcalls-frontend", dev);
+ if (bedata->irq < 0) {
+ ret = bedata->irq;
+ goto error;
+ }
+
+ ret = gnttab_alloc_grant_references(1, &gref_head);
+ if (ret < 0)
+ goto error;
+ bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
+ if (ref < 0)
+ goto error;
+ gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
+ virt_to_gfn((void *)sring), 0);
+
+ again:
+ ret = xenbus_transaction_start(&xbt);
+ if (ret) {
+ xenbus_dev_fatal(dev, ret, "starting transaction");
+ goto error;
+ }
+ ret = xenbus_printf(xbt, dev->nodename, "version", "%u", 1);
+ if (ret)
+ goto error_xenbus;
+ ret = xenbus_printf(xbt, dev->nodename, "ring-ref", "%d", ref);
+ if (ret)
+ goto error_xenbus;
+ ret = xenbus_printf(xbt, dev->nodename, "port", "%u",
+ evtchn);
+ if (ret)
+ goto error_xenbus;
+ ret = xenbus_transaction_end(xbt, 0);
+ if (ret) {
+ if (ret == -EAGAIN)
+ goto again;
+ xenbus_dev_fatal(dev, ret, "completing transaction");
+ goto error;
+ }
+
+ INIT_LIST_HEAD(&bedata->socket_mappings);
+ INIT_LIST_HEAD(&bedata->socketpass_mappings);
+ spin_lock_init(&bedata->pvcallss_lock);
+ dev_set_drvdata(&dev->dev, bedata);
+ pvcalls_front_dev = dev;
+ xenbus_switch_state(dev, XenbusStateInitialised);
+
return 0;
+
+ error_xenbus:
+ xenbus_transaction_end(xbt, 1);
+ xenbus_dev_fatal(dev, ret, "writing xenstore");
+ error:
+ pvcalls_front_remove(dev);
+ return ret;
}
static void pvcalls_front_changed(struct xenbus_device *dev,
enum xenbus_state backend_state)
{
+ switch (backend_state) {
+ case XenbusStateReconfiguring:
+ case XenbusStateReconfigured:
+ case XenbusStateInitialising:
+ case XenbusStateInitialised:
+ case XenbusStateUnknown:
+ break;
+
+ case XenbusStateInitWait:
+ break;
+
+ case XenbusStateConnected:
+ xenbus_switch_state(dev, XenbusStateConnected);
+ break;
+
+ case XenbusStateClosed:
+ if (dev->state == XenbusStateClosed)
+ break;
+ /* Missed the backend's CLOSING state -- fallthrough */
+ case XenbusStateClosing:
+ xenbus_frontend_closed(dev);
+ break;
+ }
}
static struct xenbus_driver pvcalls_front_driver = {
--
1.9.1
On 7/25/2017 5:21 PM, Stefano Stabellini wrote:
> Implement the probe function for the pvcalls frontend. Read the
> supported versions, max-page-order and function-calls nodes from
> xenstore.
>
> Introduce a data structure named pvcalls_bedata. It contains pointers to
> the command ring, the event channel, a list of active sockets and a list
> of passive sockets. Lists accesses are protected by a spin_lock.
>
> Introduce a waitqueue to allow waiting for a response on commands sent
> to the backend.
>
> Introduce an array of struct xen_pvcalls_response to store commands
> responses.
>
> Only one frontend<->backend connection is supported at any given time
> for a guest. Store the active frontend device to a static pointer.
>
> Introduce a stub functions for the event handler.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 153 ++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 153 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index a8d38c2..5e0b265 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -20,6 +20,29 @@
> #include <xen/xenbus.h>
> #include <xen/interface/io/pvcalls.h>
>
> +#define PVCALLS_INVALID_ID (UINT_MAX)
Unnecessary parentheses
> +#define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
PVCALLS_RING_ORDER?
> +#define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
> +
> +struct pvcalls_bedata {
> + struct xen_pvcalls_front_ring ring;
> + grant_ref_t ref;
> + int irq;
> +
> + struct list_head socket_mappings;
> + struct list_head socketpass_mappings;
> + spinlock_t pvcallss_lock;
> +
> + wait_queue_head_t inflight_req;
> + struct xen_pvcalls_response rsp[PVCALLS_NR_REQ_PER_RING];
> +};
> +struct xenbus_device *pvcalls_front_dev;
static
> +
> +static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> +{
> + return IRQ_HANDLED;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> @@ -33,12 +56,142 @@ static int pvcalls_front_remove(struct xenbus_device *dev)
> static int pvcalls_front_probe(struct xenbus_device *dev,
> const struct xenbus_device_id *id)
> {
> + int ret = -EFAULT, evtchn, ref = -1, i;
> + unsigned int max_page_order, function_calls, len;
> + char *versions;
> + grant_ref_t gref_head = 0;
> + struct xenbus_transaction xbt;
> + struct pvcalls_bedata *bedata = NULL;
> + struct xen_pvcalls_sring *sring;
> +
> + if (pvcalls_front_dev != NULL) {
> + dev_err(&dev->dev, "only one PV Calls connection supported\n");
> + return -EINVAL;
> + }
> +
> + versions = xenbus_read(XBT_NIL, dev->otherend, "versions", &len);
> + if (!len)
> + return -EINVAL;
> + if (strcmp(versions, "1")) {
> + kfree(versions);
> + return -EINVAL;
> + }
> + kfree(versions);
> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> + "max-page-order", "%u", &max_page_order);
> + if (ret <= 0)
> + return -ENODEV;
> + if (max_page_order < RING_ORDER)
> + return -ENODEV;
> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> + "function-calls", "%u", &function_calls);
> + if (ret <= 0 || function_calls != 1)
> + return -ENODEV;
> + pr_info("%s max-page-order is %u\n", __func__, max_page_order);
> +
> + bedata = kzalloc(sizeof(struct pvcalls_bedata), GFP_KERNEL);
> + if (!bedata)
> + return -ENOMEM;
> +
> + init_waitqueue_head(&bedata->inflight_req);
> + for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
> + bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
> +
> + sring = (struct xen_pvcalls_sring *) __get_free_page(GFP_KERNEL |
> + __GFP_ZERO);
> + if (!sring)
> + goto error;
> + SHARED_RING_INIT(sring);
> + FRONT_RING_INIT(&bedata->ring, sring, XEN_PAGE_SIZE);
> +
> + ret = xenbus_alloc_evtchn(dev, &evtchn);
> + if (ret)
> + goto error;
> +
> + bedata->irq = bind_evtchn_to_irqhandler(evtchn,
> + pvcalls_front_event_handler,
> + 0, "pvcalls-frontend", dev);
> + if (bedata->irq < 0) {
> + ret = bedata->irq;
> + goto error;
> + }
> +
> + ret = gnttab_alloc_grant_references(1, &gref_head);
> + if (ret < 0)
> + goto error;
> + bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
Is ref really needed?
> + if (ref < 0)
> + goto error;
> + gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
> + virt_to_gfn((void *)sring), 0);
> +
> + again:
> + ret = xenbus_transaction_start(&xbt);
> + if (ret) {
> + xenbus_dev_fatal(dev, ret, "starting transaction");
> + goto error;
> + }
> + ret = xenbus_printf(xbt, dev->nodename, "version", "%u", 1);
> + if (ret)
> + goto error_xenbus;
> + ret = xenbus_printf(xbt, dev->nodename, "ring-ref", "%d", ref);
> + if (ret)
> + goto error_xenbus;
> + ret = xenbus_printf(xbt, dev->nodename, "port", "%u",
> + evtchn);
> + if (ret)
> + goto error_xenbus;
> + ret = xenbus_transaction_end(xbt, 0);
> + if (ret) {
> + if (ret == -EAGAIN)
> + goto again;
> + xenbus_dev_fatal(dev, ret, "completing transaction");
> + goto error;
> + }
> +
> + INIT_LIST_HEAD(&bedata->socket_mappings);
> + INIT_LIST_HEAD(&bedata->socketpass_mappings);
> + spin_lock_init(&bedata->pvcallss_lock);
> + dev_set_drvdata(&dev->dev, bedata);
> + pvcalls_front_dev = dev;
> + xenbus_switch_state(dev, XenbusStateInitialised);
> +
> return 0;
> +
> + error_xenbus:
> + xenbus_transaction_end(xbt, 1);
> + xenbus_dev_fatal(dev, ret, "writing xenstore");
> + error:
> + pvcalls_front_remove(dev);
I think patch 12 (where you implement cleanup) could be moved before
this one.
I also think you are leaking bedata on error paths.
-boris
> + return ret;
> }
>
> static void pvcalls_front_changed(struct xenbus_device *dev,
> enum xenbus_state backend_state)
> {
> + switch (backend_state) {
> + case XenbusStateReconfiguring:
> + case XenbusStateReconfigured:
> + case XenbusStateInitialising:
> + case XenbusStateInitialised:
> + case XenbusStateUnknown:
> + break;
> +
> + case XenbusStateInitWait:
> + break;
> +
> + case XenbusStateConnected:
> + xenbus_switch_state(dev, XenbusStateConnected);
> + break;
> +
> + case XenbusStateClosed:
> + if (dev->state == XenbusStateClosed)
> + break;
> + /* Missed the backend's CLOSING state -- fallthrough */
> + case XenbusStateClosing:
> + xenbus_frontend_closed(dev);
> + break;
> + }
> }
>
> static struct xenbus_driver pvcalls_front_driver = {
On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> Send a PVCALLS_SOCKET command to the backend, use the masked
> req_prod_pvt as req_id. This way, req_id is guaranteed to be between 0
> and PVCALLS_NR_REQ_PER_RING. We already have a slot in the rsp array
> ready for the response, and there cannot be two outstanding responses
> with the same req_id.
>
> Wait for the response by waiting on the inflight_req waitqueue and
> check for the req_id field in rsp[req_id]. Use atomic accesses to
> read the field. Once a response is received, clear the corresponding rsp
> slot by setting req_id to PVCALLS_INVALID_ID. Note that
> PVCALLS_INVALID_ID is invalid only from the frontend point of view. It
> is not part of the PVCalls protocol.
>
> pvcalls_front_event_handler is in charge of copying responses from the
> ring to the appropriate rsp slot. It is done by copying the body of the
> response first, then by copying req_id atomically. After the copies,
> wake up anybody waiting on waitqueue.
>
> pvcallss_lock protects accesses to the ring.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 94 +++++++++++++++++++++++++++++++++++++++++++++
> drivers/xen/pvcalls-front.h | 8 ++++
> 2 files changed, 102 insertions(+)
> create mode 100644 drivers/xen/pvcalls-front.h
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index 5e0b265..d1dbcf1 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -20,6 +20,8 @@
> #include <xen/xenbus.h>
> #include <xen/interface/io/pvcalls.h>
>
> +#include "pvcalls-front.h"
> +
> #define PVCALLS_INVALID_ID (UINT_MAX)
> #define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
> #define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
> @@ -40,9 +42,101 @@ struct pvcalls_bedata {
>
> static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> {
> + struct xenbus_device *dev = dev_id;
> + struct pvcalls_bedata *bedata;
> + struct xen_pvcalls_response *rsp;
> + uint8_t *src, *dst;
> + int req_id = 0, more = 0, done = 0;
> +
> + if (dev == NULL)
> + return IRQ_HANDLED;
> +
> + bedata = dev_get_drvdata(&dev->dev);
> + if (bedata == NULL)
> + return IRQ_HANDLED;
> +
> +again:
> + while (RING_HAS_UNCONSUMED_RESPONSES(&bedata->ring)) {
> + rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
> +
> + req_id = rsp->req_id;
> + src = (uint8_t *)&bedata->rsp[req_id];
> + src += sizeof(rsp->req_id);
> + dst = (uint8_t *)rsp;
> + dst += sizeof(rsp->req_id);
These two lines can be combined (both src and dst)
> + memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> + /*
> + * First copy the rest of the data, then req_id. It is
> + * paired with the barrier when accessing bedata->rsp.
> + */
> + smp_wmb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> +
> + done = 1;
> + bedata->ring.rsp_cons++;
> + }
> +
> + RING_FINAL_CHECK_FOR_RESPONSES(&bedata->ring, more);
> + if (more)
> + goto again;
> + if (done)
> + wake_up(&bedata->inflight_req);
> return IRQ_HANDLED;
> }
>
> +int pvcalls_front_socket(struct socket *sock)
> +{
> + struct pvcalls_bedata *bedata;
> + struct xen_pvcalls_request *req;
> + int notify, req_id, ret;
> +
> + if (!pvcalls_front_dev)
> + return -EACCES;
> + /*
> + * PVCalls only supports domain AF_INET,
> + * type SOCK_STREAM and protocol 0 sockets for now.
> + *
> + * Check socket type here, AF_INET and protocol checks are done
> + * by the caller.
> + */
> + if (sock->type != SOCK_STREAM)
> + return -ENOTSUPP;
> +
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + req->cmd = PVCALLS_SOCKET;
> + req->u.socket.id = (uint64_t) sock;
> + req->u.socket.domain = AF_INET;
> + req->u.socket.type = SOCK_STREAM;
> + req->u.socket.protocol = 0;
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + if (wait_event_interruptible(bedata->inflight_req,
> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id) != 0)
"!= 0" can be dropped
-boris
> + return -EINTR;
> +
> + ret = bedata->rsp[req_id].ret;
> + /* read ret, then set this rsp slot to be reused */
> + smp_mb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> +
> + return ret;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> new file mode 100644
> index 0000000..b7dabed
> --- /dev/null
> +++ b/drivers/xen/pvcalls-front.h
> @@ -0,0 +1,8 @@
> +#ifndef __PVCALLS_FRONT_H__
> +#define __PVCALLS_FRONT_H__
> +
> +#include <linux/net.h>
> +
> +int pvcalls_front_socket(struct socket *sock);
> +
> +#endif
On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> Send PVCALLS_CONNECT to the backend. Allocate a new ring and evtchn for
> the active socket.
>
> Introduce a data structure to keep track of sockets. Introduce a
> waitqueue to allow the frontend to wait on data coming from the backend
> on the active socket (recvmsg command).
>
> Two mutexes (one of reads and one for writes) will be used to protect
> the active socket in and out rings from concurrent accesses.
>
> sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> store a pointer to the struct sock_mapping corresponding to the socket.
> This way, we can easily get the struct sock_mapping from the struct
> socket.
This needs to be documented in the code, not (just) in commit message.
>
> Convert the struct socket pointer into an uint64_t and use it as id for
> the new socket to pass to the backend.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 177 +++++++++++++++++++++++++++++++++++++++++---
> drivers/xen/pvcalls-front.h | 2 +
> 2 files changed, 168 insertions(+), 11 deletions(-)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index d1dbcf1..d0f5f42 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -13,6 +13,10 @@
> */
>
> #include <linux/module.h>
> +#include <linux/net.h>
> +#include <linux/socket.h>
> +
> +#include <net/sock.h>
>
> #include <xen/events.h>
> #include <xen/grant_table.h>
> @@ -40,6 +44,24 @@ struct pvcalls_bedata {
> };
> struct xenbus_device *pvcalls_front_dev;
>
> +struct sock_mapping {
> + bool active_socket;
> + struct list_head list;
> + struct socket *sock;
> + union {
> + struct {
> + int irq;
> + grant_ref_t ref;
> + struct pvcalls_data_intf *ring;
> + struct pvcalls_data data;
> + struct mutex in_mutex;
> + struct mutex out_mutex;
> +
> + wait_queue_head_t inflight_conn_req;
> + } active;
> + };
> +};
> +
> static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> {
> struct xenbus_device *dev = dev_id;
> @@ -84,6 +106,18 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> return IRQ_HANDLED;
> }
>
> +static irqreturn_t pvcalls_front_conn_handler(int irq, void *sock_map)
> +{
> + struct sock_mapping *map = sock_map;
> +
> + if (map == NULL)
> + return IRQ_HANDLED;
> +
> + wake_up_interruptible(&map->active.inflight_conn_req);
> +
> + return IRQ_HANDLED;
> +}
> +
> int pvcalls_front_socket(struct socket *sock)
> {
> struct pvcalls_bedata *bedata;
> @@ -137,6 +171,127 @@ int pvcalls_front_socket(struct socket *sock)
> return ret;
> }
>
> +static struct sock_mapping *create_active(int *evtchn)
> +{
> + struct sock_mapping *map = NULL;
> + void *bytes;
> + int ret, irq = -1, i;
> +
> + map = kzalloc(sizeof(*map), GFP_KERNEL);
> + if (map == NULL)
> + return NULL;
> +
> + init_waitqueue_head(&map->active.inflight_conn_req);
> +
> + map->active.ring = (struct pvcalls_data_intf *)
> + __get_free_page(GFP_KERNEL | __GFP_ZERO);
> + if (map->active.ring == NULL)
> + goto out_error;
> + memset(map->active.ring, 0, XEN_PAGE_SIZE);
> + map->active.ring->ring_order = RING_ORDER;
> + bytes = (void *)__get_free_pages(GFP_KERNEL | __GFP_ZERO,
> + map->active.ring->ring_order);
> + if (bytes == NULL)
> + goto out_error;
> + for (i = 0; i < (1 << map->active.ring->ring_order); i++)
> + map->active.ring->ref[i] = gnttab_grant_foreign_access(
> + pvcalls_front_dev->otherend_id,
> + pfn_to_gfn(virt_to_pfn(bytes) + i), 0);
> +
> + map->active.ref = gnttab_grant_foreign_access(
> + pvcalls_front_dev->otherend_id,
> + pfn_to_gfn(virt_to_pfn((void *)map->active.ring)), 0);
> +
> + map->active.data.in = bytes;
> + map->active.data.out = bytes +
> + XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
> +
> + ret = xenbus_alloc_evtchn(pvcalls_front_dev, evtchn);
> + if (ret)
> + goto out_error;
> + irq = bind_evtchn_to_irqhandler(*evtchn, pvcalls_front_conn_handler,
> + 0, "pvcalls-frontend", map);
> + if (irq < 0)
> + goto out_error;
> +
> + map->active.irq = irq;
> + map->active_socket = true;
> + mutex_init(&map->active.in_mutex);
> + mutex_init(&map->active.out_mutex);
> +
> + return map;
> +
> +out_error:
> + if (irq >= 0)
> + unbind_from_irqhandler(irq, map);
> + else if (*evtchn >= 0)
> + xenbus_free_evtchn(pvcalls_front_dev, *evtchn);
> + kfree(map->active.data.in);
> + kfree(map->active.ring);
> + kfree(map);
> + return NULL;
> +}
> +
> +int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> + int addr_len, int flags)
> +{
> + struct pvcalls_bedata *bedata;
> + struct sock_mapping *map = NULL;
> + struct xen_pvcalls_request *req;
> + int notify, req_id, ret, evtchn;
> +
> + if (!pvcalls_front_dev)
> + return -ENETUNREACH;
> + if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
> + return -ENOTSUPP;
> +
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> +
> + map = create_active(&evtchn);
> + if (!map) {
> + spin_unlock(&bedata->pvcallss_lock);
> + return -ENOMEM;
> + }
> +
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + req->cmd = PVCALLS_CONNECT;
> + req->u.connect.id = (uint64_t)sock;
> + memcpy(req->u.connect.addr, addr, sizeof(*addr));
> + req->u.connect.len = addr_len;
> + req->u.connect.flags = flags;
> + req->u.connect.ref = map->active.ref;
> + req->u.connect.evtchn = evtchn;
> +
> + list_add_tail(&map->list, &bedata->socket_mappings);
> + map->sock = sock;
> + WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> +
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + wait_event(bedata->inflight_req,
> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> +
> + ret = bedata->rsp[req_id].ret;
> + /* read ret, then set this rsp slot to be reused */
> + smp_mb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> + return ret;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> @@ -150,7 +305,7 @@ static int pvcalls_front_remove(struct xenbus_device *dev)
> static int pvcalls_front_probe(struct xenbus_device *dev,
> const struct xenbus_device_id *id)
> {
> - int ret = -EFAULT, evtchn, ref = -1, i;
> + int ret = -ENOMEM, evtchn, ref = -1, i;
> unsigned int max_page_order, function_calls, len;
> char *versions;
> grant_ref_t gref_head = 0;
> @@ -171,15 +326,13 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
> return -EINVAL;
> }
> kfree(versions);
> - ret = xenbus_scanf(XBT_NIL, dev->otherend,
> - "max-page-order", "%u", &max_page_order);
> - if (ret <= 0)
> - return -ENODEV;
> + max_page_order = xenbus_read_unsigned(dev->otherend,
> + "max-page-order", 0);
> if (max_page_order < RING_ORDER)
> return -ENODEV;
> - ret = xenbus_scanf(XBT_NIL, dev->otherend,
> - "function-calls", "%u", &function_calls);
> - if (ret <= 0 || function_calls != 1)
> + function_calls = xenbus_read_unsigned(dev->otherend,
> + "function-calls", 0);
> + if (function_calls != 1)
> return -ENODEV;
> pr_info("%s max-page-order is %u\n", __func__, max_page_order);
>
> @@ -187,6 +340,8 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
> if (!bedata)
> return -ENOMEM;
>
> + dev_set_drvdata(&dev->dev, bedata);
> + pvcalls_front_dev = dev;
> init_waitqueue_head(&bedata->inflight_req);
> for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
> bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
> @@ -214,8 +369,10 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
> if (ret < 0)
> goto error;
> bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
> - if (ref < 0)
> + if (ref < 0) {
> + ret = ref;
> goto error;
> + }
> gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
> virt_to_gfn((void *)sring), 0);
>
> @@ -246,8 +403,6 @@ static int pvcalls_front_probe(struct xenbus_device *dev,
> INIT_LIST_HEAD(&bedata->socket_mappings);
> INIT_LIST_HEAD(&bedata->socketpass_mappings);
> spin_lock_init(&bedata->pvcallss_lock);
> - dev_set_drvdata(&dev->dev, bedata);
> - pvcalls_front_dev = dev;
> xenbus_switch_state(dev, XenbusStateInitialised);
Why are these changes made here and not in the original patch?
-boris
>
> return 0;
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index b7dabed..63b0417 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -4,5 +4,7 @@
> #include <linux/net.h>
>
> int pvcalls_front_socket(struct socket *sock);
> +int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> + int addr_len, int flags);
>
> #endif
On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> Send PVCALLS_BIND to the backend. Introduce a new structure, part of
> struct sock_mapping, to store information specific to passive sockets.
>
> Introduce a status field to keep track of the status of the passive
> socket.
>
> Introduce a waitqueue for the "accept" command (see the accept command
> implementation): it is used to allow only one outstanding accept
> command at any given time and to implement polling on the passive
> socket. Introduce a flags field to keep track of in-flight accept and
> poll commands.
>
> sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> store a pointer to the struct sock_mapping corresponding to the socket.
>
> Convert the struct socket pointer into an uint64_t and use it as id for
> the socket to pass to the backend.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 73 +++++++++++++++++++++++++++++++++++++++++++++
> drivers/xen/pvcalls-front.h | 3 ++
> 2 files changed, 76 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index d0f5f42..af2ce20 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -59,6 +59,23 @@ struct sock_mapping {
>
> wait_queue_head_t inflight_conn_req;
> } active;
> + struct {
> + /* Socket status */
> +#define PVCALLS_STATUS_UNINITALIZED 0
> +#define PVCALLS_STATUS_BIND 1
> +#define PVCALLS_STATUS_LISTEN 2
> + uint8_t status;
> + /*
> + * Internal state-machine flags.
> + * Only one accept operation can be inflight for a socket.
> + * Only one poll operation can be inflight for a given socket.
> + */
> +#define PVCALLS_FLAG_ACCEPT_INFLIGHT 0
> +#define PVCALLS_FLAG_POLL_INFLIGHT 1
> +#define PVCALLS_FLAG_POLL_RET 2
> + uint8_t flags;
> + wait_queue_head_t inflight_accept_req;
> + } passive;
> };
> };
>
> @@ -292,6 +309,62 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> return ret;
> }
>
> +int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
> +{
> + struct pvcalls_bedata *bedata;
> + struct sock_mapping *map = NULL;
> + struct xen_pvcalls_request *req;
> + int notify, req_id, ret;
> +
> + if (!pvcalls_front_dev)
> + return -ENOTCONN;
> + if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
> + return -ENOTSUPP;
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + map = kzalloc(sizeof(*map), GFP_KERNEL);
> + if (map == NULL)
> + return -ENOMEM;
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + kfree(map);
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + map->sock = sock;
> + req->cmd = PVCALLS_BIND;
> + req->u.bind.id = (uint64_t) sock;
> + memcpy(req->u.bind.addr, addr, sizeof(*addr));
> + req->u.bind.len = addr_len;
> +
> + init_waitqueue_head(&map->passive.inflight_accept_req);
> +
> + list_add_tail(&map->list, &bedata->socketpass_mappings);
> + WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
> + map->active_socket = false;
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + wait_event(bedata->inflight_req,
> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
This all looks very similar to previous patches. Can it be factored out?
Also, you've used wait_event_interruptible in socket() implementation.
Why not here (and connect())?
-boris
> +
> + map->passive.status = PVCALLS_STATUS_BIND;
> + ret = bedata->rsp[req_id].ret;
> + /* read ret, then set this rsp slot to be reused */
> + smp_mb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> + return 0;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index 63b0417..8b0a274 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -6,5 +6,8 @@
> int pvcalls_front_socket(struct socket *sock);
> int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> int addr_len, int flags);
> +int pvcalls_front_bind(struct socket *sock,
> + struct sockaddr *addr,
> + int addr_len);
>
> #endif
On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> Send PVCALLS_ACCEPT to the backend. Allocate a new active socket. Make
> sure that only one accept command is executed at any given time by
> setting PVCALLS_FLAG_ACCEPT_INFLIGHT and waiting on the
> inflight_accept_req waitqueue.
>
> sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> store a pointer to the struct sock_mapping corresponding to the socket.
>
> Convert the new struct socket pointer into an uint64_t and use it as id
> for the new socket to pass to the backend.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 79 +++++++++++++++++++++++++++++++++++++++++++++
> drivers/xen/pvcalls-front.h | 3 ++
> 2 files changed, 82 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index 3b5d50e..b8c4538 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -413,6 +413,85 @@ int pvcalls_front_listen(struct socket *sock, int backlog)
> return ret;
> }
>
> +int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
> +{
> + struct pvcalls_bedata *bedata;
> + struct sock_mapping *map;
> + struct sock_mapping *map2 = NULL;
> + struct xen_pvcalls_request *req;
> + int notify, req_id, ret, evtchn;
> +
> + if (!pvcalls_front_dev)
> + return -ENOTCONN;
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> + if (!map)
> + return -ENOTSOCK;
> +
> + if (map->passive.status != PVCALLS_STATUS_LISTEN)
> + return -EINVAL;
> +
> + /*
> + * Backend only supports 1 inflight accept request, will return
> + * errors for the others
> + */
> + if (test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> + (void *)&map->passive.flags)) {
> + if (wait_event_interruptible(map->passive.inflight_accept_req,
> + !test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> + (void *)&map->passive.flags))
> + != 0)
Unnecessary "!=0".
-boris
> + return -EINTR;
> + }
> +
> +
> + newsock->sk = kzalloc(sizeof(*newsock->sk), GFP_KERNEL);
> + if (newsock->sk == NULL)
> + return -ENOMEM;
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + kfree(newsock->sk);
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> +
> + map2 = create_active(&evtchn);
> +
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + req->cmd = PVCALLS_ACCEPT;
> + req->u.accept.id = (uint64_t) sock;
> + req->u.accept.ref = map2->active.ref;
> + req->u.accept.id_new = (uint64_t) newsock;
> + req->u.accept.evtchn = evtchn;
> +
> + list_add_tail(&map2->list, &bedata->socket_mappings);
> + WRITE_ONCE(newsock->sk->sk_send_head, (void *)map2);
> + map2->sock = newsock;
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + wait_event(bedata->inflight_req,
> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> +
> + clear_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT, (void *)&map->passive.flags);
> + wake_up(&map->passive.inflight_accept_req);
> +
> + ret = bedata->rsp[req_id].ret;
> + /* read ret, then set this rsp slot to be reused */
> + smp_mb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> + return ret;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index aa8fe10..ab4f1da 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -10,5 +10,8 @@ int pvcalls_front_bind(struct socket *sock,
> struct sockaddr *addr,
> int addr_len);
> int pvcalls_front_listen(struct socket *sock, int backlog);
> +int pvcalls_front_accept(struct socket *sock,
> + struct socket *newsock,
> + int flags);
>
> #endif
On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> Implement recvmsg by copying data from the "in" ring. If not enough data
> is available and the recvmsg call is blocking, then wait on the
> inflight_conn_req waitqueue. Take the active socket in_mutex so that
> only one function can access the ring at any given time.
>
> If not enough data is available on the ring, rather than returning
> immediately or sleep-waiting, spin for up to 5000 cycles. This small
> optimization turns out to improve performance and latency significantly.
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 106 ++++++++++++++++++++++++++++++++++++++++++++
> drivers/xen/pvcalls-front.h | 4 ++
> 2 files changed, 110 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index d8ed280..b4ca569 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -96,6 +96,20 @@ static int pvcalls_front_write_todo(struct sock_mapping *map)
> return size - pvcalls_queued(prod, cons, size);
> }
>
> +static bool pvcalls_front_read_todo(struct sock_mapping *map)
> +{
> + struct pvcalls_data_intf *intf = map->active.ring;
> + RING_IDX cons, prod;
> + int32_t error;
> +
> + cons = intf->in_cons;
> + prod = intf->in_prod;
> + error = intf->in_error;
> + return (error != 0 ||
> + pvcalls_queued(prod, cons,
> + XEN_FLEX_RING_SIZE(intf->ring_order))) != 0;
> +}
> +
> static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> {
> struct xenbus_device *dev = dev_id;
> @@ -418,6 +432,98 @@ int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
> return tot_sent;
> }
>
> +static int __read_ring(struct pvcalls_data_intf *intf,
> + struct pvcalls_data *data,
> + struct iov_iter *msg_iter,
> + size_t len, int flags)
> +{
> + RING_IDX cons, prod, size, masked_prod, masked_cons;
> + RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
> + int32_t error;
> +
> + cons = intf->in_cons;
> + prod = intf->in_prod;
> + error = intf->in_error;
> + /* get pointers before reading from the ring */
> + virt_rmb();
> + if (error < 0)
> + return error;
> +
> + size = pvcalls_queued(prod, cons, array_size);
> + masked_prod = pvcalls_mask(prod, array_size);
> + masked_cons = pvcalls_mask(cons, array_size);
> +
> + if (size == 0)
> + return 0;
> +
> + if (len > size)
> + len = size;
> +
> + if (masked_prod > masked_cons) {
> + copy_to_iter(data->in + masked_cons, len, msg_iter);
> + } else {
> + if (len > (array_size - masked_cons)) {
> + copy_to_iter(data->in + masked_cons,
> + array_size - masked_cons, msg_iter);
> + copy_to_iter(data->in,
> + len - (array_size - masked_cons),
> + msg_iter);
> + } else {
> + copy_to_iter(data->in + masked_cons, len, msg_iter);
> + }
> + }
> + /* read data from the ring before increasing the index */
> + virt_mb();
> + if (!(flags & MSG_PEEK))
> + intf->in_cons += len;
> +
> + return len;
> +}
> +
> +int pvcalls_front_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> + int flags)
> +{
> + struct pvcalls_bedata *bedata;
> + int ret = -EAGAIN;
> + struct sock_mapping *map;
> + int count = 0;
> +
> + if (!pvcalls_front_dev)
> + return -ENOTCONN;
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> + if (!map)
> + return -ENOTSOCK;
> +
> + if (flags & (MSG_CMSG_CLOEXEC|MSG_ERRQUEUE|MSG_OOB|MSG_TRUNC))
> + return -EOPNOTSUPP;
> +
> + mutex_lock(&map->active.in_mutex);
> + if (len > XEN_FLEX_RING_SIZE(map->active.ring->ring_order))
> + len = XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
> +
> + while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
> + if (count < PVCALLS_FRONT_MAX_SPIN)
> + count++;
> + else
> + wait_event_interruptible(map->active.inflight_conn_req,
> + pvcalls_front_read_todo(map));
> + }
Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
counting non-sleeping iterations but here we are sleeping so
PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
In fact, what shouldn't this waiting be a function of MSG_DONTWAIT
and/or socket's O_NONBLOCK?
-boris
> + ret = __read_ring(map->active.ring, &map->active.data,
> + &msg->msg_iter, len, flags);
> +
> + if (ret > 0)
> + notify_remote_via_irq(map->active.irq);
> + if (ret == 0)
> + ret = -EAGAIN;
> + if (ret == -ENOTCONN)
> + ret = 0;
> +
> + mutex_unlock(&map->active.in_mutex);
> + return ret;
> +}
> +
> int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
> {
> struct pvcalls_bedata *bedata;
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index d937c24..de24041 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -16,5 +16,9 @@ int pvcalls_front_accept(struct socket *sock,
> int pvcalls_front_sendmsg(struct socket *sock,
> struct msghdr *msg,
> size_t len);
> +int pvcalls_front_recvmsg(struct socket *sock,
> + struct msghdr *msg,
> + size_t len,
> + int flags);
>
> #endif
>> + while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
>> + if (count < PVCALLS_FRONT_MAX_SPIN)
>> + count++;
>> + else
>> + wait_event_interruptible(map->active.inflight_conn_req,
>> + pvcalls_front_read_todo(map));
>> + }
> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> counting non-sleeping iterations but here we are sleeping so
> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>
> In fact, what shouldn't this waiting be a function of MSG_DONTWAIT
err, which it already is. But the question still stands (except for
MSG_DONTWAIT).
-boris
> and/or socket's O_NONBLOCK?
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> > Send a PVCALLS_SOCKET command to the backend, use the masked
> > req_prod_pvt as req_id. This way, req_id is guaranteed to be between 0
> > and PVCALLS_NR_REQ_PER_RING. We already have a slot in the rsp array
> > ready for the response, and there cannot be two outstanding responses
> > with the same req_id.
> >
> > Wait for the response by waiting on the inflight_req waitqueue and
> > check for the req_id field in rsp[req_id]. Use atomic accesses to
> > read the field. Once a response is received, clear the corresponding rsp
> > slot by setting req_id to PVCALLS_INVALID_ID. Note that
> > PVCALLS_INVALID_ID is invalid only from the frontend point of view. It
> > is not part of the PVCalls protocol.
> >
> > pvcalls_front_event_handler is in charge of copying responses from the
> > ring to the appropriate rsp slot. It is done by copying the body of the
> > response first, then by copying req_id atomically. After the copies,
> > wake up anybody waiting on waitqueue.
> >
> > pvcallss_lock protects accesses to the ring.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 94
> > +++++++++++++++++++++++++++++++++++++++++++++
> > drivers/xen/pvcalls-front.h | 8 ++++
> > 2 files changed, 102 insertions(+)
> > create mode 100644 drivers/xen/pvcalls-front.h
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index 5e0b265..d1dbcf1 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -20,6 +20,8 @@
> > #include <xen/xenbus.h>
> > #include <xen/interface/io/pvcalls.h>
> > +#include "pvcalls-front.h"
> > +
> > #define PVCALLS_INVALID_ID (UINT_MAX)
> > #define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
> > #define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls,
> > XEN_PAGE_SIZE)
> > @@ -40,9 +42,101 @@ struct pvcalls_bedata {
> > static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> > {
> > + struct xenbus_device *dev = dev_id;
> > + struct pvcalls_bedata *bedata;
> > + struct xen_pvcalls_response *rsp;
> > + uint8_t *src, *dst;
> > + int req_id = 0, more = 0, done = 0;
> > +
> > + if (dev == NULL)
> > + return IRQ_HANDLED;
> > +
> > + bedata = dev_get_drvdata(&dev->dev);
> > + if (bedata == NULL)
> > + return IRQ_HANDLED;
> > +
> > +again:
> > + while (RING_HAS_UNCONSUMED_RESPONSES(&bedata->ring)) {
> > + rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
> > +
> > + req_id = rsp->req_id;
> > + src = (uint8_t *)&bedata->rsp[req_id];
> > + src += sizeof(rsp->req_id);
> > + dst = (uint8_t *)rsp;
> > + dst += sizeof(rsp->req_id);
>
> These two lines can be combined (both src and dst)
I'll do that
> > + memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> > + /*
> > + * First copy the rest of the data, then req_id. It is
> > + * paired with the barrier when accessing bedata->rsp.
> > + */
> > + smp_wmb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> > +
> > + done = 1;
> > + bedata->ring.rsp_cons++;
> > + }
> > +
> > + RING_FINAL_CHECK_FOR_RESPONSES(&bedata->ring, more);
> > + if (more)
> > + goto again;
> > + if (done)
> > + wake_up(&bedata->inflight_req);
> > return IRQ_HANDLED;
> > }
> > +int pvcalls_front_socket(struct socket *sock)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct xen_pvcalls_request *req;
> > + int notify, req_id, ret;
> > +
> > + if (!pvcalls_front_dev)
> > + return -EACCES;
> > + /*
> > + * PVCalls only supports domain AF_INET,
> > + * type SOCK_STREAM and protocol 0 sockets for now.
> > + *
> > + * Check socket type here, AF_INET and protocol checks are done
> > + * by the caller.
> > + */
> > + if (sock->type != SOCK_STREAM)
> > + return -ENOTSUPP;
> > +
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + req->cmd = PVCALLS_SOCKET;
> > + req->u.socket.id = (uint64_t) sock;
> > + req->u.socket.domain = AF_INET;
> > + req->u.socket.type = SOCK_STREAM;
> > + req->u.socket.protocol = 0;
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + if (wait_event_interruptible(bedata->inflight_req,
> > + READ_ONCE(bedata->rsp[req_id].req_id) == req_id) != 0)
>
> "!= 0" can be dropped
OK
> > + return -EINTR;
> > +
> > + ret = bedata->rsp[req_id].ret;
> > + /* read ret, then set this rsp slot to be reused */
> > + smp_mb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> > +
> > + return ret;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > new file mode 100644
> > index 0000000..b7dabed
> > --- /dev/null
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -0,0 +1,8 @@
> > +#ifndef __PVCALLS_FRONT_H__
> > +#define __PVCALLS_FRONT_H__
> > +
> > +#include <linux/net.h>
> > +
> > +int pvcalls_front_socket(struct socket *sock);
> > +
> > +#endif
>
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> > Send PVCALLS_CONNECT to the backend. Allocate a new ring and evtchn for
> > the active socket.
> >
> > Introduce a data structure to keep track of sockets. Introduce a
> > waitqueue to allow the frontend to wait on data coming from the backend
> > on the active socket (recvmsg command).
> >
> > Two mutexes (one of reads and one for writes) will be used to protect
> > the active socket in and out rings from concurrent accesses.
> >
> > sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> > store a pointer to the struct sock_mapping corresponding to the socket.
> > This way, we can easily get the struct sock_mapping from the struct
> > socket.
>
> This needs to be documented in the code, not (just) in commit message.
I'll add an in-code comment
> >
> > Convert the struct socket pointer into an uint64_t and use it as id for
> > the new socket to pass to the backend.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 177
> > +++++++++++++++++++++++++++++++++++++++++---
> > drivers/xen/pvcalls-front.h | 2 +
> > 2 files changed, 168 insertions(+), 11 deletions(-)
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index d1dbcf1..d0f5f42 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -13,6 +13,10 @@
> > */
> > #include <linux/module.h>
> > +#include <linux/net.h>
> > +#include <linux/socket.h>
> > +
> > +#include <net/sock.h>
> > #include <xen/events.h>
> > #include <xen/grant_table.h>
> > @@ -40,6 +44,24 @@ struct pvcalls_bedata {
> > };
> > struct xenbus_device *pvcalls_front_dev;
> > +struct sock_mapping {
> > + bool active_socket;
> > + struct list_head list;
> > + struct socket *sock;
> > + union {
> > + struct {
> > + int irq;
> > + grant_ref_t ref;
> > + struct pvcalls_data_intf *ring;
> > + struct pvcalls_data data;
> > + struct mutex in_mutex;
> > + struct mutex out_mutex;
> > +
> > + wait_queue_head_t inflight_conn_req;
> > + } active;
> > + };
> > +};
> > +
> > static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> > {
> > struct xenbus_device *dev = dev_id;
> > @@ -84,6 +106,18 @@ static irqreturn_t pvcalls_front_event_handler(int irq,
> > void *dev_id)
> > return IRQ_HANDLED;
> > }
> > +static irqreturn_t pvcalls_front_conn_handler(int irq, void *sock_map)
> > +{
> > + struct sock_mapping *map = sock_map;
> > +
> > + if (map == NULL)
> > + return IRQ_HANDLED;
> > +
> > + wake_up_interruptible(&map->active.inflight_conn_req);
> > +
> > + return IRQ_HANDLED;
> > +}
> > +
> > int pvcalls_front_socket(struct socket *sock)
> > {
> > struct pvcalls_bedata *bedata;
> > @@ -137,6 +171,127 @@ int pvcalls_front_socket(struct socket *sock)
> > return ret;
> > }
> > +static struct sock_mapping *create_active(int *evtchn)
> > +{
> > + struct sock_mapping *map = NULL;
> > + void *bytes;
> > + int ret, irq = -1, i;
> > +
> > + map = kzalloc(sizeof(*map), GFP_KERNEL);
> > + if (map == NULL)
> > + return NULL;
> > +
> > + init_waitqueue_head(&map->active.inflight_conn_req);
> > +
> > + map->active.ring = (struct pvcalls_data_intf *)
> > + __get_free_page(GFP_KERNEL | __GFP_ZERO);
> > + if (map->active.ring == NULL)
> > + goto out_error;
> > + memset(map->active.ring, 0, XEN_PAGE_SIZE);
> > + map->active.ring->ring_order = RING_ORDER;
> > + bytes = (void *)__get_free_pages(GFP_KERNEL | __GFP_ZERO,
> > + map->active.ring->ring_order);
> > + if (bytes == NULL)
> > + goto out_error;
> > + for (i = 0; i < (1 << map->active.ring->ring_order); i++)
> > + map->active.ring->ref[i] = gnttab_grant_foreign_access(
> > + pvcalls_front_dev->otherend_id,
> > + pfn_to_gfn(virt_to_pfn(bytes) + i), 0);
> > +
> > + map->active.ref = gnttab_grant_foreign_access(
> > + pvcalls_front_dev->otherend_id,
> > + pfn_to_gfn(virt_to_pfn((void *)map->active.ring)), 0);
> > +
> > + map->active.data.in = bytes;
> > + map->active.data.out = bytes +
> > + XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
> > +
> > + ret = xenbus_alloc_evtchn(pvcalls_front_dev, evtchn);
> > + if (ret)
> > + goto out_error;
> > + irq = bind_evtchn_to_irqhandler(*evtchn, pvcalls_front_conn_handler,
> > + 0, "pvcalls-frontend", map);
> > + if (irq < 0)
> > + goto out_error;
> > +
> > + map->active.irq = irq;
> > + map->active_socket = true;
> > + mutex_init(&map->active.in_mutex);
> > + mutex_init(&map->active.out_mutex);
> > +
> > + return map;
> > +
> > +out_error:
> > + if (irq >= 0)
> > + unbind_from_irqhandler(irq, map);
> > + else if (*evtchn >= 0)
> > + xenbus_free_evtchn(pvcalls_front_dev, *evtchn);
> > + kfree(map->active.data.in);
> > + kfree(map->active.ring);
> > + kfree(map);
> > + return NULL;
> > +}
> > +
> > +int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> > + int addr_len, int flags)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct sock_mapping *map = NULL;
> > + struct xen_pvcalls_request *req;
> > + int notify, req_id, ret, evtchn;
> > +
> > + if (!pvcalls_front_dev)
> > + return -ENETUNREACH;
> > + if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
> > + return -ENOTSUPP;
> > +
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > +
> > + map = create_active(&evtchn);
> > + if (!map) {
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -ENOMEM;
> > + }
> > +
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + req->cmd = PVCALLS_CONNECT;
> > + req->u.connect.id = (uint64_t)sock;
> > + memcpy(req->u.connect.addr, addr, sizeof(*addr));
> > + req->u.connect.len = addr_len;
> > + req->u.connect.flags = flags;
> > + req->u.connect.ref = map->active.ref;
> > + req->u.connect.evtchn = evtchn;
> > +
> > + list_add_tail(&map->list, &bedata->socket_mappings);
> > + map->sock = sock;
> > + WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > +
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + wait_event(bedata->inflight_req,
> > + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> > +
> > + ret = bedata->rsp[req_id].ret;
> > + /* read ret, then set this rsp slot to be reused */
> > + smp_mb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> > + return ret;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > @@ -150,7 +305,7 @@ static int pvcalls_front_remove(struct xenbus_device
> > *dev)
> > static int pvcalls_front_probe(struct xenbus_device *dev,
> > const struct xenbus_device_id *id)
> > {
> > - int ret = -EFAULT, evtchn, ref = -1, i;
> > + int ret = -ENOMEM, evtchn, ref = -1, i;
> > unsigned int max_page_order, function_calls, len;
> > char *versions;
> > grant_ref_t gref_head = 0;
> > @@ -171,15 +326,13 @@ static int pvcalls_front_probe(struct xenbus_device
> > *dev,
> > return -EINVAL;
> > }
> > kfree(versions);
> > - ret = xenbus_scanf(XBT_NIL, dev->otherend,
> > - "max-page-order", "%u", &max_page_order);
> > - if (ret <= 0)
> > - return -ENODEV;
> > + max_page_order = xenbus_read_unsigned(dev->otherend,
> > + "max-page-order", 0);
> > if (max_page_order < RING_ORDER)
> > return -ENODEV;
> > - ret = xenbus_scanf(XBT_NIL, dev->otherend,
> > - "function-calls", "%u", &function_calls);
> > - if (ret <= 0 || function_calls != 1)
> > + function_calls = xenbus_read_unsigned(dev->otherend,
> > + "function-calls", 0);
> > + if (function_calls != 1)
> > return -ENODEV;
> > pr_info("%s max-page-order is %u\n", __func__, max_page_order);
> > @@ -187,6 +340,8 @@ static int pvcalls_front_probe(struct xenbus_device
> > *dev,
> > if (!bedata)
> > return -ENOMEM;
> > + dev_set_drvdata(&dev->dev, bedata);
> > + pvcalls_front_dev = dev;
> > init_waitqueue_head(&bedata->inflight_req);
> > for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
> > bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
> > @@ -214,8 +369,10 @@ static int pvcalls_front_probe(struct xenbus_device
> > *dev,
> > if (ret < 0)
> > goto error;
> > bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
> > - if (ref < 0)
> > + if (ref < 0) {
> > + ret = ref;
> > goto error;
> > + }
> > gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
> > virt_to_gfn((void *)sring), 0);
> > @@ -246,8 +403,6 @@ static int pvcalls_front_probe(struct xenbus_device
> > *dev,
> > INIT_LIST_HEAD(&bedata->socket_mappings);
> > INIT_LIST_HEAD(&bedata->socketpass_mappings);
> > spin_lock_init(&bedata->pvcallss_lock);
> > - dev_set_drvdata(&dev->dev, bedata);
> > - pvcalls_front_dev = dev;
> > xenbus_switch_state(dev, XenbusStateInitialised);
>
> Why are these changes made here and not in the original patch?
I made a mistake in updating the patches for v2 :-(
I'll fix it.
> > return 0;
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index b7dabed..63b0417 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -4,5 +4,7 @@
> > #include <linux/net.h>
> > int pvcalls_front_socket(struct socket *sock);
> > +int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> > + int addr_len, int flags);
> > #endif
>
On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> For active sockets, check the indexes and use the inflight_conn_req
> waitqueue to wait.
>
> For passive sockets, send PVCALLS_POLL to the backend. Use the
> inflight_accept_req waitqueue if an accept is outstanding. Otherwise use
> the inflight_req waitqueue: inflight_req is awaken when a new response
> is received; on wakeup we check whether the POLL response is arrived by
> looking at the PVCALLS_FLAG_POLL_RET flag. We set the flag from
> pvcalls_front_event_handler, if the response was for a POLL command.
>
> In pvcalls_front_event_handler, get the struct socket pointer from the
> poll id (we previously converted struct socket* to uint64_t and used it
> as id).
>
> Signed-off-by: Stefano Stabellini <[email protected]>
> CC: [email protected]
> CC: [email protected]
> ---
> drivers/xen/pvcalls-front.c | 134 ++++++++++++++++++++++++++++++++++++++++----
> drivers/xen/pvcalls-front.h | 3 +
> 2 files changed, 126 insertions(+), 11 deletions(-)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index b4ca569..833b717 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -130,17 +130,35 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
>
> req_id = rsp->req_id;
> - src = (uint8_t *)&bedata->rsp[req_id];
> - src += sizeof(rsp->req_id);
> - dst = (uint8_t *)rsp;
> - dst += sizeof(rsp->req_id);
> - memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> - /*
> - * First copy the rest of the data, then req_id. It is
> - * paired with the barrier when accessing bedata->rsp.
> - */
> - smp_wmb();
> - WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> + if (rsp->cmd == PVCALLS_POLL) {
> + struct socket *sock = (struct socket *) rsp->u.poll.id;
> + struct sock_mapping *map =
> + (struct sock_mapping *)
> + READ_ONCE(sock->sk->sk_send_head);
> +
> + set_bit(PVCALLS_FLAG_POLL_RET,
> + (void *)&map->passive.flags);
> + /*
> + * Set RET, then clear INFLIGHT. It pairs with
> + * the checks at the beginning of
> + * pvcalls_front_poll_passive.
> + */
> + smp_wmb();
virt_wmb() (here, below, and I think I saw it somewhere else)
> + clear_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> + (void *)&map->passive.flags);
> + } else {
> + src = (uint8_t *)&bedata->rsp[req_id];
> + src += sizeof(rsp->req_id);
> + dst = (uint8_t *)rsp;
> + dst += sizeof(rsp->req_id);
> + memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> + /*
> + * First copy the rest of the data, then req_id. It is
> + * paired with the barrier when accessing bedata->rsp.
> + */
> + smp_wmb();
> + WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> + }
>
> done = 1;
> bedata->ring.rsp_cons++;
> @@ -707,6 +725,100 @@ int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
> return ret;
> }
>
> +static unsigned int pvcalls_front_poll_passive(struct file *file,
> + struct pvcalls_bedata *bedata,
> + struct sock_mapping *map,
> + poll_table *wait)
> +{
> + int notify, req_id;
> + struct xen_pvcalls_request *req;
> +
> + if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> + (void *)&map->passive.flags)) {
> + poll_wait(file, &map->passive.inflight_accept_req, wait);
> + return 0;
> + }
> +
> + if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
> + (void *)&map->passive.flags))
> + return POLLIN;
> +
> + /*
> + * First check RET, then INFLIGHT. No barriers necessary to
> + * ensure execution ordering because of the conditional
> + * instructions creating control dependencies.
> + */
> +
> + if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> + (void *)&map->passive.flags)) {
> + poll_wait(file, &bedata->inflight_req, wait);
> + return 0;
> + }
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + req->cmd = PVCALLS_POLL;
> + req->u.poll.id = (uint64_t) map->sock;
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + poll_wait(file, &bedata->inflight_req, wait);
> + return 0;
> +}
> +
> +static unsigned int pvcalls_front_poll_active(struct file *file,
> + struct pvcalls_bedata *bedata,
> + struct sock_mapping *map,
> + poll_table *wait)
> +{
> + unsigned int mask = 0;
> + int32_t in_error, out_error;
> + struct pvcalls_data_intf *intf = map->active.ring;
> +
> + out_error = intf->out_error;
> + in_error = intf->in_error;
> +
> + poll_wait(file, &map->active.inflight_conn_req, wait);
> + if (pvcalls_front_write_todo(map))
> + mask |= POLLOUT | POLLWRNORM;
> + if (pvcalls_front_read_todo(map))
> + mask |= POLLIN | POLLRDNORM;
> + if (in_error != 0 || out_error != 0)
> + mask |= POLLERR;
> +
> + return mask;
> +}
> +
> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> + poll_table *wait)
> +{
> + struct pvcalls_bedata *bedata;
> + struct sock_mapping *map;
> +
> + if (!pvcalls_front_dev)
> + return POLLNVAL;
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> + if (!map)
> + return POLLNVAL;
> + if (map->active_socket)
> + return pvcalls_front_poll_active(file, bedata, map, wait);
> + else
> + return pvcalls_front_poll_passive(file, bedata, map, wait);
> +}
All other routines return an int (0 or error code). Shouldn't they all
have the same return type? (In fact, where are all these routines called
from?)
-boris
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index de24041..25e05b8 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -20,5 +20,8 @@ int pvcalls_front_recvmsg(struct socket *sock,
> struct msghdr *msg,
> size_t len,
> int flags);
> +unsigned int pvcalls_front_poll(struct file *file,
> + struct socket *sock,
> + poll_table *wait);
>
> #endif
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> > Send PVCALLS_ACCEPT to the backend. Allocate a new active socket. Make
> > sure that only one accept command is executed at any given time by
> > setting PVCALLS_FLAG_ACCEPT_INFLIGHT and waiting on the
> > inflight_accept_req waitqueue.
> >
> > sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> > store a pointer to the struct sock_mapping corresponding to the socket.
> >
> > Convert the new struct socket pointer into an uint64_t and use it as id
> > for the new socket to pass to the backend.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 79 +++++++++++++++++++++++++++++++++++++++++++++
> > drivers/xen/pvcalls-front.h | 3 ++
> > 2 files changed, 82 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index 3b5d50e..b8c4538 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -413,6 +413,85 @@ int pvcalls_front_listen(struct socket *sock, int backlog)
> > return ret;
> > }
> >
> > +int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct sock_mapping *map;
> > + struct sock_mapping *map2 = NULL;
> > + struct xen_pvcalls_request *req;
> > + int notify, req_id, ret, evtchn;
> > +
> > + if (!pvcalls_front_dev)
> > + return -ENOTCONN;
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> > + if (!map)
> > + return -ENOTSOCK;
> > +
> > + if (map->passive.status != PVCALLS_STATUS_LISTEN)
> > + return -EINVAL;
> > +
> > + /*
> > + * Backend only supports 1 inflight accept request, will return
> > + * errors for the others
> > + */
> > + if (test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> > + (void *)&map->passive.flags)) {
> > + if (wait_event_interruptible(map->passive.inflight_accept_req,
> > + !test_and_set_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> > + (void *)&map->passive.flags))
> > + != 0)
>
> Unnecessary "!=0".
OK
> > + return -EINTR;
> > + }
> > +
> > +
> > + newsock->sk = kzalloc(sizeof(*newsock->sk), GFP_KERNEL);
> > + if (newsock->sk == NULL)
> > + return -ENOMEM;
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + kfree(newsock->sk);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > +
> > + map2 = create_active(&evtchn);
> > +
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + req->cmd = PVCALLS_ACCEPT;
> > + req->u.accept.id = (uint64_t) sock;
> > + req->u.accept.ref = map2->active.ref;
> > + req->u.accept.id_new = (uint64_t) newsock;
> > + req->u.accept.evtchn = evtchn;
> > +
> > + list_add_tail(&map2->list, &bedata->socket_mappings);
> > + WRITE_ONCE(newsock->sk->sk_send_head, (void *)map2);
> > + map2->sock = newsock;
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + wait_event(bedata->inflight_req,
> > + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> > +
> > + clear_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT, (void *)&map->passive.flags);
> > + wake_up(&map->passive.inflight_accept_req);
> > +
> > + ret = bedata->rsp[req_id].ret;
> > + /* read ret, then set this rsp slot to be reused */
> > + smp_mb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> > + return ret;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index aa8fe10..ab4f1da 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -10,5 +10,8 @@ int pvcalls_front_bind(struct socket *sock,
> > struct sockaddr *addr,
> > int addr_len);
> > int pvcalls_front_listen(struct socket *sock, int backlog);
> > +int pvcalls_front_accept(struct socket *sock,
> > + struct socket *newsock,
> > + int flags);
> >
> > #endif
>
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 7/25/2017 5:22 PM, Stefano Stabellini wrote:
> > Send PVCALLS_BIND to the backend. Introduce a new structure, part of
> > struct sock_mapping, to store information specific to passive sockets.
> >
> > Introduce a status field to keep track of the status of the passive
> > socket.
> >
> > Introduce a waitqueue for the "accept" command (see the accept command
> > implementation): it is used to allow only one outstanding accept
> > command at any given time and to implement polling on the passive
> > socket. Introduce a flags field to keep track of in-flight accept and
> > poll commands.
> >
> > sock->sk->sk_send_head is not used for ip sockets: reuse the field to
> > store a pointer to the struct sock_mapping corresponding to the socket.
> >
> > Convert the struct socket pointer into an uint64_t and use it as id for
> > the socket to pass to the backend.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 73
> > +++++++++++++++++++++++++++++++++++++++++++++
> > drivers/xen/pvcalls-front.h | 3 ++
> > 2 files changed, 76 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index d0f5f42..af2ce20 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -59,6 +59,23 @@ struct sock_mapping {
> > wait_queue_head_t inflight_conn_req;
> > } active;
> > + struct {
> > + /* Socket status */
> > +#define PVCALLS_STATUS_UNINITALIZED 0
> > +#define PVCALLS_STATUS_BIND 1
> > +#define PVCALLS_STATUS_LISTEN 2
> > + uint8_t status;
> > + /*
> > + * Internal state-machine flags.
> > + * Only one accept operation can be inflight for a socket.
> > + * Only one poll operation can be inflight for a given socket.
> > + */
> > +#define PVCALLS_FLAG_ACCEPT_INFLIGHT 0
> > +#define PVCALLS_FLAG_POLL_INFLIGHT 1
> > +#define PVCALLS_FLAG_POLL_RET 2
> > + uint8_t flags;
> > + wait_queue_head_t inflight_accept_req;
> > + } passive;
> > };
> > };
> > @@ -292,6 +309,62 @@ int pvcalls_front_connect(struct socket *sock, struct
> > sockaddr *addr,
> > return ret;
> > }
> > +int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int
> > addr_len)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct sock_mapping *map = NULL;
> > + struct xen_pvcalls_request *req;
> > + int notify, req_id, ret;
> > +
> > + if (!pvcalls_front_dev)
> > + return -ENOTCONN;
> > + if (addr->sa_family != AF_INET || sock->type != SOCK_STREAM)
> > + return -ENOTSUPP;
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > + map = kzalloc(sizeof(*map), GFP_KERNEL);
> > + if (map == NULL)
> > + return -ENOMEM;
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + kfree(map);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + map->sock = sock;
> > + req->cmd = PVCALLS_BIND;
> > + req->u.bind.id = (uint64_t) sock;
> > + memcpy(req->u.bind.addr, addr, sizeof(*addr));
> > + req->u.bind.len = addr_len;
> > +
> > + init_waitqueue_head(&map->passive.inflight_accept_req);
> > +
> > + list_add_tail(&map->list, &bedata->socketpass_mappings);
> > + WRITE_ONCE(sock->sk->sk_send_head, (void *)map);
> > + map->active_socket = false;
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + wait_event(bedata->inflight_req,
> > + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
>
> This all looks very similar to previous patches. Can it be factored out?
You are right that the pattern is the same for all commands:
- get a request
- fill the request
- possibly do something else
- wait
however each request is different, the struct and fields are different.
There are spin_lock and spin_unlock calls intermingled. I am not sure I
can factor out much of this. Maybe I could create a static inline or
macro as a syntactic sugar to replace the wait call, but that's pretty
much it I think.
> Also, you've used wait_event_interruptible in socket() implementation. Why not
> here (and connect())?
My intention was to use wait_event to wait for replies everywhere but I
missed some of them in the conversion (I used to use
wait_event_interruptible in early versions of the code).
The reason to use wait_event is that it makes it easier to handle the
rsp slot in bedata (bedata->rsp[req_id]): in case of EINTR the response
in bedata->rsp would not be cleared by anybody. If we use wait_event
there is no such problem, and the backend could still return EINTR and
we would handle it just fine as any other responses.
I'll make sure to use wait_event to wait for a response (like here), and
wait_event_interruptible elsewhere (like in recvmsg, where we don't risk
leaking a rsp slot).
> > +
> > + map->passive.status = PVCALLS_STATUS_BIND;
> > + ret = bedata->rsp[req_id].ret;
> > + /* read ret, then set this rsp slot to be reused */
> > + smp_mb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> > + return 0;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index 63b0417..8b0a274 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -6,5 +6,8 @@
> > int pvcalls_front_socket(struct socket *sock);
> > int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> > int addr_len, int flags);
> > +int pvcalls_front_bind(struct socket *sock,
> > + struct sockaddr *addr,
> > + int addr_len);
> > #endif
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >> + count++;
> >> + else
> >> + wait_event_interruptible(map->active.inflight_conn_req,
> >> + pvcalls_front_read_todo(map));
> >> + }
> > Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> > counting non-sleeping iterations but here we are sleeping so
> > PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >
> > In fact, what shouldn't this waiting be a function of MSG_DONTWAIT
>
> err, which it already is. But the question still stands (except for
> MSG_DONTWAIT).
The code (admittedly unintuitive) is busy-looping (non-sleeping) for
5000 iterations *before* attempting to sleep. So in that regard, recvmsg
and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
non-sleeping iterations.
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> > For active sockets, check the indexes and use the inflight_conn_req
> > waitqueue to wait.
> >
> > For passive sockets, send PVCALLS_POLL to the backend. Use the
> > inflight_accept_req waitqueue if an accept is outstanding. Otherwise use
> > the inflight_req waitqueue: inflight_req is awaken when a new response
> > is received; on wakeup we check whether the POLL response is arrived by
> > looking at the PVCALLS_FLAG_POLL_RET flag. We set the flag from
> > pvcalls_front_event_handler, if the response was for a POLL command.
> >
> > In pvcalls_front_event_handler, get the struct socket pointer from the
> > poll id (we previously converted struct socket* to uint64_t and used it
> > as id).
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 134 ++++++++++++++++++++++++++++++++++++++++----
> > drivers/xen/pvcalls-front.h | 3 +
> > 2 files changed, 126 insertions(+), 11 deletions(-)
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index b4ca569..833b717 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -130,17 +130,35 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> > rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
> >
> > req_id = rsp->req_id;
> > - src = (uint8_t *)&bedata->rsp[req_id];
> > - src += sizeof(rsp->req_id);
> > - dst = (uint8_t *)rsp;
> > - dst += sizeof(rsp->req_id);
> > - memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> > - /*
> > - * First copy the rest of the data, then req_id. It is
> > - * paired with the barrier when accessing bedata->rsp.
> > - */
> > - smp_wmb();
> > - WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> > + if (rsp->cmd == PVCALLS_POLL) {
> > + struct socket *sock = (struct socket *) rsp->u.poll.id;
> > + struct sock_mapping *map =
> > + (struct sock_mapping *)
> > + READ_ONCE(sock->sk->sk_send_head);
> > +
> > + set_bit(PVCALLS_FLAG_POLL_RET,
> > + (void *)&map->passive.flags);
> > + /*
> > + * Set RET, then clear INFLIGHT. It pairs with
> > + * the checks at the beginning of
> > + * pvcalls_front_poll_passive.
> > + */
> > + smp_wmb();
>
> virt_wmb() (here, below, and I think I saw it somewhere else)
This smp_wmb is an internal barrier to the frontend, it doesn't
synchronize with the backend. It synchronizes only within the frontend.
A uniprocessor frontend wouldn't actually need this barrier. I admit it
is a bit confusing, I'll add a comment about this.
> > + clear_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> > + (void *)&map->passive.flags);
> > + } else {
> > + src = (uint8_t *)&bedata->rsp[req_id];
> > + src += sizeof(rsp->req_id);
> > + dst = (uint8_t *)rsp;
> > + dst += sizeof(rsp->req_id);
> > + memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> > + /*
> > + * First copy the rest of the data, then req_id. It is
> > + * paired with the barrier when accessing bedata->rsp.
> > + */
> > + smp_wmb();
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> > + }
> >
> > done = 1;
> > bedata->ring.rsp_cons++;
> > @@ -707,6 +725,100 @@ int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
> > return ret;
> > }
> >
> > +static unsigned int pvcalls_front_poll_passive(struct file *file,
> > + struct pvcalls_bedata *bedata,
> > + struct sock_mapping *map,
> > + poll_table *wait)
> > +{
> > + int notify, req_id;
> > + struct xen_pvcalls_request *req;
> > +
> > + if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> > + (void *)&map->passive.flags)) {
> > + poll_wait(file, &map->passive.inflight_accept_req, wait);
> > + return 0;
> > + }
> > +
> > + if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
> > + (void *)&map->passive.flags))
> > + return POLLIN;
> > +
> > + /*
> > + * First check RET, then INFLIGHT. No barriers necessary to
> > + * ensure execution ordering because of the conditional
> > + * instructions creating control dependencies.
> > + */
> > +
> > + if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> > + (void *)&map->passive.flags)) {
> > + poll_wait(file, &bedata->inflight_req, wait);
> > + return 0;
> > + }
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + req->cmd = PVCALLS_POLL;
> > + req->u.poll.id = (uint64_t) map->sock;
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + poll_wait(file, &bedata->inflight_req, wait);
> > + return 0;
> > +}
> > +
> > +static unsigned int pvcalls_front_poll_active(struct file *file,
> > + struct pvcalls_bedata *bedata,
> > + struct sock_mapping *map,
> > + poll_table *wait)
> > +{
> > + unsigned int mask = 0;
> > + int32_t in_error, out_error;
> > + struct pvcalls_data_intf *intf = map->active.ring;
> > +
> > + out_error = intf->out_error;
> > + in_error = intf->in_error;
> > +
> > + poll_wait(file, &map->active.inflight_conn_req, wait);
> > + if (pvcalls_front_write_todo(map))
> > + mask |= POLLOUT | POLLWRNORM;
> > + if (pvcalls_front_read_todo(map))
> > + mask |= POLLIN | POLLRDNORM;
> > + if (in_error != 0 || out_error != 0)
> > + mask |= POLLERR;
> > +
> > + return mask;
> > +}
> > +
> > +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> > + poll_table *wait)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct sock_mapping *map;
> > +
> > + if (!pvcalls_front_dev)
> > + return POLLNVAL;
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> > + if (!map)
> > + return POLLNVAL;
> > + if (map->active_socket)
> > + return pvcalls_front_poll_active(file, bedata, map, wait);
> > + else
> > + return pvcalls_front_poll_passive(file, bedata, map, wait);
> > +}
>
> All other routines return an int (0 or error code). Shouldn't they all
> have the same return type? (In fact, where are all these routines called
> from?)
poll is the only one different because it is meant to be used to
implement the "poll" function which is the only one to return unsigned
int instead of int. They caller is not part of this series, but give a
look at include/linux/net.h:struct proto_ops, it should give you a
pretty good idea.
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index de24041..25e05b8 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -20,5 +20,8 @@ int pvcalls_front_recvmsg(struct socket *sock,
> > struct msghdr *msg,
> > size_t len,
> > int flags);
> > +unsigned int pvcalls_front_poll(struct file *file,
> > + struct socket *sock,
> > + poll_table *wait);
> >
> > #endif
>
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> On 7/25/2017 5:21 PM, Stefano Stabellini wrote:
> > Implement the probe function for the pvcalls frontend. Read the
> > supported versions, max-page-order and function-calls nodes from
> > xenstore.
> >
> > Introduce a data structure named pvcalls_bedata. It contains pointers to
> > the command ring, the event channel, a list of active sockets and a list
> > of passive sockets. Lists accesses are protected by a spin_lock.
> >
> > Introduce a waitqueue to allow waiting for a response on commands sent
> > to the backend.
> >
> > Introduce an array of struct xen_pvcalls_response to store commands
> > responses.
> >
> > Only one frontend<->backend connection is supported at any given time
> > for a guest. Store the active frontend device to a static pointer.
> >
> > Introduce a stub functions for the event handler.
> >
> > Signed-off-by: Stefano Stabellini <[email protected]>
> > CC: [email protected]
> > CC: [email protected]
> > ---
> > drivers/xen/pvcalls-front.c | 153
> > ++++++++++++++++++++++++++++++++++++++++++++
> > 1 file changed, 153 insertions(+)
> >
> > diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> > index a8d38c2..5e0b265 100644
> > --- a/drivers/xen/pvcalls-front.c
> > +++ b/drivers/xen/pvcalls-front.c
> > @@ -20,6 +20,29 @@
> > #include <xen/xenbus.h>
> > #include <xen/interface/io/pvcalls.h>
> > +#define PVCALLS_INVALID_ID (UINT_MAX)
>
> Unnecessary parentheses
OK
> > +#define RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
>
> PVCALLS_RING_ORDER?
Sure
> > +#define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls,
> > XEN_PAGE_SIZE)
> > +
> > +struct pvcalls_bedata {
> > + struct xen_pvcalls_front_ring ring;
> > + grant_ref_t ref;
> > + int irq;
> > +
> > + struct list_head socket_mappings;
> > + struct list_head socketpass_mappings;
> > + spinlock_t pvcallss_lock;
> > +
> > + wait_queue_head_t inflight_req;
> > + struct xen_pvcalls_response rsp[PVCALLS_NR_REQ_PER_RING];
> > +};
> > +struct xenbus_device *pvcalls_front_dev;
>
> static
good point, I'll fix
> > +
> > +static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> > +{
> > + return IRQ_HANDLED;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > @@ -33,12 +56,142 @@ static int pvcalls_front_remove(struct xenbus_device
> > *dev)
> > static int pvcalls_front_probe(struct xenbus_device *dev,
> > const struct xenbus_device_id *id)
> > {
> > + int ret = -EFAULT, evtchn, ref = -1, i;
> > + unsigned int max_page_order, function_calls, len;
> > + char *versions;
> > + grant_ref_t gref_head = 0;
> > + struct xenbus_transaction xbt;
> > + struct pvcalls_bedata *bedata = NULL;
> > + struct xen_pvcalls_sring *sring;
> > +
> > + if (pvcalls_front_dev != NULL) {
> > + dev_err(&dev->dev, "only one PV Calls connection
> > supported\n");
> > + return -EINVAL;
> > + }
> > +
> > + versions = xenbus_read(XBT_NIL, dev->otherend, "versions", &len);
> > + if (!len)
> > + return -EINVAL;
> > + if (strcmp(versions, "1")) {
> > + kfree(versions);
> > + return -EINVAL;
> > + }
> > + kfree(versions);
> > + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> > + "max-page-order", "%u", &max_page_order);
> > + if (ret <= 0)
> > + return -ENODEV;
> > + if (max_page_order < RING_ORDER)
> > + return -ENODEV;
> > + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> > + "function-calls", "%u", &function_calls);
> > + if (ret <= 0 || function_calls != 1)
> > + return -ENODEV;
> > + pr_info("%s max-page-order is %u\n", __func__, max_page_order);
> > +
> > + bedata = kzalloc(sizeof(struct pvcalls_bedata), GFP_KERNEL);
> > + if (!bedata)
> > + return -ENOMEM;
> > +
> > + init_waitqueue_head(&bedata->inflight_req);
> > + for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
> > + bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
> > +
> > + sring = (struct xen_pvcalls_sring *) __get_free_page(GFP_KERNEL |
> > + __GFP_ZERO);
> > + if (!sring)
> > + goto error;
> > + SHARED_RING_INIT(sring);
> > + FRONT_RING_INIT(&bedata->ring, sring, XEN_PAGE_SIZE);
> > +
> > + ret = xenbus_alloc_evtchn(dev, &evtchn);
> > + if (ret)
> > + goto error;
> > +
> > + bedata->irq = bind_evtchn_to_irqhandler(evtchn,
> > + pvcalls_front_event_handler,
> > + 0, "pvcalls-frontend", dev);
> > + if (bedata->irq < 0) {
> > + ret = bedata->irq;
> > + goto error;
> > + }
> > +
> > + ret = gnttab_alloc_grant_references(1, &gref_head);
> > + if (ret < 0)
> > + goto error;
> > + bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
>
> Is ref really needed?
No, I'll remove it
> > + if (ref < 0)
> > + goto error;
> > + gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
> > + virt_to_gfn((void *)sring), 0);
> > +
> > + again:
> > + ret = xenbus_transaction_start(&xbt);
> > + if (ret) {
> > + xenbus_dev_fatal(dev, ret, "starting transaction");
> > + goto error;
> > + }
> > + ret = xenbus_printf(xbt, dev->nodename, "version", "%u", 1);
> > + if (ret)
> > + goto error_xenbus;
> > + ret = xenbus_printf(xbt, dev->nodename, "ring-ref", "%d", ref);
> > + if (ret)
> > + goto error_xenbus;
> > + ret = xenbus_printf(xbt, dev->nodename, "port", "%u",
> > + evtchn);
> > + if (ret)
> > + goto error_xenbus;
> > + ret = xenbus_transaction_end(xbt, 0);
> > + if (ret) {
> > + if (ret == -EAGAIN)
> > + goto again;
> > + xenbus_dev_fatal(dev, ret, "completing transaction");
> > + goto error;
> > + }
> > +
> > + INIT_LIST_HEAD(&bedata->socket_mappings);
> > + INIT_LIST_HEAD(&bedata->socketpass_mappings);
> > + spin_lock_init(&bedata->pvcallss_lock);
> > + dev_set_drvdata(&dev->dev, bedata);
> > + pvcalls_front_dev = dev;
> > + xenbus_switch_state(dev, XenbusStateInitialised);
> > +
> > return 0;
> > +
> > + error_xenbus:
> > + xenbus_transaction_end(xbt, 1);
> > + xenbus_dev_fatal(dev, ret, "writing xenstore");
> > + error:
> > + pvcalls_front_remove(dev);
>
> I think patch 12 (where you implement cleanup) could be moved before this one.
I'll move the patch
> I also think you are leaking bedata on error paths.
bedata is freed by pvcalls_front_remove (kfree(bedata)), why do you say
so?
> > + return ret;
> > }
> > static void pvcalls_front_changed(struct xenbus_device *dev,
> > enum xenbus_state backend_state)
> > {
> > + switch (backend_state) {
> > + case XenbusStateReconfiguring:
> > + case XenbusStateReconfigured:
> > + case XenbusStateInitialising:
> > + case XenbusStateInitialised:
> > + case XenbusStateUnknown:
> > + break;
> > +
> > + case XenbusStateInitWait:
> > + break;
> > +
> > + case XenbusStateConnected:
> > + xenbus_switch_state(dev, XenbusStateConnected);
> > + break;
> > +
> > + case XenbusStateClosed:
> > + if (dev->state == XenbusStateClosed)
> > + break;
> > + /* Missed the backend's CLOSING state -- fallthrough */
> > + case XenbusStateClosing:
> > + xenbus_frontend_closed(dev);
> > + break;
> > + }
> > }
> > static struct xenbus_driver pvcalls_front_driver = {
>> This all looks very similar to previous patches. Can it be factored out?
> You are right that the pattern is the same for all commands:
> - get a request
> - fill the request
> - possibly do something else
> - wait
> however each request is different, the struct and fields are different.
> There are spin_lock and spin_unlock calls intermingled. I am not sure I
> can factor out much of this. Maybe I could create a static inline or
> macro as a syntactic sugar to replace the wait call, but that's pretty
> much it I think.
Maybe you could factor out common fragments, not necessarily the whole
thing at once?
For example,
static inline int get_request(*bedata, int *req_id)
{
*req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
if (RING_FULL(&bedata->ring) ||
READ_ONCE(bedata->rsp[*req_id].req_id) != PVCALLS_INVALID_ID) {
return -EAGAIN;
return 0;
}
(or some such)
>
>
>> Also, you've used wait_event_interruptible in socket() implementation. Why not
>> here (and connect())?
> My intention was to use wait_event to wait for replies everywhere but I
> missed some of them in the conversion (I used to use
> wait_event_interruptible in early versions of the code).
>
> The reason to use wait_event is that it makes it easier to handle the
> rsp slot in bedata (bedata->rsp[req_id]): in case of EINTR the response
> in bedata->rsp would not be cleared by anybody. If we use wait_event
> there is no such problem, and the backend could still return EINTR and
> we would handle it just fine as any other responses.
I was actually wondering about this myself when I was looking at
socket() but then I somehow convinced myself (incorrectly!) that it was OK.
-boris
On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
>>>> + count++;
>>>> + else
>>>> + wait_event_interruptible(map->active.inflight_conn_req,
>>>> + pvcalls_front_read_todo(map));
>>>> + }
>>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
>>> counting non-sleeping iterations but here we are sleeping so
>>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>>>
>>> In fact, what shouldn't this waiting be a function of MSG_DONTWAIT
>> err, which it already is. But the question still stands (except for
>> MSG_DONTWAIT).
> The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> non-sleeping iterations.
>
OK.
Why not go directly into wait_event_interruptible()? I see you write in
the commit message
If not enough data is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance and latency significantly.
Is this because of scheduling latency? I think this should be mentioned not just in the commit message but also as a comment in the code.
(I also think it's not "not enough data" but rather "no data"?)
-boris
>>> static int pvcalls_front_probe(struct xenbus_device *dev,
>>> const struct xenbus_device_id *id)
>>> {
>>> + int ret = -EFAULT, evtchn, ref = -1, i;
>>> + unsigned int max_page_order, function_calls, len;
>>> + char *versions;
>>> + grant_ref_t gref_head = 0;
>>> + struct xenbus_transaction xbt;
>>> + struct pvcalls_bedata *bedata = NULL;
>>> + struct xen_pvcalls_sring *sring;
>>> +
>>> + if (pvcalls_front_dev != NULL) {
>>> + dev_err(&dev->dev, "only one PV Calls connection
>>> supported\n");
>>> + return -EINVAL;
>>> + }
>>> +
>>> + versions = xenbus_read(XBT_NIL, dev->otherend, "versions", &len);
>>> + if (!len)
>>> + return -EINVAL;
>>> + if (strcmp(versions, "1")) {
>>> + kfree(versions);
>>> + return -EINVAL;
>>> + }
>>> + kfree(versions);
>>> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
>>> + "max-page-order", "%u", &max_page_order);
>>> + if (ret <= 0)
>>> + return -ENODEV;
>>> + if (max_page_order < RING_ORDER)
>>> + return -ENODEV;
>>> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
>>> + "function-calls", "%u", &function_calls);
>>> + if (ret <= 0 || function_calls != 1)
>>> + return -ENODEV;
>>> + pr_info("%s max-page-order is %u\n", __func__, max_page_order);
>>> +
>>> + bedata = kzalloc(sizeof(struct pvcalls_bedata), GFP_KERNEL);
>>> + if (!bedata)
>>> + return -ENOMEM;
>>> +
>>> + init_waitqueue_head(&bedata->inflight_req);
>>> + for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
>>> + bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
>>> +
>>> + sring = (struct xen_pvcalls_sring *) __get_free_page(GFP_KERNEL |
>>> + __GFP_ZERO);
>>> + if (!sring)
>>> + goto error;
>>> + SHARED_RING_INIT(sring);
>>> + FRONT_RING_INIT(&bedata->ring, sring, XEN_PAGE_SIZE);
>>> +
>>> + ret = xenbus_alloc_evtchn(dev, &evtchn);
>>> + if (ret)
>>> + goto error;
>>> +
>>> + bedata->irq = bind_evtchn_to_irqhandler(evtchn,
>>> + pvcalls_front_event_handler,
>>> + 0, "pvcalls-frontend", dev);
>>> + if (bedata->irq < 0) {
>>> + ret = bedata->irq;
>>> + goto error;
>>> + }
>>> +
>>> + ret = gnttab_alloc_grant_references(1, &gref_head);
>>> + if (ret < 0)
>>> + goto error;
>>> + bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
>> Is ref really needed?
> No, I'll remove it
>
>
>>> + if (ref < 0)
>>> + goto error;
>>> + gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
>>> + virt_to_gfn((void *)sring), 0);
>>> +
>>> + again:
>>> + ret = xenbus_transaction_start(&xbt);
>>> + if (ret) {
>>> + xenbus_dev_fatal(dev, ret, "starting transaction");
>>> + goto error;
>>> + }
>>> + ret = xenbus_printf(xbt, dev->nodename, "version", "%u", 1);
>>> + if (ret)
>>> + goto error_xenbus;
>>> + ret = xenbus_printf(xbt, dev->nodename, "ring-ref", "%d", ref);
>>> + if (ret)
>>> + goto error_xenbus;
>>> + ret = xenbus_printf(xbt, dev->nodename, "port", "%u",
>>> + evtchn);
>>> + if (ret)
>>> + goto error_xenbus;
>>> + ret = xenbus_transaction_end(xbt, 0);
>>> + if (ret) {
>>> + if (ret == -EAGAIN)
>>> + goto again;
>>> + xenbus_dev_fatal(dev, ret, "completing transaction");
>>> + goto error;
>>> + }
>>> +
>>> + INIT_LIST_HEAD(&bedata->socket_mappings);
>>> + INIT_LIST_HEAD(&bedata->socketpass_mappings);
>>> + spin_lock_init(&bedata->pvcallss_lock);
>>> + dev_set_drvdata(&dev->dev, bedata);
>>> + pvcalls_front_dev = dev;
>>> + xenbus_switch_state(dev, XenbusStateInitialised);
>>> +
>>> return 0;
>>> +
>>> + error_xenbus:
>>> + xenbus_transaction_end(xbt, 1);
>>> + xenbus_dev_fatal(dev, ret, "writing xenstore");
>>> + error:
>>> + pvcalls_front_remove(dev);
>> I think patch 12 (where you implement cleanup) could be moved before this one.
> I'll move the patch
>
>
>> I also think you are leaking bedata on error paths.
> bedata is freed by pvcalls_front_remove (kfree(bedata)), why do you say
> so?
bedata there is read from dev_get_drvdata() and here you assign drvdata
at the very end.
Come think of it, pvcalls_front_remove() should probably first check
whether bedata is valid. Or drvdata should be assigned right away in
this routine, before any 'got error/error_xenbus'.
-boris
>
>
>>> + return ret;
>>> }
>>>
> +int pvcalls_front_release(struct socket *sock)
> +{
> + struct pvcalls_bedata *bedata;
> + struct sock_mapping *map;
> + int req_id, notify;
> + struct xen_pvcalls_request *req;
> +
> + if (!pvcalls_front_dev)
> + return -EIO;
> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> + if (!bedata)
> + return -EIO;
Some (all?) other ops don't check bedata validity. Should they all do?
> +
> + if (sock->sk == NULL)
> + return 0;
> +
> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> + if (map == NULL)
> + return 0;
> +
> + spin_lock(&bedata->pvcallss_lock);
> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> + if (RING_FULL(&bedata->ring) ||
> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> + spin_unlock(&bedata->pvcallss_lock);
> + return -EAGAIN;
> + }
> + WRITE_ONCE(sock->sk->sk_send_head, NULL);
> +
> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> + req->req_id = req_id;
> + req->cmd = PVCALLS_RELEASE;
> + req->u.release.id = (uint64_t)sock;
> +
> + bedata->ring.req_prod_pvt++;
> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> + spin_unlock(&bedata->pvcallss_lock);
> + if (notify)
> + notify_remote_via_irq(bedata->irq);
> +
> + wait_event(bedata->inflight_req,
> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> +
> + if (map->active_socket) {
> + /*
> + * Set in_error and wake up inflight_conn_req to force
> + * recvmsg waiters to exit.
> + */
> + map->active.ring->in_error = -EBADF;
> + wake_up_interruptible(&map->active.inflight_conn_req);
> +
> + mutex_lock(&map->active.in_mutex);
> + mutex_lock(&map->active.out_mutex);
> + pvcalls_front_free_map(bedata, map);
> + mutex_unlock(&map->active.out_mutex);
> + mutex_unlock(&map->active.in_mutex);
> + kfree(map);
Since you are locking here I assume you expect that someone else might
also be trying to lock the map. But you are freeing it immediately after
unlocking. Wouldn't that mean that whoever is trying to grab the lock
might then dereference freed memory?
-boris
> + } else {
> + spin_lock(&bedata->pvcallss_lock);
> + list_del_init(&map->list);
> + kfree(map);
> + spin_unlock(&bedata->pvcallss_lock);
> + }
> + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> +
> + return 0;
> +}
> +
> static const struct xenbus_device_id pvcalls_front_ids[] = {
> { "pvcalls" },
> { "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index 25e05b8..3332978 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -23,5 +23,6 @@ int pvcalls_front_recvmsg(struct socket *sock,
> unsigned int pvcalls_front_poll(struct file *file,
> struct socket *sock,
> poll_table *wait);
> +int pvcalls_front_release(struct socket *sock);
>
> #endif
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> >>> static int pvcalls_front_probe(struct xenbus_device *dev,
> >>> const struct xenbus_device_id *id)
> >>> {
> >>> + int ret = -EFAULT, evtchn, ref = -1, i;
> >>> + unsigned int max_page_order, function_calls, len;
> >>> + char *versions;
> >>> + grant_ref_t gref_head = 0;
> >>> + struct xenbus_transaction xbt;
> >>> + struct pvcalls_bedata *bedata = NULL;
> >>> + struct xen_pvcalls_sring *sring;
> >>> +
> >>> + if (pvcalls_front_dev != NULL) {
> >>> + dev_err(&dev->dev, "only one PV Calls connection
> >>> supported\n");
> >>> + return -EINVAL;
> >>> + }
> >>> +
> >>> + versions = xenbus_read(XBT_NIL, dev->otherend, "versions", &len);
> >>> + if (!len)
> >>> + return -EINVAL;
> >>> + if (strcmp(versions, "1")) {
> >>> + kfree(versions);
> >>> + return -EINVAL;
> >>> + }
> >>> + kfree(versions);
> >>> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> >>> + "max-page-order", "%u", &max_page_order);
> >>> + if (ret <= 0)
> >>> + return -ENODEV;
> >>> + if (max_page_order < RING_ORDER)
> >>> + return -ENODEV;
> >>> + ret = xenbus_scanf(XBT_NIL, dev->otherend,
> >>> + "function-calls", "%u", &function_calls);
> >>> + if (ret <= 0 || function_calls != 1)
> >>> + return -ENODEV;
> >>> + pr_info("%s max-page-order is %u\n", __func__, max_page_order);
> >>> +
> >>> + bedata = kzalloc(sizeof(struct pvcalls_bedata), GFP_KERNEL);
> >>> + if (!bedata)
> >>> + return -ENOMEM;
> >>> +
> >>> + init_waitqueue_head(&bedata->inflight_req);
> >>> + for (i = 0; i < PVCALLS_NR_REQ_PER_RING; i++)
> >>> + bedata->rsp[i].req_id = PVCALLS_INVALID_ID;
> >>> +
> >>> + sring = (struct xen_pvcalls_sring *) __get_free_page(GFP_KERNEL |
> >>> + __GFP_ZERO);
> >>> + if (!sring)
> >>> + goto error;
> >>> + SHARED_RING_INIT(sring);
> >>> + FRONT_RING_INIT(&bedata->ring, sring, XEN_PAGE_SIZE);
> >>> +
> >>> + ret = xenbus_alloc_evtchn(dev, &evtchn);
> >>> + if (ret)
> >>> + goto error;
> >>> +
> >>> + bedata->irq = bind_evtchn_to_irqhandler(evtchn,
> >>> + pvcalls_front_event_handler,
> >>> + 0, "pvcalls-frontend", dev);
> >>> + if (bedata->irq < 0) {
> >>> + ret = bedata->irq;
> >>> + goto error;
> >>> + }
> >>> +
> >>> + ret = gnttab_alloc_grant_references(1, &gref_head);
> >>> + if (ret < 0)
> >>> + goto error;
> >>> + bedata->ref = ref = gnttab_claim_grant_reference(&gref_head);
> >> Is ref really needed?
> > No, I'll remove it
> >
> >
> >>> + if (ref < 0)
> >>> + goto error;
> >>> + gnttab_grant_foreign_access_ref(ref, dev->otherend_id,
> >>> + virt_to_gfn((void *)sring), 0);
> >>> +
> >>> + again:
> >>> + ret = xenbus_transaction_start(&xbt);
> >>> + if (ret) {
> >>> + xenbus_dev_fatal(dev, ret, "starting transaction");
> >>> + goto error;
> >>> + }
> >>> + ret = xenbus_printf(xbt, dev->nodename, "version", "%u", 1);
> >>> + if (ret)
> >>> + goto error_xenbus;
> >>> + ret = xenbus_printf(xbt, dev->nodename, "ring-ref", "%d", ref);
> >>> + if (ret)
> >>> + goto error_xenbus;
> >>> + ret = xenbus_printf(xbt, dev->nodename, "port", "%u",
> >>> + evtchn);
> >>> + if (ret)
> >>> + goto error_xenbus;
> >>> + ret = xenbus_transaction_end(xbt, 0);
> >>> + if (ret) {
> >>> + if (ret == -EAGAIN)
> >>> + goto again;
> >>> + xenbus_dev_fatal(dev, ret, "completing transaction");
> >>> + goto error;
> >>> + }
> >>> +
> >>> + INIT_LIST_HEAD(&bedata->socket_mappings);
> >>> + INIT_LIST_HEAD(&bedata->socketpass_mappings);
> >>> + spin_lock_init(&bedata->pvcallss_lock);
> >>> + dev_set_drvdata(&dev->dev, bedata);
> >>> + pvcalls_front_dev = dev;
> >>> + xenbus_switch_state(dev, XenbusStateInitialised);
> >>> +
> >>> return 0;
> >>> +
> >>> + error_xenbus:
> >>> + xenbus_transaction_end(xbt, 1);
> >>> + xenbus_dev_fatal(dev, ret, "writing xenstore");
> >>> + error:
> >>> + pvcalls_front_remove(dev);
> >> I think patch 12 (where you implement cleanup) could be moved before this one.
> > I'll move the patch
> >
> >
> >> I also think you are leaking bedata on error paths.
> > bedata is freed by pvcalls_front_remove (kfree(bedata)), why do you say
> > so?
>
> bedata there is read from dev_get_drvdata() and here you assign drvdata
> at the very end.
>
> Come think of it, pvcalls_front_remove() should probably first check
> whether bedata is valid. Or drvdata should be assigned right away in
> this routine, before any 'got error/error_xenbus'.
Yes, I'll do that
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> >> This all looks very similar to previous patches. Can it be factored out?
> > You are right that the pattern is the same for all commands:
> > - get a request
> > - fill the request
> > - possibly do something else
> > - wait
> > however each request is different, the struct and fields are different.
> > There are spin_lock and spin_unlock calls intermingled. I am not sure I
> > can factor out much of this. Maybe I could create a static inline or
> > macro as a syntactic sugar to replace the wait call, but that's pretty
> > much it I think.
>
> Maybe you could factor out common fragments, not necessarily the whole
> thing at once?
>
> For example,
>
> static inline int get_request(*bedata, int *req_id)
> {
>
> *req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> if (RING_FULL(&bedata->ring) ||
> READ_ONCE(bedata->rsp[*req_id].req_id) != PVCALLS_INVALID_ID) {
> return -EAGAIN;
> return 0;
> }
>
> (or some such)
You are right, the code looks better this way. I'll add it.
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> > On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >>>> + count++;
> >>>> + else
> >>>> + wait_event_interruptible(map->active.inflight_conn_req,
> >>>> + pvcalls_front_read_todo(map));
> >>>> + }
> >>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> >>> counting non-sleeping iterations but here we are sleeping so
> >>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >>>
> >>> In fact, what shouldn't this waiting be a function of MSG_DONTWAIT
> >> err, which it already is. But the question still stands (except for
> >> MSG_DONTWAIT).
> > The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> > 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> > and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> > non-sleeping iterations.
> >
>
> OK.
>
> Why not go directly into wait_event_interruptible()? I see you write in
> the commit message
>
> If not enough data is available on the ring, rather than returning
> immediately or sleep-waiting, spin for up to 5000 cycles. This small
> optimization turns out to improve performance and latency significantly.
>
>
> Is this because of scheduling latency? I think this should be mentioned not just in the commit message but also as a comment in the code.
It tries to mitigate scheduling latencies on both ends (dom0 and domU)
when the ring buffer is the bottleneck (high bandwidth connections). But
to be honest with you, it's mostly beneficial in the sendmsg case,
because for recvmsg we also introduce a busy-wait in regular
circumstances, when no data is actually available. I confirmed this
statement with a quick iperf test. I'll remove the spin from recvmsg and
keep it in sendmsg.
>
> (I also think it's not "not enough data" but rather "no data"?)
you are right
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> > +int pvcalls_front_release(struct socket *sock)
> > +{
> > + struct pvcalls_bedata *bedata;
> > + struct sock_mapping *map;
> > + int req_id, notify;
> > + struct xen_pvcalls_request *req;
> > +
> > + if (!pvcalls_front_dev)
> > + return -EIO;
> > + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > + if (!bedata)
> > + return -EIO;
>
> Some (all?) other ops don't check bedata validity. Should they all do?
No, I don't think they should: dev_set_drvdata is called in the probe
function (pvcalls_front_probe). I'll remove it.
> > +
> > + if (sock->sk == NULL)
> > + return 0;
> > +
> > + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> > + if (map == NULL)
> > + return 0;
> > +
> > + spin_lock(&bedata->pvcallss_lock);
> > + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> > + if (RING_FULL(&bedata->ring) ||
> > + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> > + spin_unlock(&bedata->pvcallss_lock);
> > + return -EAGAIN;
> > + }
> > + WRITE_ONCE(sock->sk->sk_send_head, NULL);
> > +
> > + req = RING_GET_REQUEST(&bedata->ring, req_id);
> > + req->req_id = req_id;
> > + req->cmd = PVCALLS_RELEASE;
> > + req->u.release.id = (uint64_t)sock;
> > +
> > + bedata->ring.req_prod_pvt++;
> > + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + if (notify)
> > + notify_remote_via_irq(bedata->irq);
> > +
> > + wait_event(bedata->inflight_req,
> > + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> > +
> > + if (map->active_socket) {
> > + /*
> > + * Set in_error and wake up inflight_conn_req to force
> > + * recvmsg waiters to exit.
> > + */
> > + map->active.ring->in_error = -EBADF;
> > + wake_up_interruptible(&map->active.inflight_conn_req);
> > +
> > + mutex_lock(&map->active.in_mutex);
> > + mutex_lock(&map->active.out_mutex);
> > + pvcalls_front_free_map(bedata, map);
> > + mutex_unlock(&map->active.out_mutex);
> > + mutex_unlock(&map->active.in_mutex);
> > + kfree(map);
>
> Since you are locking here I assume you expect that someone else might
> also be trying to lock the map. But you are freeing it immediately after
> unlocking. Wouldn't that mean that whoever is trying to grab the lock
> might then dereference freed memory?
The lock is to make sure there are no recvmsg or sendmsg in progress. We
are sure that no newer sendmsg or recvmsg are waiting for
pvcalls_front_release to release the lock because before send a message
to the backend we set sk_send_head to NULL.
> > + } else {
> > + spin_lock(&bedata->pvcallss_lock);
> > + list_del_init(&map->list);
> > + kfree(map);
> > + spin_unlock(&bedata->pvcallss_lock);
> > + }
> > + WRITE_ONCE(bedata->rsp[req_id].req_id, PVCALLS_INVALID_ID);
> > +
> > + return 0;
> > +}
> > +
> > static const struct xenbus_device_id pvcalls_front_ids[] = {
> > { "pvcalls" },
> > { "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index 25e05b8..3332978 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -23,5 +23,6 @@ int pvcalls_front_recvmsg(struct socket *sock,
> > unsigned int pvcalls_front_poll(struct file *file,
> > struct socket *sock,
> > poll_table *wait);
> > +int pvcalls_front_release(struct socket *sock);
> >
> > #endif
On 07/31/2017 06:34 PM, Stefano Stabellini wrote:
> On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
>>> +int pvcalls_front_release(struct socket *sock)
>>> +{
>>> + struct pvcalls_bedata *bedata;
>>> + struct sock_mapping *map;
>>> + int req_id, notify;
>>> + struct xen_pvcalls_request *req;
>>> +
>>> + if (!pvcalls_front_dev)
>>> + return -EIO;
>>> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
>>> + if (!bedata)
>>> + return -EIO;
>> Some (all?) other ops don't check bedata validity. Should they all do?
> No, I don't think they should: dev_set_drvdata is called in the probe
> function (pvcalls_front_probe). I'll remove it.
>
>
>>> +
>>> + if (sock->sk == NULL)
>>> + return 0;
>>> +
>>> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
>>> + if (map == NULL)
>>> + return 0;
>>> +
>>> + spin_lock(&bedata->pvcallss_lock);
>>> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
>>> + if (RING_FULL(&bedata->ring) ||
>>> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
>>> + spin_unlock(&bedata->pvcallss_lock);
>>> + return -EAGAIN;
>>> + }
>>> + WRITE_ONCE(sock->sk->sk_send_head, NULL);
>>> +
>>> + req = RING_GET_REQUEST(&bedata->ring, req_id);
>>> + req->req_id = req_id;
>>> + req->cmd = PVCALLS_RELEASE;
>>> + req->u.release.id = (uint64_t)sock;
>>> +
>>> + bedata->ring.req_prod_pvt++;
>>> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
>>> + spin_unlock(&bedata->pvcallss_lock);
>>> + if (notify)
>>> + notify_remote_via_irq(bedata->irq);
>>> +
>>> + wait_event(bedata->inflight_req,
>>> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
>>> +
>>> + if (map->active_socket) {
>>> + /*
>>> + * Set in_error and wake up inflight_conn_req to force
>>> + * recvmsg waiters to exit.
>>> + */
>>> + map->active.ring->in_error = -EBADF;
>>> + wake_up_interruptible(&map->active.inflight_conn_req);
>>> +
>>> + mutex_lock(&map->active.in_mutex);
>>> + mutex_lock(&map->active.out_mutex);
>>> + pvcalls_front_free_map(bedata, map);
>>> + mutex_unlock(&map->active.out_mutex);
>>> + mutex_unlock(&map->active.in_mutex);
>>> + kfree(map);
>> Since you are locking here I assume you expect that someone else might
>> also be trying to lock the map. But you are freeing it immediately after
>> unlocking. Wouldn't that mean that whoever is trying to grab the lock
>> might then dereference freed memory?
> The lock is to make sure there are no recvmsg or sendmsg in progress. We
> are sure that no newer sendmsg or recvmsg are waiting for
> pvcalls_front_release to release the lock because before send a message
> to the backend we set sk_send_head to NULL.
Is there a chance that whoever is potentially calling send/rcvmsg has
checked that sk_send_head is non-NULL but hasn't grabbed the lock yet?
Freeing a structure containing a lock right after releasing the lock
looks weird (to me). Is there any other way to synchronize with
sender/receiver? Any other lock?
BTW, I also noticed that in rcvmsg you are calling
wait_event_interruptible() while holding the lock. Have you tested with
CONFIG_DEBUG_ATOMIC_SLEEP? (or maybe it's some other config option that
would complain about those sorts of thing)
-boris
On 01/08/17 17:23, Boris Ostrovsky wrote:
> On 07/31/2017 06:34 PM, Stefano Stabellini wrote:
>> On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
>>>> +int pvcalls_front_release(struct socket *sock)
>>>> +{
>>>> + struct pvcalls_bedata *bedata;
>>>> + struct sock_mapping *map;
>>>> + int req_id, notify;
>>>> + struct xen_pvcalls_request *req;
>>>> +
>>>> + if (!pvcalls_front_dev)
>>>> + return -EIO;
>>>> + bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
>>>> + if (!bedata)
>>>> + return -EIO;
>>> Some (all?) other ops don't check bedata validity. Should they all do?
>> No, I don't think they should: dev_set_drvdata is called in the probe
>> function (pvcalls_front_probe). I'll remove it.
>>
>>
>>>> +
>>>> + if (sock->sk == NULL)
>>>> + return 0;
>>>> +
>>>> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
>>>> + if (map == NULL)
>>>> + return 0;
>>>> +
>>>> + spin_lock(&bedata->pvcallss_lock);
>>>> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
>>>> + if (RING_FULL(&bedata->ring) ||
>>>> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
>>>> + spin_unlock(&bedata->pvcallss_lock);
>>>> + return -EAGAIN;
>>>> + }
>>>> + WRITE_ONCE(sock->sk->sk_send_head, NULL);
>>>> +
>>>> + req = RING_GET_REQUEST(&bedata->ring, req_id);
>>>> + req->req_id = req_id;
>>>> + req->cmd = PVCALLS_RELEASE;
>>>> + req->u.release.id = (uint64_t)sock;
>>>> +
>>>> + bedata->ring.req_prod_pvt++;
>>>> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
>>>> + spin_unlock(&bedata->pvcallss_lock);
>>>> + if (notify)
>>>> + notify_remote_via_irq(bedata->irq);
>>>> +
>>>> + wait_event(bedata->inflight_req,
>>>> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
>>>> +
>>>> + if (map->active_socket) {
>>>> + /*
>>>> + * Set in_error and wake up inflight_conn_req to force
>>>> + * recvmsg waiters to exit.
>>>> + */
>>>> + map->active.ring->in_error = -EBADF;
>>>> + wake_up_interruptible(&map->active.inflight_conn_req);
>>>> +
>>>> + mutex_lock(&map->active.in_mutex);
>>>> + mutex_lock(&map->active.out_mutex);
>>>> + pvcalls_front_free_map(bedata, map);
>>>> + mutex_unlock(&map->active.out_mutex);
>>>> + mutex_unlock(&map->active.in_mutex);
>>>> + kfree(map);
>>> Since you are locking here I assume you expect that someone else might
>>> also be trying to lock the map. But you are freeing it immediately after
>>> unlocking. Wouldn't that mean that whoever is trying to grab the lock
>>> might then dereference freed memory?
>> The lock is to make sure there are no recvmsg or sendmsg in progress. We
>> are sure that no newer sendmsg or recvmsg are waiting for
>> pvcalls_front_release to release the lock because before send a message
>> to the backend we set sk_send_head to NULL.
>
> Is there a chance that whoever is potentially calling send/rcvmsg has
> checked that sk_send_head is non-NULL but hasn't grabbed the lock yet?
>
> Freeing a structure containing a lock right after releasing the lock
> looks weird (to me). Is there any other way to synchronize with
> sender/receiver? Any other lock?
Right. This looks fishy. Either you don't need the locks or you can't
just free the area right after releasing the lock.
> BTW, I also noticed that in rcvmsg you are calling
> wait_event_interruptible() while holding the lock. Have you tested with
> CONFIG_DEBUG_ATOMIC_SLEEP? (or maybe it's some other config option that
> would complain about those sorts of thing)
I believe sleeping while holding a mutex is allowed. Sleeping in
spinlocked paths is bad.
BTW: You are looking for CONFIG_DEBUG_MUTEXES (see
Documentation/locking/mutex-design.txt ).
Juergen
>> BTW, I also noticed that in rcvmsg you are calling
>> wait_event_interruptible() while holding the lock. Have you tested with
>> CONFIG_DEBUG_ATOMIC_SLEEP? (or maybe it's some other config option that
>> would complain about those sorts of thing)
> I believe sleeping while holding a mutex is allowed. Sleeping in
> spinlocked paths is bad.
Oh, right. I was thinking about spinlocks. Sorry.
-boris
>
> BTW: You are looking for CONFIG_DEBUG_MUTEXES (see
> Documentation/locking/mutex-design.txt ).
>
>
> Juergen
On Tue, 1 Aug 2017, Juergen Gross wrote:
> >>>> + if (sock->sk == NULL)
> >>>> + return 0;
> >>>> +
> >>>> + map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> >>>> + if (map == NULL)
> >>>> + return 0;
> >>>> +
> >>>> + spin_lock(&bedata->pvcallss_lock);
> >>>> + req_id = bedata->ring.req_prod_pvt & (RING_SIZE(&bedata->ring) - 1);
> >>>> + if (RING_FULL(&bedata->ring) ||
> >>>> + READ_ONCE(bedata->rsp[req_id].req_id) != PVCALLS_INVALID_ID) {
> >>>> + spin_unlock(&bedata->pvcallss_lock);
> >>>> + return -EAGAIN;
> >>>> + }
> >>>> + WRITE_ONCE(sock->sk->sk_send_head, NULL);
> >>>> +
> >>>> + req = RING_GET_REQUEST(&bedata->ring, req_id);
> >>>> + req->req_id = req_id;
> >>>> + req->cmd = PVCALLS_RELEASE;
> >>>> + req->u.release.id = (uint64_t)sock;
> >>>> +
> >>>> + bedata->ring.req_prod_pvt++;
> >>>> + RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> >>>> + spin_unlock(&bedata->pvcallss_lock);
> >>>> + if (notify)
> >>>> + notify_remote_via_irq(bedata->irq);
> >>>> +
> >>>> + wait_event(bedata->inflight_req,
> >>>> + READ_ONCE(bedata->rsp[req_id].req_id) == req_id);
> >>>> +
> >>>> + if (map->active_socket) {
> >>>> + /*
> >>>> + * Set in_error and wake up inflight_conn_req to force
> >>>> + * recvmsg waiters to exit.
> >>>> + */
> >>>> + map->active.ring->in_error = -EBADF;
> >>>> + wake_up_interruptible(&map->active.inflight_conn_req);
> >>>> +
> >>>> + mutex_lock(&map->active.in_mutex);
> >>>> + mutex_lock(&map->active.out_mutex);
> >>>> + pvcalls_front_free_map(bedata, map);
> >>>> + mutex_unlock(&map->active.out_mutex);
> >>>> + mutex_unlock(&map->active.in_mutex);
> >>>> + kfree(map);
> >>> Since you are locking here I assume you expect that someone else might
> >>> also be trying to lock the map. But you are freeing it immediately after
> >>> unlocking. Wouldn't that mean that whoever is trying to grab the lock
> >>> might then dereference freed memory?
> >> The lock is to make sure there are no recvmsg or sendmsg in progress. We
> >> are sure that no newer sendmsg or recvmsg are waiting for
> >> pvcalls_front_release to release the lock because before send a message
> >> to the backend we set sk_send_head to NULL.
> >
> > Is there a chance that whoever is potentially calling send/rcvmsg has
> > checked that sk_send_head is non-NULL but hasn't grabbed the lock yet?
> >
> > Freeing a structure containing a lock right after releasing the lock
> > looks weird (to me). Is there any other way to synchronize with
> > sender/receiver? Any other lock?
>
> Right. This looks fishy. Either you don't need the locks or you can't
> just free the area right after releasing the lock.
I changed this code, you'll see soon in the new patch series I am going
to send. There were two very similar mutex_unlock/kfree problems:
1) pvcalls_front_release
2) pvcalls_front_remove
For 2), I introduced a refcount. I only free the data structs when the
refcount reaches 0.
For 1), I could introduce a similar refcount that would serve the same
purpose, but instead I used mutex_trylock, effectively using the
internal count in in_mutex and out_mutex for the same purpose.