2021-05-08 16:34:00

by Arseny Krasnov

Subject: [RFC PATCH v9 00/19] virtio/vsock: introduce SOCK_SEQPACKET support

This patchset implements support of SOCK_SEQPACKET for the virtio
transport.
As SOCK_SEQPACKET guarantees to preserve record boundaries, a new
bit was added to the 'flags' field of the packet header: SEQ_EOR. This
bit is set to 1 in the last RW packet of a message.
Since packets of one socket are reordered neither on the vsock nor on
the vhost transport layer, this bit allows the original message to be
restored on the receiver's side. If the user's buffer is smaller than
the message length, all data that does not fit is dropped.
As with stream sockets, the maximum length of a datagram is not
limited, because the same credit logic is used. The difference from
a stream socket is that the user is not woken up until the whole
record is received or an error occurs. The implementation also
supports the 'MSG_TRUNC' flag.
Tests are also implemented.
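
A rough userspace model of the receive side described above, for
illustration only: 'struct model_pkt' and model_recv_record() are
hypothetical and not part of the patches. It assumes packets of one
socket arrive in order, waits (returns -1) until a packet with SEQ_EOR
is queued, copies as much of the record as fits into the user's buffer,
drops the remainder, and reports the full record length, which is what
a MSG_TRUNC-aware caller needs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified view of one RW packet: its payload and
 * whether the SEQ_EOR bit is set in its 'flags' field. */
struct model_pkt {
	const char *data;
	size_t len;
	bool eor;
};

/*
 * Reassemble the next record from an in-order packet queue q[0..nq).
 * Copies at most buf_len bytes into buf and drops the rest of the
 * record; *record_len receives the full record length.
 * Returns the number of bytes copied, or -1 if no packet with SEQ_EOR
 * is queued yet (the caller keeps waiting and *pos is left untouched).
 */
static long model_recv_record(const struct model_pkt *q, size_t nq,
			      size_t *pos, char *buf, size_t buf_len,
			      size_t *record_len)
{
	size_t copied = 0;
	size_t end = *pos;

	assert(q != NULL && pos != NULL && buf != NULL && record_len != NULL);
	*record_len = 0;

	/* Find the last packet of the record; packets are never reordered. */
	while (end < nq && !q[end].eor)
		end++;
	if (end == nq)
		return -1;

	for (; *pos <= end; (*pos)++) {
		const struct model_pkt *p = &q[*pos];
		size_t room = buf_len - copied;
		size_t n = p->len < room ? p->len : room;

		memcpy(buf + copied, p->data, n);	/* out-of-size data dropped */
		copied += n;
		*record_len += p->len;
	}

	return (long)copied;
}
```

With a 4-byte buffer, a record split as "hel" + "lo"(EOR) yields 4
copied bytes ("hell") and a record length of 5.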

Thanks to [email protected] for encouragement and initial design
recommendations.

Arseny Krasnov (19):
af_vsock: update functions for connectible socket
af_vsock: separate wait data loop
af_vsock: separate receive data loop
af_vsock: implement SEQPACKET receive loop
af_vsock: implement send logic for SEQPACKET
af_vsock: rest of SEQPACKET support
af_vsock: update comments for stream sockets
virtio/vsock: set packet's type in virtio_transport_send_pkt_info()
virtio/vsock: simplify credit update function API
virtio/vsock: defines and constants for SEQPACKET
virtio/vsock: dequeue callback for SOCK_SEQPACKET
virtio/vsock: add SEQPACKET receive logic
virtio/vsock: rest of SOCK_SEQPACKET support
virtio/vsock: enable SEQPACKET for transport
vhost/vsock: enable SEQPACKET for transport
vsock/loopback: enable SEQPACKET for transport
vsock_test: add SOCK_SEQPACKET tests
virtio/vsock: update trace event for SEQPACKET
af_vsock: serialize writes to shared socket

drivers/vhost/vsock.c | 42 +-
include/linux/virtio_vsock.h | 9 +
include/net/af_vsock.h | 8 +
.../events/vsock_virtio_transport_common.h | 5 +-
include/uapi/linux/virtio_vsock.h | 9 +
net/vmw_vsock/af_vsock.c | 417 +++++++++++------
net/vmw_vsock/virtio_transport.c | 25 +
net/vmw_vsock/virtio_transport_common.c | 129 ++++-
net/vmw_vsock/vsock_loopback.c | 11 +
tools/testing/vsock/util.c | 32 +-
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 63 +++
12 files changed, 594 insertions(+), 159 deletions(-)

v8 -> v9:
General changelog:
- see per patch change log.

Per patch changelog:
see the notes after the '---' line in each patch.

v7 -> v8:
General changelog:
- the whole idea is simplified: the channel is now considered reliable,
so SEQ_BEGIN, SEQ_END, 'msg_len' and 'msg_id' were removed.
The only thing used to mark the end of a message is a bit in
the 'flags' field of the packet header: VIRTIO_VSOCK_SEQ_EOR. A packet
with this bit set to 1 is the last packet of a message.

- POSIX MSG_EOR support is removed, as there is no exact
description of how it works.

- all changes to 'include/uapi/linux/virtio_vsock.h' are moved
to a dedicated patch, as these changes are linked with the patch
to the spec.

- patch 'virtio/vsock: SEQPACKET feature bit support' is now merged
into 'virtio/vsock: setup SEQPACKET ops for transport'.

- patch 'vhost/vsock: SEQPACKET feature bit support' is now merged
into 'vhost/vsock: setup SEQPACKET ops for transport'.

Per patch changelog:
see the notes after the '---' line in each patch.
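
With the EOR-only scheme, the sender only has to mark the final
fragment of each message. The sketch below is a hypothetical model,
not the virtio transport code: MODEL_SEQ_EOR stands in for
VIRTIO_VSOCK_SEQ_EOR with an arbitrary value, and fragments are cut at
a fixed max_payload, whereas the real transport is also limited by the
peer's credit.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for VIRTIO_VSOCK_SEQ_EOR; value is arbitrary. */
#define MODEL_SEQ_EOR 0x1u

/*
 * Split a message of msg_len bytes into packets carrying at most
 * max_payload bytes each and store every packet's 'flags' value in
 * flags[]. Only the last packet gets MODEL_SEQ_EOR. Returns the number
 * of packets, or 0 if flags[] (max_pkts entries) is too small.
 */
static size_t model_split_message(size_t msg_len, size_t max_payload,
				  uint32_t *flags, size_t max_pkts)
{
	size_t npkts = 0;
	size_t off = 0;

	assert(max_payload > 0 && flags != NULL);

	do {
		size_t left = msg_len - off;
		size_t chunk = left < max_payload ? left : max_payload;

		if (npkts == max_pkts)
			return 0;
		off += chunk;
		/* Mark end of record only on the packet that completes it. */
		flags[npkts++] = (off == msg_len) ? MODEL_SEQ_EOR : 0;
	} while (off < msg_len);

	return npkts;
}
```

For example, a 10-byte message with max_payload 4 becomes three
packets (4, 4, 2 bytes) and only the third one carries the EOR bit.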

v6 -> v7:
General changelog:
- the virtio transport callback for message length is now removed
from the transport. The length of a record is returned by the
dequeue callback.

- the function which tries to get the message length now returns 0
when the rx queue is empty. Also, the length of the current message
in progress is set to 0 when the message is processed or an error
happens.

- patches for the virtio feature bit are moved after the patches with
transport ops.

Per patch changelog:
see the notes after the '---' line in each patch.

v5 -> v6:
General changelog:
- virtio transport specific callbacks which send SEQ_BEGIN or
SEQ_END are now hidden inside the virtio transport. Only enqueue,
dequeue and record length callbacks are provided by the transport.

- a virtio feature bit for SEQPACKET socket support is introduced:
VIRTIO_VSOCK_F_SEQPACKET.

- 'msg_cnt' field in 'struct virtio_vsock_seq_hdr' renamed to
'msg_id' and used as id.

Per patch changelog:
- 'af_vsock: separate wait data loop':
1) Commit message updated.
2) 'prepare_to_wait()' moved inside the while loop (thanks to
Jorgen Hansen).
'Reviewed-by' was given for 1), but I dropped it because of 2).

- 'af_vsock: separate receive data loop': commit message
updated.
Added 'Reviewed-by' along with that fix.

- 'af_vsock: implement SEQPACKET receive loop': style fixes.

- 'af_vsock: rest of SEQPACKET support':
1) 'module_put()' added when the transport callback check fails.
2) Now only the 'seqpacket_allow()' callback is called to check
SEQPACKET support by the transport.

- 'af_vsock: update comments for stream sockets': commit message
updated.
Added 'Reviewed-by' along with that fix.

- 'virtio/vsock: set packet's type in send':
1) Commit message updated.
2) Parameter 'type' of 'virtio_transport_send_credit_update()' is
also removed in this patch instead of in the next one.

- 'virtio/vsock: dequeue callback for SOCK_SEQPACKET': SEQPACKET
related state wrapped in a dedicated struct.

- 'virtio/vsock: update trace event for SEQPACKET': format strings
are no longer broken across lines.

v4 -> v5:
- patches reorganized:
1) Setting of the packet's type in 'virtio_transport_send_pkt_info()'
is moved to a separate patch.
2) Simplification of 'virtio_transport_send_credit_update()' is
moved to a separate patch, placed before the main virtio/vsock patches.
- style problems fixed
- in 'af_vsock: separate receive data loop', an extra 'release_sock()'
is removed
- added trace event fields for SEQPACKET
- in 'af_vsock: separate wait data loop':
1) 'goto out;' removed from 'vsock_wait_data()'.
2) Comment for invalid data amount is changed.
- in 'af_vsock: rest of SEQPACKET support', the 'new_transport' pointer
check is moved after 'try_module_get()'
- in 'af_vsock: update comments for stream sockets', 'connect-oriented'
replaced with 'connection-oriented'
- in 'loopback/vsock: setup SEQPACKET ops for transport',
'loopback/vsock' replaced with 'vsock/loopback'

v3 -> v4:
- SEQPACKET specific metadata moved from packet header to payload
and called 'virtio_vsock_seq_hdr'
- record integrity check:
1) SEQ_END operation was added, which marks end of record.
2) Both SEQ_BEGIN and SEQ_END carry a counter which is incremented
on every marker sent.
- af_vsock.c: socket operations for STREAM and SEQPACKET call the same
functions instead of having their own "gates" that differ only by name:
'vsock_seqpacket/stream_getsockopt()' are now replaced with
'vsock_connectible_getsockopt()'.
- af_vsock.c: the 'seqpacket_dequeue' callback returns an error and a flag
that the record is ready. There is no need to return the number of copied
bytes, because the case when a record is received successfully is checked
at the virtio transport layer, when SEQ_END is processed. Also, the user
doesn't need the number of copied bytes, because 'recv()' on SEQPACKET
can return an error, the length of the user's buffer or the length of the
whole record (both are known in af_vsock.c).
- af_vsock.c: both wait loops in af_vsock.c (for data and space) are moved
to separate functions, because both are now called from several places.
- af_vsock.c: 'vsock_assign_transport()' checks that the 'new_transport'
pointer is not NULL and returns 'ESOCKTNOSUPPORT' instead of 'ENODEV'
if it fails to use the transport.
- tools/testing/vsock/vsock_test.c: rename tests

v2 -> v3:
- patches reorganized: split for prepare and implementation patches
- local variables are declared in "Reverse Christmas tree" order
- virtio_transport_common.c: proper leXX_to_cpu() for vsock header
field access
- af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
between stream and seqpacket sockets.
- af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
- af_vsock.c: 'vsock_wait_data()' refactored.

v1 -> v2:
- patches reordered: af_vsock.c related changes now before virtio vsock
- patches reorganized: more, smaller patches, where +/- are not mixed
- tests for SOCK_SEQPACKET added
- all commit messages updated
- af_vsock.c: 'vsock_pre_recv_check()' inlined to
'vsock_connectible_recvmsg()'
- af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
was not found
- virtio_transport_common.c: transport callback for seqpacket dequeue
- virtio_transport_common.c: simplified
'virtio_transport_recv_connected()'
- virtio_transport_common.c: send reset on socket and packet type
mismatch.

Signed-off-by: Arseny Krasnov <[email protected]>

--
2.25.1


2021-05-08 16:34:19

by Arseny Krasnov

Subject: [RFC PATCH v9 01/19] af_vsock: update functions for connectible socket

This prepares af_vsock.c for SEQPACKET support: some functions, such
as setsockopt(), getsockopt(), connect(), recvmsg() and sendmsg(), are
shared between both types of sockets, so rename them in a generic
manner.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/af_vsock.c | 64 +++++++++++++++++++++-------------------
1 file changed, 34 insertions(+), 30 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 92a72f0e0d94..7dd8e70d78cd 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -604,8 +604,8 @@ static void vsock_pending_work(struct work_struct *work)

/**** SOCKET OPERATIONS ****/

-static int __vsock_bind_stream(struct vsock_sock *vsk,
- struct sockaddr_vm *addr)
+static int __vsock_bind_connectible(struct vsock_sock *vsk,
+ struct sockaddr_vm *addr)
{
static u32 port;
struct sockaddr_vm new_addr;
@@ -685,7 +685,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
spin_lock_bh(&vsock_table_lock);
- retval = __vsock_bind_stream(vsk, addr);
+ retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
break;

@@ -768,6 +768,11 @@ static struct sock *__vsock_create(struct net *net,
return sk;
}

+static bool sock_type_connectible(u16 type)
+{
+ return type == SOCK_STREAM;
+}
+
static void __vsock_release(struct sock *sk, int level)
{
if (sk) {
@@ -786,7 +791,7 @@ static void __vsock_release(struct sock *sk, int level)

if (vsk->transport)
vsk->transport->release(vsk);
- else if (sk->sk_type == SOCK_STREAM)
+ else if (sock_type_connectible(sk->sk_type))
vsock_remove_sock(vsk);

sock_orphan(sk);
@@ -948,7 +953,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
lock_sock(sk);
if (sock->state == SS_UNCONNECTED) {
err = -ENOTCONN;
- if (sk->sk_type == SOCK_STREAM)
+ if (sock_type_connectible(sk->sk_type))
goto out;
} else {
sock->state = SS_DISCONNECTING;
@@ -961,7 +966,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk->sk_shutdown |= mode;
sk->sk_state_change(sk);

- if (sk->sk_type == SOCK_STREAM) {
+ if (sock_type_connectible(sk->sk_type)) {
sock_reset_flag(sk, SOCK_DONE);
vsock_send_shutdown(sk, mode);
}
@@ -1016,7 +1021,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (!(sk->sk_shutdown & SEND_SHUTDOWN))
mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;

- } else if (sock->type == SOCK_STREAM) {
+ } else if (sock_type_connectible(sk->sk_type)) {
const struct vsock_transport *transport;

lock_sock(sk);
@@ -1263,8 +1268,8 @@ static void vsock_connect_timeout(struct work_struct *work)
sock_put(sk);
}

-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
- int addr_len, int flags)
+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
{
int err;
struct sock *sk;
@@ -1414,7 +1419,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,

lock_sock(listener);

- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sock->type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1491,7 +1496,7 @@ static int vsock_listen(struct socket *sock, int backlog)

lock_sock(sk);

- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sk->sk_type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1535,11 +1540,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
vsk->buffer_size = val;
}

-static int vsock_stream_setsockopt(struct socket *sock,
- int level,
- int optname,
- sockptr_t optval,
- unsigned int optlen)
+static int vsock_connectible_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
{
int err;
struct sock *sk;
@@ -1617,10 +1622,10 @@ static int vsock_stream_setsockopt(struct socket *sock,
return err;
}

-static int vsock_stream_getsockopt(struct socket *sock,
- int level, int optname,
- char __user *optval,
- int __user *optlen)
+static int vsock_connectible_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
{
int err;
int len;
@@ -1688,8 +1693,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}

-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
- size_t len)
+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1828,10 +1833,9 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
return err;
}

-
static int
-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -2007,7 +2011,7 @@ static const struct proto_ops vsock_stream_ops = {
.owner = THIS_MODULE,
.release = vsock_release,
.bind = vsock_bind,
- .connect = vsock_stream_connect,
+ .connect = vsock_connect,
.socketpair = sock_no_socketpair,
.accept = vsock_accept,
.getname = vsock_getname,
@@ -2015,10 +2019,10 @@ static const struct proto_ops vsock_stream_ops = {
.ioctl = sock_no_ioctl,
.listen = vsock_listen,
.shutdown = vsock_shutdown,
- .setsockopt = vsock_stream_setsockopt,
- .getsockopt = vsock_stream_getsockopt,
- .sendmsg = vsock_stream_sendmsg,
- .recvmsg = vsock_stream_recvmsg,
+ .setsockopt = vsock_connectible_setsockopt,
+ .getsockopt = vsock_connectible_getsockopt,
+ .sendmsg = vsock_connectible_sendmsg,
+ .recvmsg = vsock_connectible_recvmsg,
.mmap = sock_no_mmap,
.sendpage = sock_no_sendpage,
};
--
2.25.1

2021-05-08 16:34:51

by Arseny Krasnov

Subject: [RFC PATCH v9 02/19] af_vsock: separate wait data loop

This moves the wait loop for data to a dedicated function, because it
will later be used by the SEQPACKET data receive loop. While moving the
code around, let's update an old comment.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/af_vsock.c | 156 +++++++++++++++++++++------------------
1 file changed, 84 insertions(+), 72 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 7dd8e70d78cd..4269e80b02cd 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1833,6 +1833,69 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
return err;
}

+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
+ long timeout,
+ struct vsock_transport_recv_notify_data *recv_data,
+ size_t target)
+{
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ s64 data;
+ int err;
+
+ vsk = vsock_sk(sk);
+ err = 0;
+ transport = vsk->transport;
+
+ while ((data = vsock_stream_has_data(vsk)) == 0) {
+ prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
+
+ if (sk->sk_err != 0 ||
+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
+ (vsk->peer_shutdown & SEND_SHUTDOWN)) {
+ break;
+ }
+
+ /* Don't wait for non-blocking sockets. */
+ if (timeout == 0) {
+ err = -EAGAIN;
+ break;
+ }
+
+ if (recv_data) {
+ err = transport->notify_recv_pre_block(vsk, target, recv_data);
+ if (err < 0)
+ break;
+ }
+
+ release_sock(sk);
+ timeout = schedule_timeout(timeout);
+ lock_sock(sk);
+
+ if (signal_pending(current)) {
+ err = sock_intr_errno(timeout);
+ break;
+ } else if (timeout == 0) {
+ err = -EAGAIN;
+ break;
+ }
+ }
+
+ finish_wait(sk_sleep(sk), wait);
+
+ if (err)
+ return err;
+
+ /* Internal transport error when checking for available
+ * data. XXX This should be changed to a connection
+ * reset in a later change.
+ */
+ if (data < 0)
+ return -ENOMEM;
+
+ return data;
+}
+
static int
vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
int flags)
@@ -1912,85 +1975,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,


while (1) {
- s64 ready;
+ ssize_t read;

- prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- ready = vsock_stream_has_data(vsk);
+ err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
+ if (err <= 0)
+ break;

- if (ready == 0) {
- if (sk->sk_err != 0 ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- (vsk->peer_shutdown & SEND_SHUTDOWN)) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- /* Don't wait for non-blocking sockets. */
- if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
-
- err = transport->notify_recv_pre_block(
- vsk, target, &recv_data);
- if (err < 0) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- release_sock(sk);
- timeout = schedule_timeout(timeout);
- lock_sock(sk);
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeout);
- finish_wait(sk_sleep(sk), &wait);
- break;
- } else if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- } else {
- ssize_t read;
-
- finish_wait(sk_sleep(sk), &wait);
-
- if (ready < 0) {
- /* Invalid queue pair content. XXX This should
- * be changed to a connection reset in a later
- * change.
- */
-
- err = -ENOMEM;
- goto out;
- }
-
- err = transport->notify_recv_pre_dequeue(
- vsk, target, &recv_data);
- if (err < 0)
- break;
+ err = transport->notify_recv_pre_dequeue(vsk, target,
+ &recv_data);
+ if (err < 0)
+ break;

- read = transport->stream_dequeue(
- vsk, msg,
- len - copied, flags);
- if (read < 0) {
- err = -ENOMEM;
- break;
- }
+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
+ if (read < 0) {
+ err = -ENOMEM;
+ break;
+ }

- copied += read;
+ copied += read;

- err = transport->notify_recv_post_dequeue(
- vsk, target, read,
- !(flags & MSG_PEEK), &recv_data);
- if (err < 0)
- goto out;
+ err = transport->notify_recv_post_dequeue(vsk, target, read,
+ !(flags & MSG_PEEK), &recv_data);
+ if (err < 0)
+ goto out;

- if (read >= target || flags & MSG_PEEK)
- break;
+ if (read >= target || flags & MSG_PEEK)
+ break;

- target -= read;
- }
+ target -= read;
}

if (sk->sk_err)
--
2.25.1

2021-05-08 16:35:07

by Arseny Krasnov

Subject: [RFC PATCH v9 03/19] af_vsock: separate receive data loop

Move STREAM specific data receive logic to the dedicated function
'__vsock_stream_recvmsg()', while checks that are the same for both
STREAM and SEQPACKET sockets stay in the shared function
'vsock_connectible_recvmsg()'.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/af_vsock.c | 116 ++++++++++++++++++++++-----------------
1 file changed, 67 insertions(+), 49 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 4269e80b02cd..c4f6bfa1e381 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1896,65 +1896,22 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
return data;
}

-static int
-vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
{
- struct sock *sk;
- struct vsock_sock *vsk;
+ struct vsock_transport_recv_notify_data recv_data;
const struct vsock_transport *transport;
- int err;
- size_t target;
+ struct vsock_sock *vsk;
ssize_t copied;
+ size_t target;
long timeout;
- struct vsock_transport_recv_notify_data recv_data;
+ int err;

DEFINE_WAIT(wait);

- sk = sock->sk;
vsk = vsock_sk(sk);
- err = 0;
-
- lock_sock(sk);
-
transport = vsk->transport;

- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
- /* Recvmsg is supposed to return 0 if a peer performs an
- * orderly shutdown. Differentiate between that case and when a
- * peer has not connected or a local shutdown occurred with the
- * SOCK_DONE flag.
- */
- if (sock_flag(sk, SOCK_DONE))
- err = 0;
- else
- err = -ENOTCONN;
-
- goto out;
- }
-
- if (flags & MSG_OOB) {
- err = -EOPNOTSUPP;
- goto out;
- }
-
- /* We don't check peer_shutdown flag here since peer may actually shut
- * down, but there can be data in the queue that a local socket can
- * receive.
- */
- if (sk->sk_shutdown & RCV_SHUTDOWN) {
- err = 0;
- goto out;
- }
-
- /* It is valid on Linux to pass in a zero-length receive buffer. This
- * is not an error. We may as well bail out now.
- */
- if (!len) {
- err = 0;
- goto out;
- }
-
/* We must not copy less than target bytes into the user's buffer
* before returning successfully, so we wait for the consume queue to
* have that much data to consume before dequeueing. Note that this
@@ -2013,6 +1970,67 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (copied > 0)
err = copied;

+out:
+ return err;
+}
+
+static int
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ struct sock *sk;
+ struct vsock_sock *vsk;
+ const struct vsock_transport *transport;
+ int err;
+
+ DEFINE_WAIT(wait);
+
+ sk = sock->sk;
+ vsk = vsock_sk(sk);
+ err = 0;
+
+ lock_sock(sk);
+
+ transport = vsk->transport;
+
+ if (!transport || sk->sk_state != TCP_ESTABLISHED) {
+ /* Recvmsg is supposed to return 0 if a peer performs an
+ * orderly shutdown. Differentiate between that case and when a
+ * peer has not connected or a local shutdown occurred with the
+ * SOCK_DONE flag.
+ */
+ if (sock_flag(sk, SOCK_DONE))
+ err = 0;
+ else
+ err = -ENOTCONN;
+
+ goto out;
+ }
+
+ if (flags & MSG_OOB) {
+ err = -EOPNOTSUPP;
+ goto out;
+ }
+
+ /* We don't check peer_shutdown flag here since peer may actually shut
+ * down, but there can be data in the queue that a local socket can
+ * receive.
+ */
+ if (sk->sk_shutdown & RCV_SHUTDOWN) {
+ err = 0;
+ goto out;
+ }
+
+ /* It is valid on Linux to pass in a zero-length receive buffer. This
+ * is not an error. We may as well bail out now.
+ */
+ if (!len) {
+ err = 0;
+ goto out;
+ }
+
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+
out:
release_sock(sk);
return err;
--
2.25.1

2021-05-08 16:35:42

by Arseny Krasnov

Subject: [RFC PATCH v9 04/19] af_vsock: implement SEQPACKET receive loop

This adds a receive loop for SEQPACKET. It looks like the receive loop
for STREAM, but there are a few differences:
1) It doesn't call notify callbacks.
2) It ignores 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because these
values make no sense for SEQPACKET.
3) It waits until the whole record is received or an error is found
during receiving.
4) It processes and sets the 'MSG_TRUNC' flag.

So, to avoid extra conditions for two types of socket inside one loop,
two independent functions were created.
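
To make rule 4) concrete, here is a hypothetical userspace model of how
the value returned by recvmsg() is chosen once a whole record has been
dequeued without error. It mirrors the tail of
'__vsock_seqpacket_recvmsg()' in the diff below, but the function name
and parameters are invented for illustration; 'copied' corresponds to
'len - msg->msg_iter.count' in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/socket.h>	/* MSG_TRUNC */

/*
 * record_len: full length of the received record
 * buf_len:    size of the user's buffer ('len' in the patch)
 * copied:     bytes actually copied into the user's buffer
 * flags:      recvmsg() flags passed by the user
 * msg_flags:  output flags ('msg->msg_flags' in the patch)
 */
static long model_seqpacket_recv_ret(size_t record_len, size_t buf_len,
				     size_t copied, int flags, int *msg_flags)
{
	long ret;

	assert(copied <= buf_len && copied <= record_len);

	/* User passed MSG_TRUNC: report the real length of the record. */
	if (flags & MSG_TRUNC)
		ret = (long)record_len;
	else
		ret = (long)copied;

	/* Always flag truncation if the record did not fit the buffer. */
	if (record_len > buf_len)
		*msg_flags |= MSG_TRUNC;

	return ret;
}
```

So a 100-byte record read into a 10-byte buffer returns 10 (or 100 if
the caller passed MSG_TRUNC), and MSG_TRUNC is set in msg_flags in both
cases.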

Signed-off-by: Arseny Krasnov <[email protected]>
---
v8 -> v9:
1) 'tmp_record_len' renamed to 'fragment_len'.
2) MSG_TRUNC handled in af_vsock.c instead of transport.
3) 'flags' still passed to transport for MSG_PEEK support.

include/net/af_vsock.h | 4 +++
net/vmw_vsock/af_vsock.c | 72 +++++++++++++++++++++++++++++++++++++++-
2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index b1c717286993..5175f5a52ce1 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -135,6 +135,10 @@ struct vsock_transport {
bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port);

+ /* SEQ_PACKET. */
+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
+ int flags, bool *msg_ready);
+
/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index c4f6bfa1e381..78b9af545ca8 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1974,6 +1974,73 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
return err;
}

+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ const struct vsock_transport *transport;
+ bool msg_ready;
+ struct vsock_sock *vsk;
+ ssize_t record_len;
+ long timeout;
+ int err = 0;
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ msg_ready = false;
+ record_len = 0;
+
+ while (1) {
+ ssize_t fragment_len;
+
+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0) <= 0) {
+ /* In case of any loop break(timeout, signal
+ * interrupt or shutdown), we report user that
+ * nothing was copied.
+ */
+ err = 0;
+ break;
+ }
+
+ fragment_len = transport->seqpacket_dequeue(vsk, msg, flags, &msg_ready);
+
+ if (fragment_len < 0) {
+ err = -ENOMEM;
+ break;
+ }
+
+ record_len += fragment_len;
+
+ if (msg_ready)
+ break;
+ }
+
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ err = 0;
+
+ if (msg_ready && err == 0) {
+ /* User sets MSG_TRUNC, so return real length of
+ * packet.
+ */
+ if (flags & MSG_TRUNC)
+ err = record_len;
+ else
+ err = len - msg->msg_iter.count;
+
+ /* Always set MSG_TRUNC if real length of packet is
+ * bigger than user's buffer.
+ */
+ if (record_len > len)
+ msg->msg_flags |= MSG_TRUNC;
+ }
+
+ return err;
+}
+
static int
vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
int flags)
@@ -2029,7 +2096,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
goto out;
}

- err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ if (sk->sk_type == SOCK_STREAM)
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ else
+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);

out:
release_sock(sk);
--
2.25.1

2021-05-08 16:36:02

by Arseny Krasnov

Subject: [RFC PATCH v9 05/19] af_vsock: implement send logic for SEQPACKET

This adds some logic to the current stream enqueue function for
SEQPACKET support:
1) Use the transport's seqpacket enqueue callback.
2) For SOCK_SEQPACKET, the return value of the enqueue function is the
whole record length or an error.
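
A hypothetical model of the return-value rule at the 'out_err:' label in
the diff below, for illustration only (the function name and parameters
are invented): a STREAM socket reports a partial write, while a
SEQPACKET socket reports the byte count only when the whole record was
queued and otherwise returns the saved error.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>	/* SOCK_STREAM, SOCK_SEQPACKET */

/*
 * type:          socket type
 * len:           length passed to sendmsg()
 * total_written: bytes queued before the send loop stopped
 * err:           negative errno saved by the loop (e.g. -EAGAIN), or 0
 */
static long model_sendmsg_ret(int type, size_t len, size_t total_written,
			      long err)
{
	assert(total_written <= len);

	if (total_written > 0 &&
	    (type == SOCK_STREAM || total_written == len))
		return (long)total_written;

	return err;
}
```

A partial SEQPACKET send therefore surfaces as an error rather than as a
short count, so the caller never sees a silently split record.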

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
include/net/af_vsock.h | 2 ++
net/vmw_vsock/af_vsock.c | 20 +++++++++++++++-----
2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 5175f5a52ce1..5860027d5173 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -138,6 +138,8 @@ struct vsock_transport {
/* SEQ_PACKET. */
ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
int flags, bool *msg_ready);
+ int (*seqpacket_enqueue)(struct vsock_sock *vsk, struct msghdr *msg,
+ size_t len);

/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 78b9af545ca8..3f9cfcce1e42 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1808,9 +1808,13 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
* responsibility to check how many bytes we were able to send.
*/

- written = transport->stream_enqueue(
- vsk, msg,
- len - total_written);
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ written = transport->seqpacket_enqueue(vsk,
+ msg, len - total_written);
+ } else {
+ written = transport->stream_enqueue(vsk,
+ msg, len - total_written);
+ }
if (written < 0) {
err = -ENOMEM;
goto out_err;
@@ -1826,8 +1830,14 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
}

out_err:
- if (total_written > 0)
- err = total_written;
+ if (total_written > 0) {
+ /* Return number of written bytes only if:
+ * 1) SOCK_STREAM socket.
+ * 2) SOCK_SEQPACKET socket when whole buffer is sent.
+ */
+ if (sk->sk_type == SOCK_STREAM || total_written == len)
+ err = total_written;
+ }
out:
release_sock(sk);
return err;
--
2.25.1

2021-05-08 16:36:16

by Arseny Krasnov

Subject: [RFC PATCH v9 06/19] af_vsock: rest of SEQPACKET support

This implements the rest of SOCK_SEQPACKET support:
1) Adds socket ops for the SEQPACKET type.
2) Allows creating a socket of SEQPACKET type.
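
A hypothetical model of the extra check added to
'vsock_assign_transport()' in the diff below, for illustration only:
'struct model_transport' is invented, and a plain counter stands in for
the module reference taken by try_module_get() and dropped by
module_put().

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>	/* SOCK_STREAM, SOCK_SEQPACKET */

/* Hypothetical, cut-down transport: only what the SEQPACKET check needs. */
struct model_transport {
	bool (*seqpacket_allow)(uint32_t remote_cid);
	int module_refs;	/* stands in for the module refcount */
};

/* Example policy callback: accept SEQPACKET for every peer. */
static bool model_allow_any(uint32_t remote_cid)
{
	(void)remote_cid;
	return true;
}

static int model_check_transport(struct model_transport *t, int sk_type,
				 uint32_t remote_cid)
{
	if (t == NULL)
		return -ENODEV;
	t->module_refs++;		/* try_module_get() succeeded */

	/* A missing callback means the transport lacks SEQPACKET support. */
	if (sk_type == SOCK_SEQPACKET &&
	    (t->seqpacket_allow == NULL || !t->seqpacket_allow(remote_cid))) {
		t->module_refs--;	/* module_put() on the error path */
		return -ESOCKTNOSUPPORT;
	}

	return 0;			/* the real code calls ->init() next */
}
```

Because the callback is optional, a transport that never sets it keeps
working for SOCK_STREAM but rejects SOCK_SEQPACKET without leaking the
module reference.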

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
include/net/af_vsock.h | 1 +
net/vmw_vsock/af_vsock.c | 36 +++++++++++++++++++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 5860027d5173..1747c0b564ef 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -140,6 +140,7 @@ struct vsock_transport {
int flags, bool *msg_ready);
int (*seqpacket_enqueue)(struct vsock_sock *vsk, struct msghdr *msg,
size_t len);
+ bool (*seqpacket_allow)(u32 remote_cid);

/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 3f9cfcce1e42..5819e8fd9eaf 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_dgram;
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
if (vsock_use_local_transport(remote_cid))
new_transport = transport_local;
else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
@@ -484,6 +485,14 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
if (!new_transport || !try_module_get(new_transport->module))
return -ENODEV;

+ if (sk->sk_type == SOCK_SEQPACKET) {
+ if (!new_transport->seqpacket_allow ||
+ !new_transport->seqpacket_allow(remote_cid)) {
+ module_put(new_transport->module);
+ return -ESOCKTNOSUPPORT;
+ }
+ }
+
ret = new_transport->init(vsk, psk);
if (ret) {
module_put(new_transport->module);
@@ -684,6 +693,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)

switch (sk->sk_socket->type) {
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
spin_lock_bh(&vsock_table_lock);
retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
@@ -770,7 +780,7 @@ static struct sock *__vsock_create(struct net *net,

static bool sock_type_connectible(u16 type)
{
- return type == SOCK_STREAM;
+ return (type == SOCK_STREAM) || (type == SOCK_SEQPACKET);
}

static void __vsock_release(struct sock *sk, int level)
@@ -2137,6 +2147,27 @@ static const struct proto_ops vsock_stream_ops = {
.sendpage = sock_no_sendpage,
};

+static const struct proto_ops vsock_seqpacket_ops = {
+ .family = PF_VSOCK,
+ .owner = THIS_MODULE,
+ .release = vsock_release,
+ .bind = vsock_bind,
+ .connect = vsock_connect,
+ .socketpair = sock_no_socketpair,
+ .accept = vsock_accept,
+ .getname = vsock_getname,
+ .poll = vsock_poll,
+ .ioctl = sock_no_ioctl,
+ .listen = vsock_listen,
+ .shutdown = vsock_shutdown,
+ .setsockopt = vsock_connectible_setsockopt,
+ .getsockopt = vsock_connectible_getsockopt,
+ .sendmsg = vsock_connectible_sendmsg,
+ .recvmsg = vsock_connectible_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
static int vsock_create(struct net *net, struct socket *sock,
int protocol, int kern)
{
@@ -2157,6 +2188,9 @@ static int vsock_create(struct net *net, struct socket *sock,
case SOCK_STREAM:
sock->ops = &vsock_stream_ops;
break;
+ case SOCK_SEQPACKET:
+ sock->ops = &vsock_seqpacket_ops;
+ break;
default:
return -ESOCKTNOSUPPORT;
}
--
2.25.1

2021-05-08 16:36:22

by Arseny Krasnov

Subject: [RFC PATCH v9 07/19] af_vsock: update comments for stream sockets

This replaces 'stream' with 'connection oriented' in comments, as
SEQPACKET is also connection oriented.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 5819e8fd9eaf..7790728465f4 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)

/* Assign a transport to a socket and call the .init transport callback.
*
- * Note: for stream socket this must be called when vsk->remote_addr is set
- * (e.g. during the connect() or when a connection request on a listener
+ * Note: for connection oriented socket this must be called when vsk->remote_addr
+ * is set (e.g. during the connect() or when a connection request on a listener
* socket is received).
* The vsk->remote_addr is used to decide which transport to use:
* - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
@@ -470,10 +470,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
return 0;

/* transport->release() must be called with sock lock acquired.
- * This path can only be taken during vsock_stream_connect(),
- * where we have already held the sock lock.
- * In the other cases, this function is called on a new socket
- * which is not assigned to any transport.
+ * This path can only be taken during vsock_connect(), where we
+ * have already held the sock lock. In the other cases, this
+ * function is called on a new socket which is not assigned to
+ * any transport.
*/
vsk->transport->release(vsk);
vsock_deassign_transport(vsk);
@@ -658,9 +658,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,

vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);

- /* Remove stream sockets from the unbound list and add them to the hash
- * table for easy lookup by its address. The unbound list is simply an
- * extra entry at the end of the hash table, a trick used by AF_UNIX.
+ /* Remove connection oriented sockets from the unbound list and add them
+ * to the hash table for easy lookup by its address. The unbound list
+ * is simply an extra entry at the end of the hash table, a trick used
+ * by AF_UNIX.
*/
__vsock_remove_bound(vsk);
__vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
@@ -952,10 +953,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
if ((mode & ~SHUTDOWN_MASK) || !mode)
return -EINVAL;

- /* If this is a STREAM socket and it is not connected then bail out
- * immediately. If it is a DGRAM socket then we must first kick the
- * socket so that it wakes up from any sleeping calls, for example
- * recv(), and then afterwards return the error.
+ /* If this is a connection oriented socket and it is not connected then
+ * bail out immediately. If it is a DGRAM socket then we must first
+ * kick the socket so that it wakes up from any sleeping calls, for
+ * example recv(), and then afterwards return the error.
*/

sk = sock->sk;
@@ -1727,7 +1728,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,

transport = vsk->transport;

- /* Callers should not provide a destination with stream sockets. */
+ /* Callers should not provide a destination with connection oriented
+ * sockets.
+ */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out;
--
2.25.1

2021-05-08 16:37:18

by Arseny Krasnov

Subject: [RFC PATCH v9 10/19] virtio/vsock: defines and constants for SEQPACKET

This adds a set of defines and constants for SOCK_SEQPACKET
support in vsock. Here is a link to the spec patch that uses them:

https://lists.oasis-open.org/archives/virtio-comment/202103/msg00069.html

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/uapi/linux/virtio_vsock.h | 9 +++++++++
1 file changed, 9 insertions(+)

diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 1d57ed3d84d2..3dd3555b2740 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -38,6 +38,9 @@
#include <linux/virtio_ids.h>
#include <linux/virtio_config.h>

+/* The feature bitmap for virtio vsock */
+#define VIRTIO_VSOCK_F_SEQPACKET 1 /* SOCK_SEQPACKET supported */
+
struct virtio_vsock_config {
__le64 guest_cid;
} __attribute__((packed));
@@ -65,6 +68,7 @@ struct virtio_vsock_hdr {

enum virtio_vsock_type {
VIRTIO_VSOCK_TYPE_STREAM = 1,
+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};

enum virtio_vsock_op {
@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
};

+/* VIRTIO_VSOCK_OP_RW flags values */
+enum virtio_vsock_rw {
+ VIRTIO_VSOCK_SEQ_EOR = 1,
+};
+
#endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
--
2.25.1
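In struct virtio_vsock_hdr, 'flags' is a __le32 and 'type' is a __le16, so any code that tests the new bits has to convert byte order first. The following sketch shows that pattern with local copies of the values added above. The SKETCH_* names are hypothetical; they exist only so the example builds against headers that predate this patch.

```c
#define _DEFAULT_SOURCE /* htole32()/le32toh() in glibc's <endian.h> */
#include <endian.h>
#include <stdbool.h>
#include <stdint.h>

/* Local copies of the uapi values from this patch (assumed to match
 * include/uapi/linux/virtio_vsock.h). */
#define SKETCH_TYPE_STREAM    1  /* VIRTIO_VSOCK_TYPE_STREAM */
#define SKETCH_TYPE_SEQPACKET 2  /* VIRTIO_VSOCK_TYPE_SEQPACKET */
#define SKETCH_SEQ_EOR        1u /* VIRTIO_VSOCK_SEQ_EOR */

/* Set the end-of-record bit on a little-endian flags word. */
uint32_t sketch_set_eor(uint32_t le_flags)
{
	return le_flags | htole32(SKETCH_SEQ_EOR);
}

/* Test the end-of-record bit on a little-endian flags word. */
bool sketch_has_eor(uint32_t le_flags)
{
	return (le32toh(le_flags) & SKETCH_SEQ_EOR) != 0;
}

/* Accept only the packet types known after this patch. */
bool sketch_type_valid(uint16_t le_type)
{
	uint16_t type = le16toh(le_type);

	return type == SKETCH_TYPE_STREAM || type == SKETCH_TYPE_SEQPACKET;
}
```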

2021-05-08 16:37:22

by Arseny Krasnov

Subject: [RFC PATCH v9 11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET

This adds a transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the socket's rx queue until the
whole record is copied (if the user's buffer is full, the user is not
woken up). This is done to avoid stalling the sender: if we woke up the
user and it left the syscall, nobody would send a credit update for the
rest of the record, and the sender would wait until the next read
syscall on the receiver's side. So if the user's buffer is full, we
just send a credit update and drop the data.

Signed-off-by: Arseny Krasnov <[email protected]>
---
v8 -> v9:
1) The check for the RW packet type is removed from the loop (all
packets are now considered RW).
2) Locking in the loop is fixed.
3) cpu_to_le32()/le32_to_cpu() are now used.
4) MSG_TRUNC handling is removed from the transport.

include/linux/virtio_vsock.h | 5 ++
net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++
2 files changed, 69 insertions(+)

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..02acf6e9ae04 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);

+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ int flags,
+ bool *msg_ready);
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index ad0d34d41444..f649a21dd23b 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}

+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ int flags,
+ bool *msg_ready)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ int err = 0;
+ size_t user_buf_len = msg->msg_iter.count;
+
+ *msg_ready = false;
+ spin_lock_bh(&vvs->rx_lock);
+
+ while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
+ size_t bytes_to_copy;
+ size_t pkt_len;
+
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+ bytes_to_copy = min(user_buf_len, pkt_len);
+
+ if (bytes_to_copy) {
+ /* sk_lock is held by caller so no one else can dequeue.
+ * Unlock rx_lock since memcpy_to_msg() may sleep.
+ */
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+ err = -EINVAL;
+ } else {
+ err += pkt_len;
+ user_buf_len -= bytes_to_copy;
+ }
+
+ spin_lock_bh(&vvs->rx_lock);
+ }
+
+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
+ *msg_ready = true;
+
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ list_del(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+ }
+
+ spin_unlock_bh(&vvs->rx_lock);
+
+ virtio_transport_send_credit_update(vsk);
+
+ return err;
+}
+
ssize_t
virtio_transport_stream_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -405,6 +457,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);

+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ int flags, bool *msg_ready)
+{
+ if (flags & MSG_PEEK)
+ return -EOPNOTSUPP;
+
+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
int
virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
--
2.25.1
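The dequeue loop above can be modelled in user space to make the record and truncation rules concrete. This is a simplified sketch, not the patch's code: struct sketch_pkt and sketch_seqpacket_dequeue() are hypothetical names, and rx_lock, credit updates and copy errors are omitted. One difference is deliberate. In the patch, `err += pkt_len` runs only inside `if (bytes_to_copy)`, so packets that arrive after the user buffer is full do not add to the returned length. The model counts every packet of the record, which is what reporting the real record length for MSG_TRUNC would need.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_SEQ_EOR 1u /* mirrors VIRTIO_VSOCK_SEQ_EOR */

/* Hypothetical stand-in for a queued RW packet (host byte order). */
struct sketch_pkt {
	const char *buf;
	size_t len;
	uint32_t flags;
};

/*
 * Dequeue one record from q[*head..count): copy what fits into 'out',
 * drop the rest, and return the full record length. *copied gets the
 * number of bytes actually copied; *msg_ready is set once the packet
 * carrying SEQ_EOR has been consumed.
 */
size_t sketch_seqpacket_dequeue(const struct sketch_pkt *q, size_t count,
				size_t *head, char *out, size_t out_len,
				size_t *copied, int *msg_ready)
{
	size_t record_len = 0;
	size_t room = out_len;

	*copied = 0;
	*msg_ready = 0;

	while (!*msg_ready && *head < count) {
		const struct sketch_pkt *pkt = &q[*head];
		size_t n = pkt->len < room ? pkt->len : room;

		if (n) {
			memcpy(out + *copied, pkt->buf, n);
			*copied += n;
			room -= n;
		}
		/* Count every packet, including ones dropped for lack of room. */
		record_len += pkt->len;

		if (pkt->flags & SKETCH_SEQ_EOR)
			*msg_ready = 1;

		(*head)++; /* the packet is consumed either way */
	}

	return record_len;
}
```

With packets of 10, 10 and 12 bytes (EOR on the last) and a 16-byte buffer, the model copies 16 bytes, reports a 32-byte record, and leaves any following record in the queue.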

2021-05-08 16:37:24

by Arseny Krasnov

Subject: [RFC PATCH v9 12/19] virtio/vsock: add SEQPACKET receive logic

This modifies the current receive logic for SEQPACKET support:
1) Inserts 'RW' packets into the socket's rx queue, but does not merge
them into the buffer of the last queued packet if that packet already
ends a record (has SEQ_EOR set).
2) Checks the packet and socket types on receive (on mismatch, the
connection is reset).

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 28 +++++++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index f649a21dd23b..7fea0a2192f7 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -165,6 +165,14 @@ void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
}
EXPORT_SYMBOL_GPL(virtio_transport_deliver_tap_pkt);

+static u16 virtio_transport_get_type(struct sock *sk)
+{
+ if (sk->sk_type == SOCK_STREAM)
+ return VIRTIO_VSOCK_TYPE_STREAM;
+ else
+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
+}
+
/* This function can only be used on connecting/connected sockets,
* since a socket assigned to a transport is required.
*
@@ -980,11 +988,15 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
/* If there is space in the last packet queued, we copy the
* new packet in its buffer.
*/
- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
+ if ((pkt->len <= last_pkt->buf_len - last_pkt->len) &&
+ !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)) {
memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
pkt->len);
last_pkt->len += pkt->len;
free_pkt = true;
+
+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
+ last_pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
goto out;
}
}
@@ -1150,6 +1162,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
return 0;
}

+static bool virtio_transport_valid_type(u16 type)
+{
+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
+}
+
/* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
* lock.
*/
@@ -1175,7 +1193,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le32_to_cpu(pkt->hdr.buf_alloc),
le32_to_cpu(pkt->hdr.fwd_cnt));

- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
goto free_pkt;
}
@@ -1192,6 +1210,12 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
}
}

+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type)) {
+ (void)virtio_transport_reset_no_sock(t, pkt);
+ sock_put(sk);
+ goto free_pkt;
+ }
+
vsk = vsock_sk(sk);

lock_sock(sk);
--
2.25.1
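The merge rule changed in virtio_transport_recv_enqueue() can be isolated as a small predicate. In the following sketch, struct sketch_rx_pkt and sketch_try_merge() are hypothetical, and the other preconditions checked just above this hunk are left out. The model merges only when the data fits and the last packet does not already end a record, then carries the new data's EOR mark over to the merged packet.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_SEQ_EOR 1u /* mirrors VIRTIO_VSOCK_SEQ_EOR */

/* Hypothetical queued packet with a fixed buffer (host byte order). */
struct sketch_rx_pkt {
	char buf[64];
	size_t len;     /* bytes used in buf */
	size_t buf_len; /* capacity of buf, at most sizeof(buf) */
	uint32_t flags;
};

/* Append 'data' to 'last' if allowed; returns true when merged. */
bool sketch_try_merge(struct sketch_rx_pkt *last, const char *data,
		      size_t len, uint32_t flags)
{
	/* Refuse if it does not fit or if 'last' already closes a record. */
	if (len > last->buf_len - last->len || (last->flags & SKETCH_SEQ_EOR))
		return false;

	memcpy(last->buf + last->len, data, len);
	last->len += len;

	/* The merged packet inherits the record boundary of the new data. */
	if (flags & SKETCH_SEQ_EOR)
		last->flags |= SKETCH_SEQ_EOR;
	return true;
}
```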

2021-05-08 16:37:42

by Arseny Krasnov

Subject: [RFC PATCH v9 13/19] virtio/vsock: rest of SOCK_SEQPACKET support

This adds the rest of the logic for SEQPACKET:
1) Send SHUTDOWN on socket close for the SEQPACKET type.
2) Set the SEQPACKET packet type during send.
3) 'seqpacket_allow' flag to virtio transport.
4) Set the 'VIRTIO_VSOCK_SEQ_EOR' bit in flags for the last
packet of a message.

Signed-off-by: Arseny Krasnov <[email protected]>
---
v8 -> v9:
1) Use cpu_to_le32() to set VIRTIO_VSOCK_SEQ_EOR.

include/linux/virtio_vsock.h | 4 ++++
net/vmw_vsock/virtio_transport_common.c | 17 +++++++++++++++--
2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 02acf6e9ae04..7360ab7ea0af 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -80,6 +80,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);

+int
+virtio_transport_seqpacket_enqueue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t len);
ssize_t
virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 7fea0a2192f7..b6608b4ac7c2 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -74,6 +74,10 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
err = memcpy_from_msg(pkt->buf, info->msg, len);
if (err)
goto out;
+
+ if (info->msg->msg_iter.count == 0)
+ pkt->hdr.flags = cpu_to_le32(info->flags |
+ VIRTIO_VSOCK_SEQ_EOR);
}

trace_virtio_transport_alloc_pkt(src_cid, src_port,
@@ -187,7 +191,7 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt;
u32 pkt_len = info->pkt_len;

- info->type = VIRTIO_VSOCK_TYPE_STREAM;
+ info->type = virtio_transport_get_type(sk_vsock(vsk));

t_ops = virtio_transport_get_ops(vsk);
if (unlikely(!t_ops))
@@ -477,6 +481,15 @@ virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
}
EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);

+int
+virtio_transport_seqpacket_enqueue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t len)
+{
+ return virtio_transport_stream_enqueue(vsk, msg, len);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_enqueue);
+
int
virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -911,7 +924,7 @@ void virtio_transport_release(struct vsock_sock *vsk)
struct sock *sk = &vsk->sk;
bool remove_sock = true;

- if (sk->sk_type == SOCK_STREAM)
+ if (sk->sk_type == SOCK_STREAM || sk->sk_type == SOCK_SEQPACKET)
remove_sock = virtio_transport_close(vsk);

if (remove_sock) {
--
2.25.1
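On the send side, virtio_transport_alloc_pkt() sets SEQ_EOR on the packet after which msg_iter.count reaches zero, which means only the final chunk of a message is marked. The sketch below models that rule for a message split into packets of at most max_pkt bytes. sketch_split_message() is hypothetical, and it ignores credit limits and partial sends, which the real enqueue path has to handle.

```c
#include <stddef.h>
#include <stdint.h>

#define SKETCH_SEQ_EOR 1u /* mirrors VIRTIO_VSOCK_SEQ_EOR */

/* Split a msg_len-byte message into packets of at most max_pkt bytes
 * (max_pkt > 0), storing each packet's length and flags. Returns the
 * number of packets written (at most cap). */
size_t sketch_split_message(size_t msg_len, size_t max_pkt,
			    size_t *lens, uint32_t *flags, size_t cap)
{
	size_t remaining = msg_len;
	size_t n = 0;

	while (remaining > 0 && n < cap) {
		size_t chunk = remaining < max_pkt ? remaining : max_pkt;

		/* Plays the role of msg_iter.count after memcpy_from_msg(). */
		remaining -= chunk;
		lens[n] = chunk;
		flags[n] = remaining == 0 ? SKETCH_SEQ_EOR : 0;
		n++;
	}
	return n;
}
```

For example, a 10000-byte message with max_pkt = 4096 yields packets of 4096, 4096 and 1808 bytes, and only the last one carries SEQ_EOR.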

2021-05-08 16:38:12

by Arseny Krasnov

Subject: [RFC PATCH v9 08/19] virtio/vsock: set packet's type in virtio_transport_send_pkt_info()

This moves passing the packet type from the 'info' structure to the
'virtio_transport_send_pkt_info()' function. There is no need to set a
packet type that differs from the socket type. Since only the stream
type is currently supported, set it directly in
'virtio_transport_send_pkt_info()', so callers don't need to set it.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 19 +++++--------------
1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 902cb6dd710b..6503a8370130 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -179,6 +179,8 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt;
u32 pkt_len = info->pkt_len;

+ info->type = VIRTIO_VSOCK_TYPE_STREAM;
+
t_ops = virtio_transport_get_ops(vsk);
if (unlikely(!t_ops))
return -EFAULT;
@@ -270,12 +272,10 @@ void virtio_transport_put_credit(struct virtio_vsock_sock *vvs, u32 credit)
EXPORT_SYMBOL_GPL(virtio_transport_put_credit);

static int virtio_transport_send_credit_update(struct vsock_sock *vsk,
- int type,
struct virtio_vsock_hdr *hdr)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_CREDIT_UPDATE,
- .type = type,
.vsk = vsk,
};

@@ -383,11 +383,8 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
* messages, we set the limit to a high value. TODO: experiment
* with different values.
*/
- if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE) {
- virtio_transport_send_credit_update(vsk,
- VIRTIO_VSOCK_TYPE_STREAM,
- NULL);
- }
+ if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
+ virtio_transport_send_credit_update(vsk, NULL);

return total;

@@ -496,8 +493,7 @@ void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)

vvs->buf_alloc = *val;

- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
- NULL);
+ virtio_transport_send_credit_update(vsk, NULL);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);

@@ -624,7 +620,6 @@ int virtio_transport_connect(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_REQUEST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.vsk = vsk,
};

@@ -636,7 +631,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_SHUTDOWN,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.flags = (mode & RCV_SHUTDOWN ?
VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
(mode & SEND_SHUTDOWN ?
@@ -665,7 +659,6 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RW,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.msg = msg,
.pkt_len = len,
.vsk = vsk,
@@ -688,7 +681,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.reply = !!pkt,
.vsk = vsk,
};
@@ -1000,7 +992,6 @@ virtio_transport_send_response(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RESPONSE,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.remote_cid = le64_to_cpu(pkt->hdr.src_cid),
.remote_port = le32_to_cpu(pkt->hdr.src_port),
.reply = true,
--
2.25.1

2021-05-08 16:38:16

by Arseny Krasnov

Subject: [RFC PATCH v9 15/19] vhost/vsock: enable SEQPACKET for transport

This removes:
1) Ignoring the payload of non-stream packets.
This adds:
1) Handling of the SEQPACKET feature bit: if the guest sets features with
this bit cleared, SOCK_SEQPACKET support is disabled.
2) 'seqpacket_allow()' callback.
3) Handling of the SEQ_EOR bit: when vhost places data in the buffers of
the guest's rx queue, this bit is kept set only when the last piece of
data is copied.

Signed-off-by: Arseny Krasnov <[email protected]>
---
v8 -> v9:
1) Move 'seqpacket_allow' to 'struct vhost_vsock'.
2) Use cpu_to_le32()/le32_to_cpu() to work with 'flags' of packet.

drivers/vhost/vsock.c | 42 +++++++++++++++++++++++++++++++++++++++---
1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index 5e78fb719602..3395b25d4a35 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -31,7 +31,8 @@

enum {
VHOST_VSOCK_FEATURES = VHOST_FEATURES |
- (1ULL << VIRTIO_F_ACCESS_PLATFORM)
+ (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
+ (1ULL << VIRTIO_VSOCK_F_SEQPACKET)
};

enum {
@@ -56,6 +57,7 @@ struct vhost_vsock {
atomic_t queued_replies;

u32 guest_cid;
+ bool seqpacket_allow;
};

static u32 vhost_transport_get_local_cid(void)
@@ -112,6 +114,7 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
size_t nbytes;
size_t iov_len, payload_len;
int head;
+ bool restore_flag = false;

spin_lock_bh(&vsock->send_pkt_list_lock);
if (list_empty(&vsock->send_pkt_list)) {
@@ -174,6 +177,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
/* Set the correct length in the header */
pkt->hdr.len = cpu_to_le32(payload_len);

+ if (pkt->off + payload_len < pkt->len &&
+ le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
+ pkt->hdr.flags &= ~cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
+ restore_flag = true;
+ }
+
nbytes = copy_to_iter(&pkt->hdr, sizeof(pkt->hdr), &iov_iter);
if (nbytes != sizeof(pkt->hdr)) {
virtio_transport_free_pkt(pkt);
@@ -181,6 +190,9 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
break;
}

+ if (restore_flag)
+ pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
+
nbytes = copy_to_iter(pkt->buf + pkt->off, payload_len,
&iov_iter);
if (nbytes != payload_len) {
@@ -354,8 +366,7 @@ vhost_vsock_alloc_pkt(struct vhost_virtqueue *vq,
return NULL;
}

- if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM)
- pkt->len = le32_to_cpu(pkt->hdr.len);
+ pkt->len = le32_to_cpu(pkt->hdr.len);

/* No payload */
if (!pkt->len)
@@ -398,6 +409,8 @@ static bool vhost_vsock_more_replies(struct vhost_vsock *vsock)
return val < vq->num;
}

+static bool vhost_transport_seqpacket_allow(u32 remote_cid);
+
static struct virtio_transport vhost_transport = {
.transport = {
.module = THIS_MODULE,
@@ -424,6 +437,10 @@ static struct virtio_transport vhost_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,

+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+ .seqpacket_enqueue = virtio_transport_seqpacket_enqueue,
+ .seqpacket_allow = vhost_transport_seqpacket_allow,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
@@ -441,6 +458,22 @@ static struct virtio_transport vhost_transport = {
.send_pkt = vhost_transport_send_pkt,
};

+static bool vhost_transport_seqpacket_allow(u32 remote_cid)
+{
+ struct vhost_vsock *vsock;
+ bool seqpacket_allow = false;
+
+ rcu_read_lock();
+ vsock = vhost_vsock_get(remote_cid);
+
+ if (vsock)
+ seqpacket_allow = vsock->seqpacket_allow;
+
+ rcu_read_unlock();
+
+ return seqpacket_allow;
+}
+
static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
{
struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
@@ -785,6 +818,9 @@ static int vhost_vsock_set_features(struct vhost_vsock *vsock, u64 features)
goto err;
}

+ if (features & (1ULL << VIRTIO_VSOCK_F_SEQPACKET))
+ vsock->seqpacket_allow = true;
+
for (i = 0; i < ARRAY_SIZE(vsock->vqs); i++) {
vq = &vsock->vqs[i];
mutex_lock(&vq->mutex);
--
2.25.1
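When a packet is larger than one guest rx buffer, vhost_transport_do_send_pkt() delivers it in pieces. In the patch, SEQ_EOR is cleared in the header copied for every piece except the one that finishes the packet, and then restored (restore_flag) because the same packet is requeued for its remaining bytes. The sketch below models only the flags the guest ends up seeing. sketch_vhost_fragments() is a hypothetical name, and buf_payload stands in for the payload space of each guest buffer.

```c
#include <stddef.h>
#include <stdint.h>

#define SKETCH_SEQ_EOR 1u /* mirrors VIRTIO_VSOCK_SEQ_EOR */

/* Deliver a pkt_len-byte packet into guest buffers holding up to
 * buf_payload bytes each (buf_payload > 0). out_flags[i] receives the
 * header flags seen by the guest for fragment i. Returns the number of
 * fragments (at most cap). */
size_t sketch_vhost_fragments(size_t pkt_len, uint32_t pkt_flags,
			      size_t buf_payload, uint32_t *out_flags,
			      size_t cap)
{
	size_t off = 0;
	size_t n = 0;

	while (off < pkt_len && n < cap) {
		size_t payload = pkt_len - off;
		uint32_t flags = pkt_flags;

		if (payload > buf_payload)
			payload = buf_payload;

		/* More pieces follow: hide the record boundary for now. */
		if (off + payload < pkt_len)
			flags &= ~SKETCH_SEQ_EOR;

		out_flags[n++] = flags;
		off += payload;
	}
	return n;
}
```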

2021-05-08 16:38:39

by Arseny Krasnov

Subject: [RFC PATCH v9 17/19] vsock_test: add SOCK_SEQPACKET tests

This adds a test for SOCK_SEQPACKET sockets: it transfers data and
then checks the MSG_TRUNC flag. Cases for connect(), bind(), etc. are
not tested, because they are the same as for stream sockets.

Signed-off-by: Arseny Krasnov <[email protected]>
---
tools/testing/vsock/util.c | 32 +++++++++++++---
tools/testing/vsock/util.h | 3 ++
tools/testing/vsock/vsock_test.c | 63 ++++++++++++++++++++++++++++++++
3 files changed, 93 insertions(+), 5 deletions(-)

diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
index 93cbd6f603f9..2acbb7703c6a 100644
--- a/tools/testing/vsock/util.c
+++ b/tools/testing/vsock/util.c
@@ -84,7 +84,7 @@ void vsock_wait_remote_close(int fd)
}

/* Connect to <cid, port> and return the file descriptor. */
-int vsock_stream_connect(unsigned int cid, unsigned int port)
+static int vsock_connect(unsigned int cid, unsigned int port, int type)
{
union {
struct sockaddr sa;
@@ -101,7 +101,7 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)

control_expectln("LISTENING");

- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);

timeout_begin(TIMEOUT);
do {
@@ -120,11 +120,21 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
return fd;
}

+int vsock_stream_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_STREAM);
+}
+
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_SEQPACKET);
+}
+
/* Listen on <cid, port> and return the first incoming connection. The remote
* address is stored to clientaddrp. clientaddrp may be NULL.
*/
-int vsock_stream_accept(unsigned int cid, unsigned int port,
- struct sockaddr_vm *clientaddrp)
+static int vsock_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp, int type)
{
union {
struct sockaddr sa;
@@ -145,7 +155,7 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
int client_fd;
int old_errno;

- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);

if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
perror("bind");
@@ -189,6 +199,18 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
return client_fd;
}

+int vsock_stream_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_STREAM);
+}
+
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_SEQPACKET);
+}
+
/* Transmit one byte and check the return value.
*
* expected_ret:
diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
index e53dd09d26d9..a3375ad2fb7f 100644
--- a/tools/testing/vsock/util.h
+++ b/tools/testing/vsock/util.h
@@ -36,8 +36,11 @@ struct test_case {
void init_signals(void);
unsigned int parse_cid(const char *str);
int vsock_stream_connect(unsigned int cid, unsigned int port);
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port);
int vsock_stream_accept(unsigned int cid, unsigned int port,
struct sockaddr_vm *clientaddrp);
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp);
void vsock_wait_remote_close(int fd);
void send_byte(int fd, int expected_ret, int flags);
void recv_byte(int fd, int expected_ret, int flags);
diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
index 5a4fb80fa832..ffec985fd36f 100644
--- a/tools/testing/vsock/vsock_test.c
+++ b/tools/testing/vsock/vsock_test.c
@@ -14,6 +14,8 @@
#include <errno.h>
#include <unistd.h>
#include <linux/kernel.h>
+#include <sys/types.h>
+#include <sys/socket.h>

#include "timeout.h"
#include "control.h"
@@ -279,6 +281,62 @@ static void test_stream_msg_peek_server(const struct test_opts *opts)
close(fd);
}

+#define MESSAGE_TRUNC_SZ 32
+static void test_seqpacket_msg_trunc_client(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ];
+
+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+ if (fd < 0) {
+ perror("connect");
+ exit(EXIT_FAILURE);
+ }
+
+ if (send(fd, buf, sizeof(buf), 0) != sizeof(buf)) {
+ perror("send failed");
+ exit(EXIT_FAILURE);
+ }
+
+ control_writeln("SENDDONE");
+ close(fd);
+}
+
+static void test_seqpacket_msg_trunc_server(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ / 2];
+ struct msghdr msg = {0};
+ struct iovec iov = {0};
+
+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+ if (fd < 0) {
+ perror("accept");
+ exit(EXIT_FAILURE);
+ }
+
+ control_expectln("SENDDONE");
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ ssize_t ret = recvmsg(fd, &msg, MSG_TRUNC);
+
+ if (ret != MESSAGE_TRUNC_SZ) {
+ printf("%zi\n", ret);
+ perror("MSG_TRUNC doesn't work");
+ exit(EXIT_FAILURE);
+ }
+
+ if (!(msg.msg_flags & MSG_TRUNC)) {
+ fprintf(stderr, "MSG_TRUNC expected\n");
+ exit(EXIT_FAILURE);
+ }
+
+ close(fd);
+}
+
static struct test_case test_cases[] = {
{
.name = "SOCK_STREAM connection reset",
@@ -309,6 +367,11 @@ static struct test_case test_cases[] = {
.run_client = test_stream_msg_peek_client,
.run_server = test_stream_msg_peek_server,
},
+ {
+ .name = "SOCK_SEQPACKET send data MSG_TRUNC",
+ .run_client = test_seqpacket_msg_trunc_client,
+ .run_server = test_seqpacket_msg_trunc_server,
+ },
{},
};

--
2.25.1

2021-05-08 16:38:54

by Arseny Krasnov

Subject: [RFC PATCH v9 09/19] virtio/vsock: simplify credit update function API

This function is static and its 'hdr' argument was always NULL, so remove it.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 6503a8370130..ad0d34d41444 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -271,8 +271,7 @@ void virtio_transport_put_credit(struct virtio_vsock_sock *vvs, u32 credit)
}
EXPORT_SYMBOL_GPL(virtio_transport_put_credit);

-static int virtio_transport_send_credit_update(struct vsock_sock *vsk,
- struct virtio_vsock_hdr *hdr)
+static int virtio_transport_send_credit_update(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_CREDIT_UPDATE,
@@ -384,7 +383,7 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
* with different values.
*/
if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
- virtio_transport_send_credit_update(vsk, NULL);
+ virtio_transport_send_credit_update(vsk);

return total;

@@ -493,7 +492,7 @@ void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)

vvs->buf_alloc = *val;

- virtio_transport_send_credit_update(vsk, NULL);
+ virtio_transport_send_credit_update(vsk);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);

--
2.25.1

2021-05-08 16:39:15

by Arseny Krasnov

Subject: [RFC PATCH v9 14/19] virtio/vsock: enable SEQPACKET for transport

This adds:
1) SEQPACKET ops for the virtio transport and a 'seqpacket_allow()' callback.
2) Handling of the SEQPACKET feature bit: the guest tries to negotiate it with vhost.

Signed-off-by: Arseny Krasnov <[email protected]>
---
v8 -> v9:
1) Move 'seqpacket_allow' to 'struct virtio_vsock'.

net/vmw_vsock/virtio_transport.c | 25 +++++++++++++++++++++++++
1 file changed, 25 insertions(+)

diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 2700a63ab095..f714c16af65d 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -62,6 +62,7 @@ struct virtio_vsock {
struct virtio_vsock_event event_list[8];

u32 guest_cid;
+ bool seqpacket_allow;
};

static u32 virtio_transport_get_local_cid(void)
@@ -443,6 +444,8 @@ static void virtio_vsock_rx_done(struct virtqueue *vq)
queue_work(virtio_vsock_workqueue, &vsock->rx_work);
}

+static bool virtio_transport_seqpacket_allow(u32 remote_cid);
+
static struct virtio_transport virtio_transport = {
.transport = {
.module = THIS_MODULE,
@@ -469,6 +472,10 @@ static struct virtio_transport virtio_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,

+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+ .seqpacket_enqueue = virtio_transport_seqpacket_enqueue,
+ .seqpacket_allow = virtio_transport_seqpacket_allow,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
@@ -485,6 +492,19 @@ static struct virtio_transport virtio_transport = {
.send_pkt = virtio_transport_send_pkt,
};

+static bool virtio_transport_seqpacket_allow(u32 remote_cid)
+{
+ struct virtio_vsock *vsock;
+ bool seqpacket_allow;
+
+ rcu_read_lock();
+ vsock = rcu_dereference(the_virtio_vsock);
+ seqpacket_allow = vsock->seqpacket_allow;
+ rcu_read_unlock();
+
+ return seqpacket_allow;
+}
+
static void virtio_transport_rx_work(struct work_struct *work)
{
struct virtio_vsock *vsock =
@@ -612,6 +632,10 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
rcu_assign_pointer(the_virtio_vsock, vsock);

mutex_unlock(&the_virtio_vsock_mutex);
+
+ if (vdev->features & (1ULL << VIRTIO_VSOCK_F_SEQPACKET))
+ vsock->seqpacket_allow = true;
+
return 0;

out:
@@ -695,6 +719,7 @@ static struct virtio_device_id id_table[] = {
};

static unsigned int features[] = {
+ VIRTIO_VSOCK_F_SEQPACKET
};

static struct virtio_driver virtio_vsock_driver = {
--
2.25.1
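The guest-side flag is true only when VIRTIO_VSOCK_F_SEQPACKET survives feature negotiation, so both the driver's feature table and the device must offer it. Note also that the vhost variant of seqpacket_allow() checks the looked-up pointer for NULL. The virtio variant above appears to dereference the_virtio_vsock without such a check, and that pointer is NULL when no device is present. The sketch below models both points in user space. struct sketch_vsock, sketch_probe() and sketch_seqpacket_allow() are hypothetical, and RCU is omitted.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_F_SEQPACKET 1 /* mirrors VIRTIO_VSOCK_F_SEQPACKET (a bit number) */

/* Hypothetical stand-in for struct virtio_vsock. */
struct sketch_vsock {
	bool seqpacket_allow;
};

/* Negotiated features are those offered by both driver and device. */
void sketch_probe(struct sketch_vsock *vsock, uint64_t driver_features,
		  uint64_t device_features)
{
	uint64_t negotiated = driver_features & device_features;

	vsock->seqpacket_allow =
		(negotiated & (1ULL << SKETCH_F_SEQPACKET)) != 0;
}

/* Treat a missing device as "not allowed" instead of dereferencing NULL. */
bool sketch_seqpacket_allow(const struct sketch_vsock *vsock)
{
	return vsock != NULL && vsock->seqpacket_allow;
}
```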

2021-05-08 16:39:32

by Arseny Krasnov

Subject: [RFC PATCH v9 16/19] vsock/loopback: enable SEQPACKET for transport

This adds SEQPACKET ops for the loopback transport and a
'seqpacket_allow()' callback.

Signed-off-by: Arseny Krasnov <[email protected]>
Reviewed-by: Stefano Garzarella <[email protected]>
---
v8 -> v9:
1) 'vsock_loopback_seqpacket_allow()' always returns true.

net/vmw_vsock/vsock_loopback.c | 11 +++++++++++
1 file changed, 11 insertions(+)

diff --git a/net/vmw_vsock/vsock_loopback.c b/net/vmw_vsock/vsock_loopback.c
index a45f7ffca8c5..809f807d0710 100644
--- a/net/vmw_vsock/vsock_loopback.c
+++ b/net/vmw_vsock/vsock_loopback.c
@@ -63,6 +63,8 @@ static int vsock_loopback_cancel_pkt(struct vsock_sock *vsk)
return 0;
}

+static bool vsock_loopback_seqpacket_allow(u32 remote_cid);
+
static struct virtio_transport loopback_transport = {
.transport = {
.module = THIS_MODULE,
@@ -89,6 +91,10 @@ static struct virtio_transport loopback_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,

+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+ .seqpacket_enqueue = virtio_transport_seqpacket_enqueue,
+ .seqpacket_allow = vsock_loopback_seqpacket_allow,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
@@ -105,6 +111,11 @@ static struct virtio_transport loopback_transport = {
.send_pkt = vsock_loopback_send_pkt,
};

+static bool vsock_loopback_seqpacket_allow(u32 remote_cid)
+{
+ return true;
+}
+
static void vsock_loopback_work(struct work_struct *work)
{
struct vsock_loopback *vsock =
--
2.25.1

2021-05-08 16:39:38

by Arseny Krasnov

Subject: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket

This adds logic that serializes write access to a single socket
shared by multiple threads. It is implemented by adding a field that
holds the TID of the current writer. When a writer tries to send
something, it checks that this field is -1 (free); otherwise it
sleeps in the same way as when waiting for free space on the peer's
side.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 1 +
net/vmw_vsock/af_vsock.c | 10 +++++++++-
2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 1747c0b564ef..413343f18e99 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -69,6 +69,7 @@ struct vsock_sock {
u64 buffer_size;
u64 buffer_min_size;
u64 buffer_max_size;
+ pid_t tid_owner;

/* Private to transport. */
void *trans;
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 7790728465f4..1fb4a1860f6d 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -757,6 +757,7 @@ static struct sock *__vsock_create(struct net *net,
vsk->peer_shutdown = 0;
INIT_DELAYED_WORK(&vsk->connect_work, vsock_connect_timeout);
INIT_DELAYED_WORK(&vsk->pending_work, vsock_pending_work);
+ vsk->tid_owner = -1;

psk = parent ? vsock_sk(parent) : NULL;
if (parent) {
@@ -1765,7 +1766,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
ssize_t written;

add_wait_queue(sk_sleep(sk), &wait);
- while (vsock_stream_has_space(vsk) == 0 &&
+ while ((vsock_stream_has_space(vsk) == 0 ||
+ (vsk->tid_owner != current->pid &&
+ vsk->tid_owner != -1)) &&
sk->sk_err == 0 &&
!(sk->sk_shutdown & SEND_SHUTDOWN) &&
!(vsk->peer_shutdown & RCV_SHUTDOWN)) {
@@ -1796,6 +1799,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
goto out_err;
}
}
+
+ vsk->tid_owner = current->pid;
remove_wait_queue(sk_sleep(sk), &wait);

/* These checks occur both as part of and after the loop
@@ -1852,7 +1857,10 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
err = total_written;
}
out:
+ vsk->tid_owner = -1;
release_sock(sk);
+ sk->sk_write_space(sk);
+
return err;
}

--
2.25.1
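
The wait condition this patch adds can be modelled in isolation. The following is a hypothetical, single-threaded C sketch: `struct send_state`, `must_wait()`, `claim()` and `release()` are illustrative names, not kernel API. It ignores `lock_sock()`, under which the real check and claim run, and only shows when a sender has to sleep.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

#define OWNER_FREE ((pid_t)-1)   /* mirrors tid_owner == -1 in the patch */

/* Hypothetical, simplified view of the state the send loop inspects. */
struct send_state {
    long space;             /* vsock_stream_has_space() analogue */
    pid_t owner;            /* vsk->tid_owner analogue */
    int err;                /* sk->sk_err analogue */
    bool send_shutdown;     /* sk->sk_shutdown & SEND_SHUTDOWN */
    bool peer_rcv_shutdown; /* vsk->peer_shutdown & RCV_SHUTDOWN */
};

/* Same shape as the while () condition in vsock_connectible_sendmsg(). */
static bool must_wait(const struct send_state *s, pid_t self)
{
    bool owned_by_other = s->owner != self && s->owner != OWNER_FREE;

    return (s->space == 0 || owned_by_other) &&
           s->err == 0 && !s->send_shutdown && !s->peer_rcv_shutdown;
}

/* After the wait loop: the caller becomes (or stays) the owner. */
static void claim(struct send_state *s, pid_t self)
{
    assert(self != OWNER_FREE);
    s->owner = self;
}

/* The "out:" path: drop ownership so a waiting writer can proceed. */
static void release(struct send_state *s)
{
    s->owner = OWNER_FREE;
}
```

Because ownership is cleared only on the `out:` path, writers sleeping on the same wait queue need a wakeup at that point; the `sk->sk_write_space(sk)` call added after `release_sock()` presumably serves that purpose, although the commit message does not say so.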

2021-05-08 16:41:21

by Arseny Krasnov

Subject: [RFC PATCH v9 18/19] virtio/vsock: update trace event for SEQPACKET

This adds the SEQPACKET socket type to the virtio vsock trace event.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/trace/events/vsock_virtio_transport_common.h | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/include/trace/events/vsock_virtio_transport_common.h b/include/trace/events/vsock_virtio_transport_common.h
index 6782213778be..b30c0e319b0e 100644
--- a/include/trace/events/vsock_virtio_transport_common.h
+++ b/include/trace/events/vsock_virtio_transport_common.h
@@ -9,9 +9,12 @@
#include <linux/tracepoint.h>

TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_STREAM);
+TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_SEQPACKET);

#define show_type(val) \
- __print_symbolic(val, { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" })
+ __print_symbolic(val, \
+ { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" }, \
+ { VIRTIO_VSOCK_TYPE_SEQPACKET, "SEQPACKET" })

TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_INVALID);
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_REQUEST);
--
2.25.1
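
`__print_symbolic()` maps a numeric field to a name when the trace is formatted. A rough userspace C equivalent of the updated `show_type()` table, assuming the enum values from `include/uapi/linux/virtio_vsock.h` in this series (STREAM = 1, SEQPACKET = 2), might look like this. The hex fallback for unknown values is meant to approximate what the tracing core prints, not to reproduce it exactly.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Values copied from enum virtio_vsock_type as extended in this series. */
enum virtio_vsock_type {
    VIRTIO_VSOCK_TYPE_STREAM = 1,
    VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};

/* Return a symbolic name for a packet type, or format the raw value
 * into buf when the type is unknown. */
static const char *show_type(unsigned int val, char *buf, size_t len)
{
    switch (val) {
    case VIRTIO_VSOCK_TYPE_STREAM:
        return "STREAM";
    case VIRTIO_VSOCK_TYPE_SEQPACKET:
        return "SEQPACKET";
    default:
        assert(buf != NULL && len > 0);
        snprintf(buf, len, "0x%x", val);
        return buf;
    }
}
```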

2021-05-13 12:12:46

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 06/19] af_vsock: rest of SEQPACKET support

On Sat, May 08, 2021 at 07:33:46PM +0300, Arseny Krasnov wrote:
>This does rest of SOCK_SEQPACKET support:
>1) Adds socket ops for SEQPACKET type.
>2) Allows to create socket with SEQPACKET type.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>Reviewed-by: Stefano Garzarella <[email protected]>

This patch has changed, so you should usually remove the R-b tags.

>---
> include/net/af_vsock.h | 1 +
> net/vmw_vsock/af_vsock.c | 36 +++++++++++++++++++++++++++++++++++-
> 2 files changed, 36 insertions(+), 1 deletion(-)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index 5860027d5173..1747c0b564ef 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -140,6 +140,7 @@ struct vsock_transport {
> int flags, bool *msg_ready);
> int (*seqpacket_enqueue)(struct vsock_sock *vsk, struct msghdr *msg,
> size_t len);
>+ bool (*seqpacket_allow)(u32 remote_cid);

I'm wondering whether it's better to follow .dgram_allow() and
.stream_allow() and also specify the `port` param, but since it's not
used, we can add it later if needed.

So, I think this is fine:

Reviewed-by: Stefano Garzarella <[email protected]>


2021-05-13 12:21:40

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET

On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.
>Callback fetches RW packets from rx queue of socket until whole record
>is copied(if user's buffer is full, user is not woken up). This is done
>to not stall sender, because if we wake up user and it leaves syscall,
>nobody will send credit update for rest of record, and sender will wait
>for next enter of read syscall at receiver's side. So if user buffer is
>full, we just send credit update and drop data.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) Check for RW packet type is removed from loop(all packet now
> considered RW).
> 2) Locking in loop is fixed.
> 3) cpu_to_le32()/le32_to_cpu() now used.
> 4) MSG_TRUNC handling removed from transport.
>
> include/linux/virtio_vsock.h | 5 ++
> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++
> 2 files changed, 69 insertions(+)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..02acf6e9ae04 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags,
>+ bool *msg_ready);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index ad0d34d41444..f649a21dd23b 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags,
>+ bool *msg_ready)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ int err = 0;
>+ size_t user_buf_len = msg->msg_iter.count;
>+
>+ *msg_ready = false;
>+ spin_lock_bh(&vvs->rx_lock);
>+
>+ while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
>+ size_t bytes_to_copy;
>+ size_t pkt_len;
>+
>+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+ bytes_to_copy = min(user_buf_len, pkt_len);
>+
>+ if (bytes_to_copy) {
>+ /* sk_lock is held by caller so no one else can dequeue.
>+ * Unlock rx_lock since memcpy_to_msg() may sleep.
>+ */
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy))
>{
>+ err = -EINVAL;
>+ } else {
>+ err += pkt_len;

If `bytes_to_copy == 0`, we are not increasing the real length.

Anyway, it is a bit confusing to increase a variable called `err`. I think
it is better to have another variable that stores this information, which
we return if there are no errors.
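
A minimal userspace sketch of the dequeue loop with the record length kept in its own variable, as suggested. It assumes a hypothetical array of packets (`struct pkt`) in place of `vvs->rx_queue`, uses `SEQ_EOR` as a stand-in for `VIRTIO_VSOCK_SEQ_EOR`, and leaves out `rx_lock`, credit updates and wakeups. Every packet's full length is added to `total`, even when no bytes fit in the user buffer, so the caller can report the real record size for `MSG_TRUNC`.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

#define SEQ_EOR 1u  /* stand-in for VIRTIO_VSOCK_SEQ_EOR */

/* Hypothetical queued packet: payload, payload length, header flags. */
struct pkt {
    const char *buf;
    size_t len;
    unsigned int flags;
};

/* Dequeue one record from q[*head..n). Copies at most cap bytes into out,
 * drops whatever does not fit, and returns the full record length, or a
 * negative errno if copying fails. */
static ssize_t seqpacket_dequeue(const struct pkt *q, size_t n, size_t *head,
                                 char *out, size_t cap, bool *msg_ready)
{
    size_t copied = 0;  /* bytes placed in the user buffer */
    ssize_t total = 0;  /* real record length, kept apart from err */
    int err = 0;

    *msg_ready = false;
    while (!*msg_ready && *head < n) {
        const struct pkt *p = &q[*head];
        size_t room = cap - copied;
        size_t to_copy = p->len < room ? p->len : room;

        if (to_copy) {
            if (out == NULL) {  /* stand-in for memcpy_to_msg() failing */
                err = -EFAULT;
                break;
            }
            memcpy(out + copied, p->buf, to_copy);
            copied += to_copy;
        }
        total += (ssize_t)p->len;  /* counted even if nothing was copied */
        if (p->flags & SEQ_EOR)
            *msg_ready = true;
        (*head)++;  /* packet consumed; uncopied bytes are dropped */
    }
    return err ? err : total;
}
```

If the queue runs dry before an EOR packet, `*msg_ready` stays false and the partial length is returned; in the kernel the caller would then wait for more data.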


2021-05-13 12:52:08

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 13/19] virtio/vsock: rest of SOCK_SEQPACKET support

On Sat, May 08, 2021 at 07:35:54PM +0300, Arseny Krasnov wrote:
>This adds rest of logic for SEQPACKET:
>1) Send SHUTDOWN on socket close for SEQPACKET type.
>2) Set SEQPACKET packet type during send.
>3) 'seqpacket_allow' flag to virtio transport.

Please update this commit message; point 3 is no longer included in
this patch, right?

>4) Set 'VIRTIO_VSOCK_SEQ_EOR' bit in flags for last
> packet of message.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) Use cpu_to_le32() to set VIRTIO_VSOCK_SEQ_EOR.
>
> include/linux/virtio_vsock.h | 4 ++++
> net/vmw_vsock/virtio_transport_common.c | 17 +++++++++++++++--
> 2 files changed, 19 insertions(+), 2 deletions(-)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index 02acf6e9ae04..7360ab7ea0af 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -80,6 +80,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+int
>+virtio_transport_seqpacket_enqueue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ size_t len);
> ssize_t
> virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 7fea0a2192f7..b6608b4ac7c2 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -74,6 +74,10 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
> err = memcpy_from_msg(pkt->buf, info->msg, len);
> if (err)
> goto out;
>+
>+ if (info->msg->msg_iter.count == 0)

Also here, `msg_data_left(info->msg)` is better.

>+ pkt->hdr.flags = cpu_to_le32(info->flags |
>+ VIRTIO_VSOCK_SEQ_EOR);

On second thought, an alternative could be to set EOR here...

info->flags |= VIRTIO_VSOCK_SEQ_EOR;

> }

... and move the pkt->hdr.flags assignment after this block:

pkt->hdr.flags = cpu_to_le32(info->flags);

But I don't have a strong opinion on that.

>
> trace_virtio_transport_alloc_pkt(src_cid, src_port,
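
A sketch of marking the last fragment, assuming a message split into packets of at most `max_pkt` bytes. Here `left` plays the role of `msg_data_left(info->msg)`, `struct hdr` is a hypothetical stand-in for `struct virtio_vsock_hdr`, and `htole32()`/`le32toh()` from glibc's `<endian.h>` stand in for `cpu_to_le32()`/`le32_to_cpu()`. It uses the `|=` form from the vhost-vsock side; OR-ing two already-converted values is safe because a byte swap distributes over bitwise OR.

```c
#define _DEFAULT_SOURCE  /* exposes htole32()/le32toh() in glibc */
#include <assert.h>
#include <endian.h>
#include <stddef.h>
#include <stdint.h>

#define SEQ_EOR 1u  /* stand-in for VIRTIO_VSOCK_SEQ_EOR */

/* Hypothetical header: both fields are little-endian on the wire. */
struct hdr {
    uint32_t len;
    uint32_t flags;
};

/* Fill hdrs for a msg_len-byte record split into max_pkt-sized chunks.
 * Returns the number of packets produced. */
static size_t fragment_record(size_t msg_len, size_t max_pkt,
                              uint32_t base_flags,
                              struct hdr *hdrs, size_t max_hdrs)
{
    size_t left = msg_len;  /* analogue of msg_data_left(info->msg) */
    size_t n = 0;

    assert(max_pkt > 0);
    do {
        size_t chunk = left < max_pkt ? left : max_pkt;

        assert(n < max_hdrs);
        left -= chunk;
        hdrs[n].len = htole32((uint32_t)chunk);
        hdrs[n].flags = htole32(base_flags);
        if (left == 0)  /* nothing left to send: this is the last packet */
            hdrs[n].flags |= htole32(SEQ_EOR);
        n++;
    } while (left > 0);
    return n;
}
```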


2021-05-13 13:04:49

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 14/19] virtio/vsock: enable SEQPACKET for transport

On Sat, May 08, 2021 at 07:36:14PM +0300, Arseny Krasnov wrote:
>This adds
>1) SEQPACKET ops for virtio transport and 'seqpacket_allow()' callback.
>2) Handling of SEQPACKET bit: guest tries to negotiate it with vhost.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) Move 'seqpacket_allow' to 'struct virtio_vsock'.
>
> net/vmw_vsock/virtio_transport.c | 25 +++++++++++++++++++++++++
> 1 file changed, 25 insertions(+)
>
>diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
>index 2700a63ab095..f714c16af65d 100644
>--- a/net/vmw_vsock/virtio_transport.c
>+++ b/net/vmw_vsock/virtio_transport.c
>@@ -62,6 +62,7 @@ struct virtio_vsock {
> struct virtio_vsock_event event_list[8];
>
> u32 guest_cid;
>+ bool seqpacket_allow;
> };
>
> static u32 virtio_transport_get_local_cid(void)
>@@ -443,6 +444,8 @@ static void virtio_vsock_rx_done(struct virtqueue *vq)
> queue_work(virtio_vsock_workqueue, &vsock->rx_work);
> }
>
>+static bool virtio_transport_seqpacket_allow(u32 remote_cid);
>+
> static struct virtio_transport virtio_transport = {
> .transport = {
> .module = THIS_MODULE,
>@@ -469,6 +472,10 @@ static struct virtio_transport virtio_transport = {
> .stream_is_active = virtio_transport_stream_is_active,
> .stream_allow = virtio_transport_stream_allow,
>
>+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
>+ .seqpacket_enqueue = virtio_transport_seqpacket_enqueue,
>+ .seqpacket_allow = virtio_transport_seqpacket_allow,
>+
> .notify_poll_in = virtio_transport_notify_poll_in,
> .notify_poll_out = virtio_transport_notify_poll_out,
> .notify_recv_init = virtio_transport_notify_recv_init,
>@@ -485,6 +492,19 @@ static struct virtio_transport virtio_transport = {
> .send_pkt = virtio_transport_send_pkt,
> };
>
>+static bool virtio_transport_seqpacket_allow(u32 remote_cid)
>+{
>+ struct virtio_vsock *vsock;
>+ bool seqpacket_allow;
>+
>+ rcu_read_lock();
>+ vsock = rcu_dereference(the_virtio_vsock);
>+ seqpacket_allow = vsock->seqpacket_allow;
>+ rcu_read_unlock();
>+
>+ return seqpacket_allow;
>+}
>+
> static void virtio_transport_rx_work(struct work_struct *work)
> {
> struct virtio_vsock *vsock =
>@@ -612,6 +632,10 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
> rcu_assign_pointer(the_virtio_vsock, vsock);
>
> mutex_unlock(&the_virtio_vsock_mutex);
>+
>+ if (vdev->features & (1ULL << VIRTIO_VSOCK_F_SEQPACKET))

We should use virtio_has_feature() to check the device features.

>+ vsock->seqpacket_allow = true;

When we assign the the_virtio_vsock pointer, all the fields should
already be set, so please move this code before the following block:

# here

vdev->priv = vsock;
rcu_assign_pointer(the_virtio_vsock, vsock);
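
The ordering requested here is the usual publish pattern: fully initialise an object, then make it visible. Below is a userspace analogue with C11 atomics, in which a release store plays the role of `rcu_assign_pointer()` and an acquire load the role of `rcu_dereference()` (acquire is stronger than what RCU needs, but simpler to reason about). `has_feature()` is a hypothetical stand-in that only tests the bit; in the kernel, `virtio_has_feature()` is the accessor to use. The sketch also checks for NULL, on the assumption that the callback may run while no device is bound.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define VIRTIO_VSOCK_F_SEQPACKET 1  /* feature bit number from this series */

/* Hypothetical, trimmed-down struct virtio_vsock. */
struct vsock_dev {
    uint32_t guest_cid;
    bool seqpacket_allow;
};

static _Atomic(struct vsock_dev *) the_dev;  /* starts out NULL */

/* Stand-in for virtio_has_feature(): tests one negotiated bit. */
static bool has_feature(uint64_t features, unsigned int fbit)
{
    assert(fbit < 64);
    return (features & (UINT64_C(1) << fbit)) != 0;
}

static int probe(uint64_t features, uint32_t cid)
{
    struct vsock_dev *d = calloc(1, sizeof(*d));

    if (!d)
        return -1;
    /* Set every field first... */
    d->guest_cid = cid;
    d->seqpacket_allow = has_feature(features, VIRTIO_VSOCK_F_SEQPACKET);
    /* ...then publish; release pairs with the acquire load below. */
    atomic_store_explicit(&the_dev, d, memory_order_release);
    return 0;
}

static bool seqpacket_allow(void)
{
    struct vsock_dev *d = atomic_load_explicit(&the_dev, memory_order_acquire);

    return d != NULL && d->seqpacket_allow;
}
```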


2021-05-13 13:08:32

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 13/19] virtio/vsock: rest of SOCK_SEQPACKET support

On Thu, May 13, 2021 at 2:27 PM Stefano Garzarella <[email protected]> wrote:
>
> On Sat, May 08, 2021 at 07:35:54PM +0300, Arseny Krasnov wrote:
> >This adds rest of logic for SEQPACKET:
> >1) Send SHUTDOWN on socket close for SEQPACKET type.
> >2) Set SEQPACKET packet type during send.
> >3) 'seqpacket_allow' flag to virtio transport.
>
> Please update this commit message, point 3 is not included anymore in
> this patch, right?
>
> >4) Set 'VIRTIO_VSOCK_SEQ_EOR' bit in flags for last
> > packet of message.
> >
> >Signed-off-by: Arseny Krasnov <[email protected]>
> >---
> > v8 -> v9:
> > 1) Use cpu_to_le32() to set VIRTIO_VSOCK_SEQ_EOR.
> >
> > include/linux/virtio_vsock.h | 4 ++++
> > net/vmw_vsock/virtio_transport_common.c | 17 +++++++++++++++--
> > 2 files changed, 19 insertions(+), 2 deletions(-)
> >
> >diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
> >index 02acf6e9ae04..7360ab7ea0af 100644
> >--- a/include/linux/virtio_vsock.h
> >+++ b/include/linux/virtio_vsock.h
> >@@ -80,6 +80,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> > struct msghdr *msg,
> > size_t len, int flags);
> >
> >+int
> >+virtio_transport_seqpacket_enqueue(struct vsock_sock *vsk,
> >+ struct msghdr *msg,
> >+ size_t len);
> > ssize_t
> > virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> > struct msghdr *msg,
> >diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
> >index 7fea0a2192f7..b6608b4ac7c2 100644
> >--- a/net/vmw_vsock/virtio_transport_common.c
> >+++ b/net/vmw_vsock/virtio_transport_common.c
> >@@ -74,6 +74,10 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
> > err = memcpy_from_msg(pkt->buf, info->msg, len);
> > if (err)
> > goto out;
> >+
> >+ if (info->msg->msg_iter.count == 0)
>
> Also here is better `msg_data_left(info->msg)`
>
> >+ pkt->hdr.flags = cpu_to_le32(info->flags |
> >+ VIRTIO_VSOCK_SEQ_EOR);
>
> Re-thinking an alternative could be to set EOR here...
>
> info->flags |= VIRTIO_VSOCK_SEQ_EOR;

Or just `pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR)`, as you
did in vhost-vsock :-)


2021-05-13 14:06:21

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket

On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>This add logic, that serializes write access to single socket
>by multiple threads. It is implemented be adding field with TID
>of current writer. When writer tries to send something, it checks
>that field is -1(free), else it sleep in the same way as waiting
>for free space at peers' side.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/net/af_vsock.h | 1 +
> net/vmw_vsock/af_vsock.c | 10 +++++++++-
> 2 files changed, 10 insertions(+), 1 deletion(-)

I think you forgot to move this patch to the beginning of the series.
It's important because that way we can easily backport it to stable
branches.

About the implementation, can't we just add a mutex that we hold until
we have sent all the payload?

I need to check other implementations like TCP.

>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index 1747c0b564ef..413343f18e99 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -69,6 +69,7 @@ struct vsock_sock {
> u64 buffer_size;
> u64 buffer_min_size;
> u64 buffer_max_size;
>+ pid_t tid_owner;
>
> /* Private to transport. */
> void *trans;
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 7790728465f4..1fb4a1860f6d 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -757,6 +757,7 @@ static struct sock *__vsock_create(struct net *net,
> vsk->peer_shutdown = 0;
> INIT_DELAYED_WORK(&vsk->connect_work, vsock_connect_timeout);
> INIT_DELAYED_WORK(&vsk->pending_work, vsock_pending_work);
>+ vsk->tid_owner = -1;
>
> psk = parent ? vsock_sk(parent) : NULL;
> if (parent) {
>@@ -1765,7 +1766,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> ssize_t written;
>
> add_wait_queue(sk_sleep(sk), &wait);
>- while (vsock_stream_has_space(vsk) == 0 &&
>+ while ((vsock_stream_has_space(vsk) == 0 ||
>+ (vsk->tid_owner != current->pid &&
>+ vsk->tid_owner != -1)) &&
> sk->sk_err == 0 &&
> !(sk->sk_shutdown & SEND_SHUTDOWN) &&
> !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
>@@ -1796,6 +1799,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> goto out_err;
> }
> }
>+
>+ vsk->tid_owner = current->pid;
> remove_wait_queue(sk_sleep(sk), &wait);
>
> /* These checks occur both as part of and after the loop
>@@ -1852,7 +1857,10 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> err = total_written;
> }
> out:
>+ vsk->tid_owner = -1;
> release_sock(sk);
>+ sk->sk_write_space(sk);
>+

Is this change related? Can you explain in the commit message why it is
needed?

> return err;
> }
>
>--
>2.25.1
>
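
One way to read the mutex suggestion: take a per-socket lock for the whole `sendmsg()` and release it only after the last fragment is queued, so fragments from two senders cannot interleave even though the socket lock itself is dropped while waiting for credit. The userspace sketch below is hypothetical (`struct fake_sock`, `send_lock`, `wire_lock` and the functions are illustrative names): `wire_lock` models a lock taken per fragment, and `send_lock` the proposed mutex.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define WIRE_CAP 256

/* Hypothetical model: `wire` stands in for the peer's receive queue. */
struct fake_sock {
    pthread_mutex_t send_lock;  /* held for a whole record (proposed mutex) */
    pthread_mutex_t wire_lock;  /* held per fragment, like sk_lock, which the
                                   kernel drops while sleeping for credit */
    char wire[WIRE_CAP];
    size_t wire_len;
};

static void fake_sock_init(struct fake_sock *s)
{
    pthread_mutex_init(&s->send_lock, NULL);
    pthread_mutex_init(&s->wire_lock, NULL);
    s->wire_len = 0;
}

/* Queue one fragment; only wire_lock is held here. */
static void put_fragment(struct fake_sock *s, const char *p, size_t n)
{
    pthread_mutex_lock(&s->wire_lock);
    assert(s->wire_len + n <= WIRE_CAP);
    memcpy(s->wire + s->wire_len, p, n);
    s->wire_len += n;
    pthread_mutex_unlock(&s->wire_lock);
}

/* Send a record in frag-sized pieces while holding send_lock throughout,
 * so another sender cannot slip in between fragments. */
static void send_record(struct fake_sock *s, const char *msg, size_t len,
                        size_t frag)
{
    assert(frag > 0);
    pthread_mutex_lock(&s->send_lock);
    for (size_t off = 0; off < len; off += frag) {
        size_t n = len - off < frag ? len - off : frag;

        put_fragment(s, msg + off, n);
    }
    pthread_mutex_unlock(&s->send_lock);
}
```

Unlike a spinlock, a mutex may be held while sleeping, which is what would make this workable in the kernel; an interruptible variant such as `mutex_lock_interruptible()` would likely be needed so that a blocked sender can still be signalled.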


2021-05-13 14:12:19

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 00/19] virtio/vsock: introduce SOCK_SEQPACKET support

Hi Arseny,

On Sat, May 08, 2021 at 07:30:23PM +0300, Arseny Krasnov wrote:
> This patchset implements support of SOCK_SEQPACKET for virtio
>transport.
> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>do it, new bit for field 'flags' was added: SEQ_EOR. This bit is
>set to 1 in last RW packet of message.
> Now as packets of one socket are not reordered neither on vsock
>nor on vhost transport layers, such bit allows to restore original
>message on receiver's side. If user's buffer is smaller than message
>length, when all out of size data is dropped.
> Maximum length of datagram is not limited as in stream socket,
>because same credit logic is used. Difference with stream socket is
>that user is not woken up until whole record is received or error
>occurred. Implementation also supports 'MSG_TRUNC' flags.
> Tests also implemented.
>
> Thanks to [email protected] for encouragements and initial design
>recommendations.
>
> Arseny Krasnov (19):
> af_vsock: update functions for connectible socket
> af_vsock: separate wait data loop
> af_vsock: separate receive data loop
> af_vsock: implement SEQPACKET receive loop
> af_vsock: implement send logic for SEQPACKET
> af_vsock: rest of SEQPACKET support
> af_vsock: update comments for stream sockets
> virtio/vsock: set packet's type in virtio_transport_send_pkt_info()
> virtio/vsock: simplify credit update function API
> virtio/vsock: defines and constants for SEQPACKET
> virtio/vsock: dequeue callback for SOCK_SEQPACKET
> virtio/vsock: add SEQPACKET receive logic
> virtio/vsock: rest of SOCK_SEQPACKET support
> virtio/vsock: enable SEQPACKET for transport
> vhost/vsock: enable SEQPACKET for transport
> vsock/loopback: enable SEQPACKET for transport
> vsock_test: add SOCK_SEQPACKET tests
> virtio/vsock: update trace event for SEQPACKET
> af_vsock: serialize writes to shared socket
>
> drivers/vhost/vsock.c | 42 +-
> include/linux/virtio_vsock.h | 9 +
> include/net/af_vsock.h | 8 +
> .../events/vsock_virtio_transport_common.h | 5 +-
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/af_vsock.c | 417 +++++++++++------
> net/vmw_vsock/virtio_transport.c | 25 +
> net/vmw_vsock/virtio_transport_common.c | 129 ++++-
> net/vmw_vsock/vsock_loopback.c | 11 +
> tools/testing/vsock/util.c | 32 +-
> tools/testing/vsock/util.h | 3 +
> tools/testing/vsock/vsock_test.c | 63 +++
> 12 files changed, 594 insertions(+), 159 deletions(-)
>
> v8 -> v9:
> General changelog:
> - see per patch change log.
>

I reviewed this series and left some comments.

Before removing the RFC tag, please check that all the commit messages
contain the right information.

Also, I recommend you take a look at how other commits in the Linux
tree are written, because the commits in this series read like todo
lists.
That could be fine for an RFC, but for the final version it would be
better to rewrite them following the advice here:
Documentation/process/submitting-patches.rst

Thanks,
Stefano
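
The receive semantics described in the cover letter (record boundaries preserved, bytes beyond the user's buffer dropped, `MSG_TRUNC` reporting the real length) appear to mirror what Linux already provides for `AF_UNIX` `SOCK_SEQPACKET` sockets. As an illustration only, the sketch below uses a `socketpair()` of that type instead of `AF_VSOCK`, which needs a vsock transport to be present; that the vsock behaviour matches exactly is an assumption.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Queue two records, read the first into a cap1-byte buffer with
 * MSG_TRUNC, then read the second. Results of recv() go to res[0..1].
 * Returns 0 on success, -1 on setup/send failure. */
static int seqpacket_demo(char *first, size_t cap1, char *second, size_t cap2,
                          ssize_t res[2])
{
    int sv[2];
    int ret = -1;

    if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
        return -1;
    if (send(sv[0], "record-one", 10, 0) == 10 &&
        send(sv[0], "two", 3, 0) == 3) {
        /* Buffer too small: the excess of record one is dropped, and
         * MSG_TRUNC makes recv() report the full record length. */
        res[0] = recv(sv[1], first, cap1, MSG_TRUNC);
        /* The next read starts cleanly at record two. */
        res[1] = recv(sv[1], second, cap2, 0);
        ret = 0;
    }
    close(sv[0]);
    close(sv[1]);
    return ret;
}
```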


2021-05-13 14:45:31

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 10/19] virtio/vsock: defines and constants for SEQPACKET


On 13.05.2021 14:45, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:35:05PM +0300, Arseny Krasnov wrote:
>> This adds set of defines and constants for SOCK_SEQPACKET
>> support in vsock. Here is link to spec patch, which uses it:
>>
>> https://lists.oasis-open.org/archives/virtio-comment/202103/msg00069.html
> Will you be submitting a new version?
Yes, I'll send a new spec patch in the next few days.
>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/uapi/linux/virtio_vsock.h | 9 +++++++++
>> 1 file changed, 9 insertions(+)
>>
>> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>> index 1d57ed3d84d2..3dd3555b2740 100644
>> --- a/include/uapi/linux/virtio_vsock.h
>> +++ b/include/uapi/linux/virtio_vsock.h
>> @@ -38,6 +38,9 @@
>> #include <linux/virtio_ids.h>
>> #include <linux/virtio_config.h>
>>
>> +/* The feature bitmap for virtio vsock */
>> +#define VIRTIO_VSOCK_F_SEQPACKET 1 /* SOCK_SEQPACKET supported */
>> +
>> struct virtio_vsock_config {
>> __le64 guest_cid;
>> } __attribute__((packed));
>> @@ -65,6 +68,7 @@ struct virtio_vsock_hdr {
>>
>> enum virtio_vsock_type {
>> VIRTIO_VSOCK_TYPE_STREAM = 1,
>> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
>> };
>>
>> enum virtio_vsock_op {
>> @@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
>> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
>> };
>>
>> +/* VIRTIO_VSOCK_OP_RW flags values */
>> +enum virtio_vsock_rw {
>> + VIRTIO_VSOCK_SEQ_EOR = 1,
>> +};
>> +
>> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>> --
>> 2.25.1
>>
> Looks good:
>
> Reviewed-by: Stefano Garzarella <[email protected]>
>
>

2021-05-13 14:45:58

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET


On 13.05.2021 14:58, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:
>> This adds transport callback and it's logic for SEQPACKET dequeue.
>> Callback fetches RW packets from rx queue of socket until whole record
>> is copied(if user's buffer is full, user is not woken up). This is done
>> to not stall sender, because if we wake up user and it leaves syscall,
>> nobody will send credit update for rest of record, and sender will wait
>> for next enter of read syscall at receiver's side. So if user buffer is
>> full, we just send credit update and drop data.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> v8 -> v9:
>> 1) Check for RW packet type is removed from loop(all packet now
>> considered RW).
>> 2) Locking in loop is fixed.
>> 3) cpu_to_le32()/le32_to_cpu() now used.
>> 4) MSG_TRUNC handling removed from transport.
>>
>> include/linux/virtio_vsock.h | 5 ++
>> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++
>> 2 files changed, 69 insertions(+)
>>
>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> index dc636b727179..02acf6e9ae04 100644
>> --- a/include/linux/virtio_vsock.h
>> +++ b/include/linux/virtio_vsock.h
>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>> struct msghdr *msg,
>> size_t len, int flags);
>>
>> +ssize_t
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> + struct msghdr *msg,
>> + int flags,
>> + bool *msg_ready);
>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>
>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>> index ad0d34d41444..f649a21dd23b 100644
>> --- a/net/vmw_vsock/virtio_transport_common.c
>> +++ b/net/vmw_vsock/virtio_transport_common.c
>> @@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>> return err;
>> }
>>
>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>> + struct msghdr *msg,
>> + int flags,
>> + bool *msg_ready)
>> +{
>> + struct virtio_vsock_sock *vvs = vsk->trans;
>> + struct virtio_vsock_pkt *pkt;
>> + int err = 0;
>> + size_t user_buf_len = msg->msg_iter.count;
>> +
>> + *msg_ready = false;
>> + spin_lock_bh(&vvs->rx_lock);
>> +
>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
>> + size_t bytes_to_copy;
>> + size_t pkt_len;
>> +
>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>> + bytes_to_copy = min(user_buf_len, pkt_len);
>> +
>> + if (bytes_to_copy) {
>> + /* sk_lock is held by caller so no one else can dequeue.
>> + * Unlock rx_lock since memcpy_to_msg() may sleep.
>> + */
>> + spin_unlock_bh(&vvs->rx_lock);
>> +
>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>> + err = -EINVAL;
>> + } else {
>> + err += pkt_len;
> If `bytes_to_copy == 0` we are not increasing the real length.
>
> Anyway is a bit confusing increase a variable called `err`, I think is
> better to have another variable to store this information that we return
> if there aren't errors.
Ack
>
>

2021-05-13 14:47:46

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 14/19] virtio/vsock: enable SEQPACKET for transport


On 13.05.2021 15:49, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:36:14PM +0300, Arseny Krasnov wrote:
>> This adds
>> 1) SEQPACKET ops for virtio transport and 'seqpacket_allow()' callback.
>> 2) Handling of SEQPACKET bit: guest tries to negotiate it with vhost.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> v8 -> v9:
>> 1) Move 'seqpacket_allow' to 'struct virtio_vsock'.
>>
>> net/vmw_vsock/virtio_transport.c | 25 +++++++++++++++++++++++++
>> 1 file changed, 25 insertions(+)
>>
>> diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
>> index 2700a63ab095..f714c16af65d 100644
>> --- a/net/vmw_vsock/virtio_transport.c
>> +++ b/net/vmw_vsock/virtio_transport.c
>> @@ -62,6 +62,7 @@ struct virtio_vsock {
>> struct virtio_vsock_event event_list[8];
>>
>> u32 guest_cid;
>> + bool seqpacket_allow;
>> };
>>
>> static u32 virtio_transport_get_local_cid(void)
>> @@ -443,6 +444,8 @@ static void virtio_vsock_rx_done(struct virtqueue *vq)
>> queue_work(virtio_vsock_workqueue, &vsock->rx_work);
>> }
>>
>> +static bool virtio_transport_seqpacket_allow(u32 remote_cid);
>> +
>> static struct virtio_transport virtio_transport = {
>> .transport = {
>> .module = THIS_MODULE,
>> @@ -469,6 +472,10 @@ static struct virtio_transport virtio_transport = {
>> .stream_is_active = virtio_transport_stream_is_active,
>> .stream_allow = virtio_transport_stream_allow,
>>
>> + .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
>> + .seqpacket_enqueue = virtio_transport_seqpacket_enqueue,
>> + .seqpacket_allow = virtio_transport_seqpacket_allow,
>> +
>> .notify_poll_in = virtio_transport_notify_poll_in,
>> .notify_poll_out = virtio_transport_notify_poll_out,
>> .notify_recv_init = virtio_transport_notify_recv_init,
>> @@ -485,6 +492,19 @@ static struct virtio_transport virtio_transport = {
>> .send_pkt = virtio_transport_send_pkt,
>> };
>>
>> +static bool virtio_transport_seqpacket_allow(u32 remote_cid)
>> +{
>> + struct virtio_vsock *vsock;
>> + bool seqpacket_allow;
>> +
>> + rcu_read_lock();
>> + vsock = rcu_dereference(the_virtio_vsock);
>> + seqpacket_allow = vsock->seqpacket_allow;
>> + rcu_read_unlock();
>> +
>> + return seqpacket_allow;
>> +}
>> +
>> static void virtio_transport_rx_work(struct work_struct *work)
>> {
>> struct virtio_vsock *vsock =
>> @@ -612,6 +632,10 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>> rcu_assign_pointer(the_virtio_vsock, vsock);
>>
>> mutex_unlock(&the_virtio_vsock_mutex);
>> +
>> + if (vdev->features & (1ULL << VIRTIO_VSOCK_F_SEQPACKET))
> We should use virtio_has_feature() to check the device features.
>
>> + vsock->seqpacket_allow = true;
> When we assign the_virtio_vsock pointer, we should already set all the
> fields, so please move this code before the following block:
>
> # here
>
> vdev->priv = vsock;
> rcu_assign_pointer(the_virtio_vsock, vsock);
Ack
>
>

2021-05-13 14:50:27

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket


On 13.05.2021 17:01, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>> This add logic, that serializes write access to single socket
>> by multiple threads. It is implemented be adding field with TID
>> of current writer. When writer tries to send something, it checks
>> that field is -1(free), else it sleep in the same way as waiting
>> for free space at peers' side.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/net/af_vsock.h | 1 +
>> net/vmw_vsock/af_vsock.c | 10 +++++++++-
>> 2 files changed, 10 insertions(+), 1 deletion(-)
> I think you forgot to move this patch at the beginning of the series.
> It's important because in this way we can backport to stable branches
> easily.
>
> About the implementation, can't we just add a mutex that we hold until
> we have sent all the payload?
>
> I need to check other implementations like TCP.
OK, I'll prepare this as a separate patch.
>
>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>> index 1747c0b564ef..413343f18e99 100644
>> --- a/include/net/af_vsock.h
>> +++ b/include/net/af_vsock.h
>> @@ -69,6 +69,7 @@ struct vsock_sock {
>> u64 buffer_size;
>> u64 buffer_min_size;
>> u64 buffer_max_size;
>> + pid_t tid_owner;
>>
>> /* Private to transport. */
>> void *trans;
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index 7790728465f4..1fb4a1860f6d 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -757,6 +757,7 @@ static struct sock *__vsock_create(struct net *net,
>> vsk->peer_shutdown = 0;
>> INIT_DELAYED_WORK(&vsk->connect_work, vsock_connect_timeout);
>> INIT_DELAYED_WORK(&vsk->pending_work, vsock_pending_work);
>> + vsk->tid_owner = -1;
>>
>> psk = parent ? vsock_sk(parent) : NULL;
>> if (parent) {
>> @@ -1765,7 +1766,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> ssize_t written;
>>
>> add_wait_queue(sk_sleep(sk), &wait);
>> - while (vsock_stream_has_space(vsk) == 0 &&
>> + while ((vsock_stream_has_space(vsk) == 0 ||
>> + (vsk->tid_owner != current->pid &&
>> + vsk->tid_owner != -1)) &&
>> sk->sk_err == 0 &&
>> !(sk->sk_shutdown & SEND_SHUTDOWN) &&
>> !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
>> @@ -1796,6 +1799,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> goto out_err;
>> }
>> }
>> +
>> + vsk->tid_owner = current->pid;
>> remove_wait_queue(sk_sleep(sk), &wait);
>>
>> /* These checks occur both as part of and after the loop
>> @@ -1852,7 +1857,10 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> err = total_written;
>> }
>> out:
>> + vsk->tid_owner = -1;
>> release_sock(sk);
>> + sk->sk_write_space(sk);
>> +
> Is this change related? Can you explain in the commit message why it is
> needed?
>
>> return err;
>> }
>>
>> --
>> 2.25.1
>>
>
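The patched wait condition in vsock_connectible_sendmsg() can be read as a pure predicate. A writer keeps sleeping while there is no space, or while another thread owns the socket. This minimal sketch assumes the error and shutdown checks are collapsed into a single stop flag; must_wait() and TID_FREE are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

#define TID_FREE ((pid_t)-1)	/* mirrors vsk->tid_owner == -1 */

/*
 * Returns true if the caller must keep sleeping, mirroring the patched
 * while() condition: no space, or the socket is owned by another thread,
 * and no error/shutdown has been reported (collapsed into 'stop').
 */
static bool must_wait(long space, pid_t owner, pid_t me, bool stop)
{
	bool owned_by_other = owner != me && owner != TID_FREE;

	return (space == 0 || owned_by_other) && !stop;
}
```

The check also passes when owner == me. So a thread that already owns the socket only sleeps for lack of space, never because of its own ownership.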

2021-05-13 14:52:35

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 00/19] virtio/vsock: introduce SOCK_SEQPACKET support


On 13.05.2021 17:10, Stefano Garzarella wrote:
> Hi Arseny,
>
> On Sat, May 08, 2021 at 07:30:23PM +0300, Arseny Krasnov wrote:
>> This patchset implements support of SOCK_SEQPACKET for virtio
>> transport.
>> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>> do it, new bit for field 'flags' was added: SEQ_EOR. This bit is
>> set to 1 in last RW packet of message.
>> Now as packets of one socket are not reordered neither on vsock
>> nor on vhost transport layers, such bit allows to restore original
>> message on receiver's side. If user's buffer is smaller than message
>> length, when all out of size data is dropped.
>> Maximum length of datagram is not limited as in stream socket,
>> because same credit logic is used. Difference with stream socket is
>> that user is not woken up until whole record is received or error
>> occurred. Implementation also supports 'MSG_TRUNC' flags.
>> Tests also implemented.
>>
>> Thanks to [email protected] for encouragements and initial design
>> recommendations.
>>
>> Arseny Krasnov (19):
>> af_vsock: update functions for connectible socket
>> af_vsock: separate wait data loop
>> af_vsock: separate receive data loop
>> af_vsock: implement SEQPACKET receive loop
>> af_vsock: implement send logic for SEQPACKET
>> af_vsock: rest of SEQPACKET support
>> af_vsock: update comments for stream sockets
>> virtio/vsock: set packet's type in virtio_transport_send_pkt_info()
>> virtio/vsock: simplify credit update function API
>> virtio/vsock: defines and constants for SEQPACKET
>> virtio/vsock: dequeue callback for SOCK_SEQPACKET
>> virtio/vsock: add SEQPACKET receive logic
>> virtio/vsock: rest of SOCK_SEQPACKET support
>> virtio/vsock: enable SEQPACKET for transport
>> vhost/vsock: enable SEQPACKET for transport
>> vsock/loopback: enable SEQPACKET for transport
>> vsock_test: add SOCK_SEQPACKET tests
>> virtio/vsock: update trace event for SEQPACKET
>> af_vsock: serialize writes to shared socket
>>
>> drivers/vhost/vsock.c | 42 +-
>> include/linux/virtio_vsock.h | 9 +
>> include/net/af_vsock.h | 8 +
>> .../events/vsock_virtio_transport_common.h | 5 +-
>> include/uapi/linux/virtio_vsock.h | 9 +
>> net/vmw_vsock/af_vsock.c | 417 +++++++++++------
>> net/vmw_vsock/virtio_transport.c | 25 +
>> net/vmw_vsock/virtio_transport_common.c | 129 ++++-
>> net/vmw_vsock/vsock_loopback.c | 11 +
>> tools/testing/vsock/util.c | 32 +-
>> tools/testing/vsock/util.h | 3 +
>> tools/testing/vsock/vsock_test.c | 63 +++
>> 12 files changed, 594 insertions(+), 159 deletions(-)
>>
>> v8 -> v9:
>> General changelog:
>> - see per patch change log.
>>
> I reviewed this series and left some comments.
>
> Before remove the RFC tag, please check that all the commit messages
> contains the right information.
>
> Also, I recommend you take a look on how the other commits in the Linux
> tree are written because the commits in this series look like todo
> lists.
> For RFC could be fine, but for the final version it would be better to
> rewrite them following the advice written here:
> Documentation/process/submitting-patches.rst
Thank you for the review. OK, I'll check it.
>
> Thanks,
> Stefano
>
>

2021-05-13 14:53:02

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket

On Thu, May 13, 2021 at 04:01:50PM +0200, Stefano Garzarella wrote:
>On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>>This add logic, that serializes write access to single socket
>>by multiple threads. It is implemented be adding field with TID
>>of current writer. When writer tries to send something, it checks
>>that field is -1(free), else it sleep in the same way as waiting
>>for free space at peers' side.
>>
>>Signed-off-by: Arseny Krasnov <[email protected]>
>>---
>>include/net/af_vsock.h | 1 +
>>net/vmw_vsock/af_vsock.c | 10 +++++++++-
>>2 files changed, 10 insertions(+), 1 deletion(-)
>
>I think you forgot to move this patch at the beginning of the series.
>It's important because in this way we can backport to stable branches
>easily.
>
>About the implementation, can't we just add a mutex that we hold until
>we have sent all the payload?

Re-thinking, I guess we can't because we have the timeout to deal
with...

>
>I need to check other implementations like TCP.
>

Thanks,
Stefano
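Stefano's second thought explains the owner-field design: holding a mutex for the whole send does not fit, because sendmsg has to honour a timeout. The userspace sketch below shows the same shape under these assumptions:

- POSIX threads are used throughout.
- A briefly held mutex stands in for lock_sock().
- A condition variable stands in for the socket wait queue.
- struct owner_lock and its helpers are hypothetical.

A waiter sleeps with a deadline and re-checks ownership after waking. Release wakes all waiters, much as the patch calls sk->sk_write_space() after clearing tid_owner.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical owner lock: 'owned' plays the role of tid_owner != -1. */
struct owner_lock {
	pthread_mutex_t mu;	/* held only briefly, like lock_sock() */
	pthread_cond_t cv;	/* sleepers wait here, like sk_sleep(sk) */
	bool owned;
	pthread_t owner;
};

static void owner_lock_init(struct owner_lock *l)
{
	pthread_mutex_init(&l->mu, NULL);
	pthread_cond_init(&l->cv, NULL);
	l->owned = false;
}

/* Wait up to timeout_ms to become the owner; returns 0 or ETIMEDOUT. */
static int owner_lock_acquire(struct owner_lock *l, long timeout_ms)
{
	struct timespec deadline;
	int err = 0;

	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += timeout_ms / 1000;
	deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
	if (deadline.tv_nsec >= 1000000000L) {
		deadline.tv_sec++;
		deadline.tv_nsec -= 1000000000L;
	}

	pthread_mutex_lock(&l->mu);
	while (l->owned && !pthread_equal(l->owner, pthread_self())) {
		if (pthread_cond_timedwait(&l->cv, &l->mu, &deadline) == ETIMEDOUT)
			break;
	}
	/* Re-check after waking: the owner may have left just at the deadline. */
	if (l->owned && !pthread_equal(l->owner, pthread_self())) {
		err = ETIMEDOUT;
	} else {
		l->owned = true;
		l->owner = pthread_self();
	}
	pthread_mutex_unlock(&l->mu);
	return err;
}

static void owner_lock_release(struct owner_lock *l)
{
	pthread_mutex_lock(&l->mu);
	l->owned = false;
	pthread_mutex_unlock(&l->mu);
	/* Wake every sleeper, like sk->sk_write_space(sk) in the patch. */
	pthread_cond_broadcast(&l->cv);
}

/* Demo helper: try to become owner from another thread, 50 ms timeout. */
static void *contender(void *arg)
{
	struct owner_lock *l = arg;
	int err = owner_lock_acquire(l, 50);

	if (err == 0)
		owner_lock_release(l);
	return (void *)(long)err;
}
```

contender() exists only to exercise the timeout from a second thread. While the first thread owns the lock it gives up with ETIMEDOUT; after release it succeeds.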


2021-05-13 14:55:25

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket


On 13.05.2021 17:01, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>> This add logic, that serializes write access to single socket
>> by multiple threads. It is implemented be adding field with TID
>> of current writer. When writer tries to send something, it checks
>> that field is -1(free), else it sleep in the same way as waiting
>> for free space at peers' side.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/net/af_vsock.h | 1 +
>> net/vmw_vsock/af_vsock.c | 10 +++++++++-
>> 2 files changed, 10 insertions(+), 1 deletion(-)
> I think you forgot to move this patch at the beginning of the series.
> It's important because in this way we can backport to stable branches
> easily.
>
> About the implementation, can't we just add a mutex that we hold until
> we have sent all the payload?
>
> I need to check other implementations like TCP.
>
>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>> index 1747c0b564ef..413343f18e99 100644
>> --- a/include/net/af_vsock.h
>> +++ b/include/net/af_vsock.h
>> @@ -69,6 +69,7 @@ struct vsock_sock {
>> u64 buffer_size;
>> u64 buffer_min_size;
>> u64 buffer_max_size;
>> + pid_t tid_owner;
>>
>> /* Private to transport. */
>> void *trans;
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index 7790728465f4..1fb4a1860f6d 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -757,6 +757,7 @@ static struct sock *__vsock_create(struct net *net,
>> vsk->peer_shutdown = 0;
>> INIT_DELAYED_WORK(&vsk->connect_work, vsock_connect_timeout);
>> INIT_DELAYED_WORK(&vsk->pending_work, vsock_pending_work);
>> + vsk->tid_owner = -1;
>>
>> psk = parent ? vsock_sk(parent) : NULL;
>> if (parent) {
>> @@ -1765,7 +1766,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> ssize_t written;
>>
>> add_wait_queue(sk_sleep(sk), &wait);
>> - while (vsock_stream_has_space(vsk) == 0 &&
>> + while ((vsock_stream_has_space(vsk) == 0 ||
>> + (vsk->tid_owner != current->pid &&
>> + vsk->tid_owner != -1)) &&
>> sk->sk_err == 0 &&
>> !(sk->sk_shutdown & SEND_SHUTDOWN) &&
>> !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
>> @@ -1796,6 +1799,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> goto out_err;
>> }
>> }
>> +
>> + vsk->tid_owner = current->pid;
>> remove_wait_queue(sk_sleep(sk), &wait);
>>
>> /* These checks occur both as part of and after the loop
>> @@ -1852,7 +1857,10 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> err = total_written;
>> }
>> out:
>> + vsk->tid_owner = -1;
>> release_sock(sk);
>> + sk->sk_write_space(sk);
>> +
> Is this change related? Can you explain in the commit message why it is
> needed?
This is the "unlocking" of the socket.
>
>> return err;
>> }
>>
>> --
>> 2.25.1
>>
>

2021-05-13 18:14:51

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 04/19] af_vsock: implement SEQPACKET receive loop

On Sat, May 08, 2021 at 07:33:14PM +0300, Arseny Krasnov wrote:
>This adds receive loop for SEQPACKET. It looks like receive loop for
>STREAM, but there is a little bit difference:
>1) It doesn't call notify callbacks.
>2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
> there is no sense for these values in SEQPACKET case.
>3) It waits until whole record is received or error is found during
> receiving.
>4) It processes and sets 'MSG_TRUNC' flag.
>
>So to avoid extra conditions for two types of socket inside one loop, two
>independent functions were created.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) 'tmp_record_len' renamed to 'fragment_len'.
> 2) MSG_TRUNC handled in af_vsock.c instead of transport.
> 3) 'flags' still passed to transport for MSG_PEEK support.

Ah, right, I see. Sorry for the wrong suggestion to remove it.

>
> include/net/af_vsock.h | 4 +++
> net/vmw_vsock/af_vsock.c | 72 +++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 75 insertions(+), 1 deletion(-)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..5175f5a52ce1 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,10 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
>+ int flags, bool *msg_ready);
>+
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index c4f6bfa1e381..78b9af545ca8 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1974,6 +1974,73 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
> return err;
> }
>
>+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ const struct vsock_transport *transport;
>+ bool msg_ready;
>+ struct vsock_sock *vsk;
>+ ssize_t record_len;
>+ long timeout;
>+ int err = 0;
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ msg_ready = false;
>+ record_len = 0;
>+
>+ while (1) {
>+ ssize_t fragment_len;
>+
>+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0) <= 0) {
>+ /* In case of any loop break(timeout, signal
>+ * interrupt or shutdown), we report user that
>+ * nothing was copied.
>+ */
>+ err = 0;
>+ break;
>+ }
>+
>+ fragment_len = transport->seqpacket_dequeue(vsk, msg, flags, &msg_ready);
>+

So, IIUC, seqpacket_dequeue() must return the real length,
and not the bytes copied, right?

I'm not sure virtio_transport_seqpacket_do_dequeue() is doing that.
I'll post a comment on that patch.

>+ if (fragment_len < 0) {
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ record_len += fragment_len;
>+
>+ if (msg_ready)
>+ break;
>+ }
>+
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+
>+ if (msg_ready && err == 0) {
>+ /* User sets MSG_TRUNC, so return real length of
>+ * packet.
>+ */
>+ if (flags & MSG_TRUNC)
>+ err = record_len;
>+ else
>+ err = len - msg->msg_iter.count;

I think it is better to use msg_data_left(msg) instead of accessing the msg_iter fields directly.

>+
>+ /* Always set MSG_TRUNC if real length of packet is
>+ * bigger than user's buffer.
>+ */
>+ if (record_len > len)
>+ msg->msg_flags |= MSG_TRUNC;
>+ }
>+
>+ return err;
>+}
>+
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
>@@ -2029,7 +2096,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> goto out;
> }
>
>- err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+ if (sk->sk_type == SOCK_STREAM)
>+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+ else
>+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
>
> out:
> release_sock(sk);
>--
>2.25.1
>
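The return-value rules at the end of __vsock_seqpacket_recvmsg() can be isolated as follows. The helper takes the value msg_data_left(msg) would report, instead of reading msg_iter.count directly. seqpacket_ret() is hypothetical, not kernel code. It assumes record_len is the full record length, which is exactly what the open question about seqpacket_dequeue() has to guarantee.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>	/* MSG_TRUNC */

/*
 * Hypothetical helper mirroring the tail of __vsock_seqpacket_recvmsg():
 * len is the user buffer size, data_left is what msg_data_left(msg)
 * reports after copying, record_len is the full record length.
 */
static ssize_t seqpacket_ret(size_t len, size_t data_left,
			     ssize_t record_len, int flags, int *msg_flags)
{
	ssize_t ret;

	if (flags & MSG_TRUNC)
		ret = record_len;			/* caller wants the real length */
	else
		ret = (ssize_t)(len - data_left);	/* bytes actually copied */

	/* Always report truncation when the record did not fit. */
	if ((size_t)record_len > len)
		*msg_flags |= MSG_TRUNC;

	return ret;
}
```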


2021-05-13 19:13:54

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 10/19] virtio/vsock: defines and constants for SEQPACKET

On Sat, May 08, 2021 at 07:35:05PM +0300, Arseny Krasnov wrote:
>This adds set of defines and constants for SOCK_SEQPACKET
>support in vsock. Here is link to spec patch, which uses it:
>
>https://lists.oasis-open.org/archives/virtio-comment/202103/msg00069.html

Will you be submitting a new version?

>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/uapi/linux/virtio_vsock.h | 9 +++++++++
> 1 file changed, 9 insertions(+)
>
>diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>index 1d57ed3d84d2..3dd3555b2740 100644
>--- a/include/uapi/linux/virtio_vsock.h
>+++ b/include/uapi/linux/virtio_vsock.h
>@@ -38,6 +38,9 @@
> #include <linux/virtio_ids.h>
> #include <linux/virtio_config.h>
>
>+/* The feature bitmap for virtio vsock */
>+#define VIRTIO_VSOCK_F_SEQPACKET 1 /* SOCK_SEQPACKET supported */
>+
> struct virtio_vsock_config {
> __le64 guest_cid;
> } __attribute__((packed));
>@@ -65,6 +68,7 @@ struct virtio_vsock_hdr {
>
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
>+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
>@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
>+/* VIRTIO_VSOCK_OP_RW flags values */
>+enum virtio_vsock_rw {
>+ VIRTIO_VSOCK_SEQ_EOR = 1,
>+};
>+
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>--
>2.25.1
>

Looks good:

Reviewed-by: Stefano Garzarella <[email protected]>
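The new definitions are used in two different ways:

- VIRTIO_VSOCK_F_SEQPACKET is a bit position in the 64-bit feature word.
- VIRTIO_VSOCK_SEQ_EOR is a mask in the little-endian hdr.flags field.

The userspace sketch below assumes glibc's <endian.h> conversions in place of le32_to_cpu(). The values are copied from the patch and the helper names are hypothetical.

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <endian.h>
#include <stdbool.h>
#include <stdint.h>

/* Values copied from the patch above. */
#define VIRTIO_VSOCK_F_SEQPACKET	1
#define VIRTIO_VSOCK_TYPE_SEQPACKET	2
#define VIRTIO_VSOCK_SEQ_EOR		1

/* Feature bits are bit positions in the 64-bit feature word. */
static bool has_seqpacket_feature(uint64_t features)
{
	return features & (1ULL << VIRTIO_VSOCK_F_SEQPACKET);
}

/* hdr.flags is __le32 on the wire, so convert before testing the mask. */
static bool is_eor(uint32_t le_flags)
{
	return le32toh(le_flags) & VIRTIO_VSOCK_SEQ_EOR;
}
```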


2021-05-13 19:30:20

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 12/19] virtio/vsock: add SEQPACKET receive logic

On Sat, May 08, 2021 at 07:35:40PM +0300, Arseny Krasnov wrote:
>This modifies current receive logic for SEQPACKET support:
>1) Inserts 'RW' packet to socket's rx queue, but without merging with
> buffer of last packet in queue.

This is not true anymore, right?

>2) Performs check for packet and socket types on receive(if mismatch,
> then reset connection).
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>Reviewed-by: Stefano Garzarella <[email protected]>

Also, this patch has changed since then :-)

>---
> net/vmw_vsock/virtio_transport_common.c | 28 +++++++++++++++++++++++--
> 1 file changed, 26 insertions(+), 2 deletions(-)
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index f649a21dd23b..7fea0a2192f7 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -165,6 +165,14 @@ void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
> }
> EXPORT_SYMBOL_GPL(virtio_transport_deliver_tap_pkt);
>
>+static u16 virtio_transport_get_type(struct sock *sk)
>+{
>+ if (sk->sk_type == SOCK_STREAM)
>+ return VIRTIO_VSOCK_TYPE_STREAM;
>+ else
>+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
>+}
>+
> /* This function can only be used on connecting/connected sockets,
> * since a socket assigned to a transport is required.
> *
>@@ -980,11 +988,15 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> /* If there is space in the last packet queued, we copy the
> * new packet in its buffer.
> */
>- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>+ if ((pkt->len <= last_pkt->buf_len - last_pkt->len) &&
>+ !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)) {

Maybe we should update the comment above.

> memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
> pkt->len);
> last_pkt->len += pkt->len;
> free_pkt = true;
>+
>+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
>+ last_pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);

What about doing the following in any case?

last_pkt->hdr.flags |= pkt->hdr.flags;

> goto out;
> }
> }
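The merge rule in virtio_transport_recv_enqueue() can be modelled together with the suggestion to OR the flags unconditionally. struct rx_pkt and try_merge() are hypothetical simplifications, with a fixed 64-byte buffer and host-endian flags. The real code works on struct virtio_vsock_pkt with __le32 flags.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SEQ_EOR 1u	/* mirrors VIRTIO_VSOCK_SEQ_EOR */

/* Hypothetical, simplified stand-in for struct virtio_vsock_pkt. */
struct rx_pkt {
	char buf[64];
	uint32_t buf_len;	/* capacity of buf */
	uint32_t len;		/* bytes used */
	uint32_t flags;		/* host-endian here for simplicity */
};

/*
 * Try to append 'pkt' to the last queued packet. Merging is allowed only
 * if it fits and the last packet does not already end a record; otherwise
 * two records would be glued together. Returns true if merged.
 */
static bool try_merge(struct rx_pkt *last, const struct rx_pkt *pkt)
{
	if (pkt->len > last->buf_len - last->len || (last->flags & SEQ_EOR))
		return false;

	memcpy(last->buf + last->len, pkt->buf, pkt->len);
	last->len += pkt->len;
	/* Reviewer's suggestion: carry all flags over, including SEQ_EOR. */
	last->flags |= pkt->flags;
	return true;
}
```

OR-ing the raw little-endian words is also safe in the real code. A bitwise OR gives the same result whether it is applied before or after a byte swap.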


2021-05-13 19:47:52

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 17/19] vsock_test: add SOCK_SEQPACKET tests

On Sat, May 08, 2021 at 07:37:00PM +0300, Arseny Krasnov wrote:
>This adds two tests of SOCK_SEQPACKET socket: both transfer data and
>then test MSG_EOR and MSG_TRUNC flags. Cases for connect(), bind(),
^
We removed the MSG_EOR tests, right?

>etc. are not tested, because it is same as for stream socket.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> tools/testing/vsock/util.c | 32 +++++++++++++---
> tools/testing/vsock/util.h | 3 ++
> tools/testing/vsock/vsock_test.c | 63 ++++++++++++++++++++++++++++++++
> 3 files changed, 93 insertions(+), 5 deletions(-)
>
>diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
>index 93cbd6f603f9..2acbb7703c6a 100644
>--- a/tools/testing/vsock/util.c
>+++ b/tools/testing/vsock/util.c
>@@ -84,7 +84,7 @@ void vsock_wait_remote_close(int fd)
> }
>
> /* Connect to <cid, port> and return the file descriptor. */
>-int vsock_stream_connect(unsigned int cid, unsigned int port)
>+static int vsock_connect(unsigned int cid, unsigned int port, int type)
> {
> union {
> struct sockaddr sa;
>@@ -101,7 +101,7 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
>
> control_expectln("LISTENING");
>
>- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
>+ fd = socket(AF_VSOCK, type, 0);
>
> timeout_begin(TIMEOUT);
> do {
>@@ -120,11 +120,21 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
> return fd;
> }
>
>+int vsock_stream_connect(unsigned int cid, unsigned int port)
>+{
>+ return vsock_connect(cid, port, SOCK_STREAM);
>+}
>+
>+int vsock_seqpacket_connect(unsigned int cid, unsigned int port)
>+{
>+ return vsock_connect(cid, port, SOCK_SEQPACKET);
>+}
>+
> /* Listen on <cid, port> and return the first incoming connection. The remote
> * address is stored to clientaddrp. clientaddrp may be NULL.
> */
>-int vsock_stream_accept(unsigned int cid, unsigned int port,
>- struct sockaddr_vm *clientaddrp)
>+static int vsock_accept(unsigned int cid, unsigned int port,
>+ struct sockaddr_vm *clientaddrp, int type)
> {
> union {
> struct sockaddr sa;
>@@ -145,7 +155,7 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
> int client_fd;
> int old_errno;
>
>- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
>+ fd = socket(AF_VSOCK, type, 0);
>
> if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
> perror("bind");
>@@ -189,6 +199,18 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
> return client_fd;
> }
>
>+int vsock_stream_accept(unsigned int cid, unsigned int port,
>+ struct sockaddr_vm *clientaddrp)
>+{
>+ return vsock_accept(cid, port, clientaddrp, SOCK_STREAM);
>+}
>+
>+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
>+ struct sockaddr_vm *clientaddrp)
>+{
>+ return vsock_accept(cid, port, clientaddrp, SOCK_SEQPACKET);
>+}
>+
> /* Transmit one byte and check the return value.
> *
> * expected_ret:
>diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
>index e53dd09d26d9..a3375ad2fb7f 100644
>--- a/tools/testing/vsock/util.h
>+++ b/tools/testing/vsock/util.h
>@@ -36,8 +36,11 @@ struct test_case {
> void init_signals(void);
> unsigned int parse_cid(const char *str);
> int vsock_stream_connect(unsigned int cid, unsigned int port);
>+int vsock_seqpacket_connect(unsigned int cid, unsigned int port);
> int vsock_stream_accept(unsigned int cid, unsigned int port,
> struct sockaddr_vm *clientaddrp);
>+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
>+ struct sockaddr_vm *clientaddrp);
> void vsock_wait_remote_close(int fd);
> void send_byte(int fd, int expected_ret, int flags);
> void recv_byte(int fd, int expected_ret, int flags);
>diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
>index 5a4fb80fa832..ffec985fd36f 100644
>--- a/tools/testing/vsock/vsock_test.c
>+++ b/tools/testing/vsock/vsock_test.c
>@@ -14,6 +14,8 @@
> #include <errno.h>
> #include <unistd.h>
> #include <linux/kernel.h>
>+#include <sys/types.h>
>+#include <sys/socket.h>
>
> #include "timeout.h"
> #include "control.h"
>@@ -279,6 +281,62 @@ static void test_stream_msg_peek_server(const struct test_opts *opts)
> close(fd);
> }
>
>+#define MESSAGE_TRUNC_SZ 32
>+static void test_seqpacket_msg_trunc_client(const struct test_opts *opts)
>+{
>+ int fd;
>+ char buf[MESSAGE_TRUNC_SZ];
>+
>+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
>+ if (fd < 0) {
>+ perror("connect");
>+ exit(EXIT_FAILURE);
>+ }
>+
>+ if (send(fd, buf, sizeof(buf), 0) != sizeof(buf)) {
>+ perror("send failed");
>+ exit(EXIT_FAILURE);
>+ }
>+
>+ control_writeln("SENDDONE");
>+ close(fd);
>+}
>+
>+static void test_seqpacket_msg_trunc_server(const struct test_opts *opts)
>+{
>+ int fd;
>+ char buf[MESSAGE_TRUNC_SZ / 2];
>+ struct msghdr msg = {0};
>+ struct iovec iov = {0};
>+
>+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
>+ if (fd < 0) {
>+ perror("accept");
>+ exit(EXIT_FAILURE);
>+ }
>+
>+ control_expectln("SENDDONE");
>+ iov.iov_base = buf;
>+ iov.iov_len = sizeof(buf);
>+ msg.msg_iov = &iov;
>+ msg.msg_iovlen = 1;
>+
>+ ssize_t ret = recvmsg(fd, &msg, MSG_TRUNC);
>+
>+ if (ret != MESSAGE_TRUNC_SZ) {
>+ printf("%zi\n", ret);
>+ perror("MSG_TRUNC doesn't work");
>+ exit(EXIT_FAILURE);
>+ }
>+
>+ if (!(msg.msg_flags & MSG_TRUNC)) {
>+ fprintf(stderr, "MSG_TRUNC expected\n");
>+ exit(EXIT_FAILURE);
>+ }
>+
>+ close(fd);
>+}
>+
> static struct test_case test_cases[] = {
> {
> .name = "SOCK_STREAM connection reset",
>@@ -309,6 +367,11 @@ static struct test_case test_cases[] = {
> .run_client = test_stream_msg_peek_client,
> .run_server = test_stream_msg_peek_server,
> },
>+ {
>+ .name = "SOCK_SEQPACKET send data MSG_TRUNC",
>+ .run_client = test_seqpacket_msg_trunc_client,
>+ .run_server = test_seqpacket_msg_trunc_server,
>+ },
> {},
> };
>
>--
>2.25.1
>
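The new test expects recvmsg() with MSG_TRUNC to return the full record length and to set MSG_TRUNC in msg_flags. Linux already implements these semantics for AF_UNIX SOCK_SEQPACKET sockets (since 3.4). vsock needs a VM or the loopback transport, so this sketch uses an AF_UNIX socketpair() as a stand-in to show the expected behaviour. seqpacket_trunc_demo() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/*
 * Send a 32-byte record over an AF_UNIX SOCK_SEQPACKET pair and read it
 * back into a 16-byte buffer with MSG_TRUNC. Stores the recvmsg() return
 * value and msg_flags; returns 0 on success, -1 on setup failure.
 */
static int seqpacket_trunc_demo(ssize_t *ret, int *msg_flags)
{
	char out[32], in[16];
	struct iovec iov = { .iov_base = in, .iov_len = sizeof(in) };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
	int sv[2];

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
		return -1;

	memset(out, 'x', sizeof(out));	/* don't send uninitialized bytes */
	if (send(sv[0], out, sizeof(out), 0) != (ssize_t)sizeof(out)) {
		close(sv[0]);
		close(sv[1]);
		return -1;
	}

	*ret = recvmsg(sv[1], &msg, MSG_TRUNC);
	*msg_flags = msg.msg_flags;

	close(sv[0]);
	close(sv[1]);
	return 0;
}
```

Unlike the client above, the sketch fills the send buffer before sending, so no uninitialized stack bytes go out.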


2021-05-13 20:23:05

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET

On Sat, May 08, 2021 at 07:35:20PM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.
>Callback fetches RW packets from rx queue of socket until whole record
>is copied(if user's buffer is full, user is not woken up). This is done
>to not stall sender, because if we wake up user and it leaves syscall,
>nobody will send credit update for rest of record, and sender will wait
>for next enter of read syscall at receiver's side. So if user buffer is
>full, we just send credit update and drop data.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) Check for RW packet type is removed from loop(all packet now
> considered RW).
> 2) Locking in loop is fixed.
> 3) cpu_to_le32()/le32_to_cpu() now used.
> 4) MSG_TRUNC handling removed from transport.
>
> include/linux/virtio_vsock.h | 5 ++
> net/vmw_vsock/virtio_transport_common.c | 64 +++++++++++++++++++++++++
> 2 files changed, 69 insertions(+)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..02acf6e9ae04 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags,
>+ bool *msg_ready);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index ad0d34d41444..f649a21dd23b 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -393,6 +393,58 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags,
>+ bool *msg_ready)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ int err = 0;
>+ size_t user_buf_len = msg->msg_iter.count;

I forgot to mention that here, too, it is better to use `msg_data_left(msg)`.

Thanks,
Stefano
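The dequeue contract discussed for patch 04 can be sketched over an array of fragments: copy what fits, drop the rest of the record, but report the real length. This sketch makes the following assumptions:

- struct frag and dequeue_record() are hypothetical.
- buf_len stands in for msg_data_left(msg).
- The msg_ready out-parameter mirrors the one in the patch.
- Locking, credit updates and MSG_PEEK are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical queued fragment: payload plus an end-of-record marker. */
struct frag {
	const char *data;
	size_t len;
	bool eor;
};

/*
 * Consume fragments up to and including the one marked eor. Copy as much
 * as fits into buf; the rest is dropped. Returns the full length of what
 * was consumed (not the bytes copied), so the caller can honour MSG_TRUNC.
 */
static ssize_t dequeue_record(const struct frag *q, size_t nfrags,
			      char *buf, size_t buf_len, size_t *copied,
			      bool *msg_ready)
{
	ssize_t record_len = 0;

	*copied = 0;
	*msg_ready = false;
	for (size_t i = 0; i < nfrags && !*msg_ready; i++) {
		size_t room = buf_len - *copied;
		size_t n = q[i].len < room ? q[i].len : room;

		memcpy(buf + *copied, q[i].data, n);	/* excess is dropped */
		*copied += n;
		record_len += (ssize_t)q[i].len;
		*msg_ready = q[i].eor;
	}
	return record_len;
}
```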


2021-05-13 20:52:04

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 15/19] vhost/vsock: enable SEQPACKET for transport

On Sat, May 08, 2021 at 07:36:31PM +0300, Arseny Krasnov wrote:
>This removes:
>1) Ignore of non-stream type of packets.
>This adds:
>1) Handling of SEQPACKET bit: if guest sets features with this bit cleared,
> then SOCK_SEQPACKET support will be disabled.
>2) 'seqpacket_allow()' callback.
>3) Handling of SEQ_EOR bit: when vhost places data in buffers of guest's
> rx queue, keep this bit set only when last piece of data is copied.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> v8 -> v9:
> 1) Move 'seqpacket_allow' to 'struct vhost_vsock'.
> 2) Use cpu_to_le32()/le32_to_cpu() to work with 'flags' of packet.
>
> drivers/vhost/vsock.c | 42 +++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 39 insertions(+), 3 deletions(-)
>
>diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
>index 5e78fb719602..3395b25d4a35 100644
>--- a/drivers/vhost/vsock.c
>+++ b/drivers/vhost/vsock.c
>@@ -31,7 +31,8 @@
>
> enum {
> VHOST_VSOCK_FEATURES = VHOST_FEATURES |
>- (1ULL << VIRTIO_F_ACCESS_PLATFORM)
>+ (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
>+ (1ULL << VIRTIO_VSOCK_F_SEQPACKET)
> };
>
> enum {
>@@ -56,6 +57,7 @@ struct vhost_vsock {
> atomic_t queued_replies;
>
> u32 guest_cid;
>+ bool seqpacket_allow;
> };
>
> static u32 vhost_transport_get_local_cid(void)
>@@ -112,6 +114,7 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> size_t nbytes;
> size_t iov_len, payload_len;
> int head;
>+ bool restore_flag = false;
>
> spin_lock_bh(&vsock->send_pkt_list_lock);
> if (list_empty(&vsock->send_pkt_list)) {
>@@ -174,6 +177,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> /* Set the correct length in the header */
> pkt->hdr.len = cpu_to_le32(payload_len);
>
>+ if (pkt->off + payload_len < pkt->len &&
>+ le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
>+ pkt->hdr.flags &= ~cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
>+ restore_flag = true;
>+ }

I think it is better to move this code into the same block where we limit
payload_len, something like this (not tested):

/* If the packet is greater than the space available in the
* buffer, we split it using multiple buffers.
*/
if (payload_len > iov_len - sizeof(pkt->hdr)) {
payload_len = iov_len - sizeof(pkt->hdr);

if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
pkt->hdr.flags &= ~cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
restore_flag = true;
}
}

The rest LGTM.
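The suggested placement ties clearing SEQ_EOR to the exact condition that causes a split, so only the last piece of a record keeps the bit. The model below assumes a fixed per-buffer payload size and host-endian flags. split_pkt() is a hypothetical name, and zero-length packets are not modelled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SEQ_EOR 1u	/* mirrors VIRTIO_VSOCK_SEQ_EOR */

/*
 * Hypothetical model of vhost_transport_do_send_pkt() splitting a packet
 * of pkt_len payload bytes across guest buffers that each hold at most
 * buf_payload bytes. Records the flags each piece carries in piece_flags
 * (up to max_pieces) and returns the number of pieces.
 */
static size_t split_pkt(size_t pkt_len, size_t buf_payload, uint32_t flags,
			uint32_t *piece_flags, size_t max_pieces)
{
	size_t off = 0, n = 0;

	assert(buf_payload > 0);
	while (off < pkt_len && n < max_pieces) {
		size_t payload_len = pkt_len - off;
		uint32_t f = flags;

		/* Packet larger than the buffer: this is not the last piece,
		 * so SEQ_EOR is cleared here (the real code restores it). */
		if (payload_len > buf_payload) {
			payload_len = buf_payload;
			f &= ~SEQ_EOR;
		}
		piece_flags[n++] = f;
		off += payload_len;
	}
	return n;
}
```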


2021-05-13 22:16:20

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 04/19] af_vsock: implement SEQPACKET receive loop


On 13.05.2021 14:37, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:33:14PM +0300, Arseny Krasnov wrote:
>> This adds receive loop for SEQPACKET. It looks like receive loop for
>> STREAM, but there is a little bit difference:
>> 1) It doesn't call notify callbacks.
>> 2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
>> there is no sense for these values in SEQPACKET case.
>> 3) It waits until whole record is received or error is found during
>> receiving.
>> 4) It processes and sets 'MSG_TRUNC' flag.
>>
>> So to avoid extra conditions for two types of socket inside one loop, two
>> independent functions were created.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> v8 -> v9:
>> 1) 'tmp_record_len' renamed to 'fragment_len'.
>> 2) MSG_TRUNC handled in af_vsock.c instead of transport.
>> 3) 'flags' still passed to transport for MSG_PEEK support.
> Ah, right I see, sorry for the wrong suggestion to remove it.
>
>> include/net/af_vsock.h | 4 +++
>> net/vmw_vsock/af_vsock.c | 72 +++++++++++++++++++++++++++++++++++++++-
>> 2 files changed, 75 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>> index b1c717286993..5175f5a52ce1 100644
>> --- a/include/net/af_vsock.h
>> +++ b/include/net/af_vsock.h
>> @@ -135,6 +135,10 @@ struct vsock_transport {
>> bool (*stream_is_active)(struct vsock_sock *);
>> bool (*stream_allow)(u32 cid, u32 port);
>>
>> + /* SEQ_PACKET. */
>> + ssize_t (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
>> + int flags, bool *msg_ready);
>> +
>> /* Notification. */
>> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
>> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index c4f6bfa1e381..78b9af545ca8 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -1974,6 +1974,73 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>> return err;
>> }
>>
>> +static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>> + size_t len, int flags)
>> +{
>> + const struct vsock_transport *transport;
>> + bool msg_ready;
>> + struct vsock_sock *vsk;
>> + ssize_t record_len;
>> + long timeout;
>> + int err = 0;
>> + DEFINE_WAIT(wait);
>> +
>> + vsk = vsock_sk(sk);
>> + transport = vsk->transport;
>> +
>> + timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>> + msg_ready = false;
>> + record_len = 0;
>> +
>> + while (1) {
>> + ssize_t fragment_len;
>> +
>> + if (vsock_wait_data(sk, &wait, timeout, NULL, 0) <= 0) {
>> + /* In case of any loop break(timeout, signal
>> + * interrupt or shutdown), we report user that
>> + * nothing was copied.
>> + */
>> + err = 0;
>> + break;
>> + }
>> +
>> + fragment_len = transport->seqpacket_dequeue(vsk, msg, flags, &msg_ready);
>> +
> So, IIUC, seqpacket_dequeue() must return the real length,
> and not the bytes copied, right?
>
> I'm not sure virtio_transport_seqpacket_do_dequeue() is doing that.
> I'll post a comment on that patch.
OK, I'll check it.
>
>> + if (fragment_len < 0) {
>> + err = -ENOMEM;
>> + break;
>> + }
>> +
>> + record_len += fragment_len;
>> +
>> + if (msg_ready)
>> + break;
>> + }
>> +
>> + if (sk->sk_err)
>> + err = -sk->sk_err;
>> + else if (sk->sk_shutdown & RCV_SHUTDOWN)
>> + err = 0;
>> +
>> + if (msg_ready && err == 0) {
>> + /* User sets MSG_TRUNC, so return real length of
>> + * packet.
>> + */
>> + if (flags & MSG_TRUNC)
>> + err = record_len;
>> + else
>> + err = len - msg->msg_iter.count;
> I think is better to use msg_data_left(msg) instead of accessing fields.
Ack
>
>> +
>> + /* Always set MSG_TRUNC if real length of packet is
>> + * bigger than user's buffer.
>> + */
>> + if (record_len > len)
>> + msg->msg_flags |= MSG_TRUNC;
>> + }
>> +
>> + return err;
>> +}
>> +
>> static int
>> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>> int flags)
>> @@ -2029,7 +2096,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>> goto out;
>> }
>>
>> - err = __vsock_stream_recvmsg(sk, msg, len, flags);
>> + if (sk->sk_type == SOCK_STREAM)
>> + err = __vsock_stream_recvmsg(sk, msg, len, flags);
>> + else
>> + err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
>>
>> out:
>> release_sock(sk);
>> --
>> 2.25.1
>>
>
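A minimal userspace sketch of the return-value contract discussed above, assuming (as the review question implies) that the transport's dequeue callback reports each fragment's full length rather than the number of bytes actually copied. The names fake_record, dequeue_fragment() and seqpacket_recv() are hypothetical and this is not the kernel code; the local `left` plays the role of msg_data_left(msg), and MSG_TRUNC is the real flag from <sys/socket.h>.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h> /* MSG_TRUNC */
#include <sys/types.h>  /* ssize_t */

/* Hypothetical stand-in for one record split into fragments; the last
 * fragment is the one that would carry the SEQ_EOR bit. */
struct fake_record {
	const char *const *frags;
	size_t nfrags;
};

/* Copy as much of one fragment as fits into the remaining buffer and
 * return the fragment's REAL length, not the number of bytes copied. */
static ssize_t dequeue_fragment(const char *frag, char *buf, size_t *left)
{
	size_t frag_len = strlen(frag);
	size_t to_copy = frag_len < *left ? frag_len : *left;

	memcpy(buf, frag, to_copy);
	*left -= to_copy;
	return (ssize_t)frag_len;
}

/* Returns the bytes copied, or the real record length if MSG_TRUNC is
 * passed in 'flags'; sets MSG_TRUNC in *msg_flags whenever the record
 * did not fit into the user's buffer. */
static ssize_t seqpacket_recv(const struct fake_record *rec, char *buf,
			      size_t len, int flags, int *msg_flags)
{
	size_t left = len; /* analogous to msg_data_left(msg) */
	ssize_t record_len = 0;

	assert(buf != NULL || len == 0);

	for (size_t i = 0; i < rec->nfrags; i++)
		record_len += dequeue_fragment(rec->frags[i],
					       buf + (len - left), &left);

	if ((size_t)record_len > len)
		*msg_flags |= MSG_TRUNC;

	return (flags & MSG_TRUNC) ? record_len : (ssize_t)(len - left);
}
```

With a 10-byte record and a 4-byte buffer, the call copies 4 bytes and sets MSG_TRUNC in msg_flags; passing MSG_TRUNC in flags makes it return 10 instead. That only works if the per-fragment return value is the real length, which is exactly the point raised about seqpacket_dequeue().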

2021-05-13 22:33:51

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket


On 13.05.2021 17:46, Stefano Garzarella wrote:
> On Thu, May 13, 2021 at 04:01:50PM +0200, Stefano Garzarella wrote:
>> On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>>> This adds logic that serializes write access to a single socket
>>> by multiple threads. It is implemented by adding a field with the TID
>>> of the current writer. When a writer tries to send something, it checks
>>> that the field is -1 (free); otherwise it sleeps in the same way as when
>>> waiting for free space on the peer's side.
>>>
>>> Signed-off-by: Arseny Krasnov <[email protected]>
>>> ---
>>> include/net/af_vsock.h | 1 +
>>> net/vmw_vsock/af_vsock.c | 10 +++++++++-
>>> 2 files changed, 10 insertions(+), 1 deletion(-)
>> I think you forgot to move this patch to the beginning of the series.
>> It's important because that way we can backport it to stable branches
>> easily.
>>
>> About the implementation, can't we just add a mutex that we hold until
>> we have sent all the payload?
> Re-thinking, I guess we can't because we have the timeout to deal
> with...
Yes, I forgot why I implemented it using 'tid_owner' :)
>
>> I need to check other implementations like TCP.
>>
> Thanks,
> Stefano
>
>
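To illustrate why a lock held for the whole send is awkward once a send timeout is involved, here is a hypothetical userspace model of the 'tid_owner' approach using POSIX threads: the owner field (-1 meaning free, as in the patch description) is protected by a short-lived lock, and a blocked writer sleeps on a condition variable with a deadline. struct writer_gate, gate_acquire() and gate_release() are illustrative names, not the patch's API; in the kernel the sleep would go through the socket's wait queue and socket lock instead. Build with -pthread.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical model of the 'tid_owner' idea (not the patch's code). */
struct writer_gate {
	pthread_mutex_t lock; /* protects 'owner'; released while sleeping */
	pthread_cond_t freed; /* signalled when 'owner' returns to -1 */
	long owner;           /* -1 when free, otherwise the writer's id */
};

#define WRITER_GATE_INIT \
	{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, -1 }

/* Wait until the gate is free and claim it for 'tid'.
 * Returns 0 on success or ETIMEDOUT if still owned after timeout_ms. */
static int gate_acquire(struct writer_gate *g, long tid, long timeout_ms)
{
	struct timespec deadline;
	int err = 0;

	assert(timeout_ms >= 0);
	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += timeout_ms / 1000;
	deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
	if (deadline.tv_nsec >= 1000000000L) {
		deadline.tv_sec += 1;
		deadline.tv_nsec -= 1000000000L;
	}

	pthread_mutex_lock(&g->lock);
	/* The loop also absorbs spurious wakeups. */
	while (g->owner != -1 && err == 0)
		err = pthread_cond_timedwait(&g->freed, &g->lock, &deadline);
	if (g->owner == -1) { /* may have been freed right at the deadline */
		g->owner = tid;
		err = 0;
	}
	pthread_mutex_unlock(&g->lock);
	return err;
}

/* Release the gate (only if 'tid' owns it) and wake all waiting writers. */
static void gate_release(struct writer_gate *g, long tid)
{
	pthread_mutex_lock(&g->lock);
	if (g->owner == tid) {
		g->owner = -1;
		pthread_cond_broadcast(&g->freed);
	}
	pthread_mutex_unlock(&g->lock);
}
```

The owner does not hold `lock` while it sends, so a second writer can give up with ETIMEDOUT when its own deadline passes instead of blocking for as long as the current owner keeps sending.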

2021-05-13 22:33:51

by Arseny Krasnov

Subject: Re: [RFC PATCH v9 12/19] virtio/vsock: add SEQPACKET receive logic


On 13.05.2021 15:14, Stefano Garzarella wrote:
> On Sat, May 08, 2021 at 07:35:40PM +0300, Arseny Krasnov wrote:
>> This modifies the current receive logic for SEQPACKET support:
>> 1) Inserts the 'RW' packet into the socket's rx queue, but without merging
>> it with the buffer of the last packet in the queue.
> This is not true anymore, right?
>
>> 2) Checks the packet and socket types on receive (on mismatch,
>> the connection is reset).
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> Reviewed-by: Stefano Garzarella <[email protected]>
> Also this patch is changed :-)
>
>> ---
>> net/vmw_vsock/virtio_transport_common.c | 28 +++++++++++++++++++++++--
>> 1 file changed, 26 insertions(+), 2 deletions(-)
>>
>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>> index f649a21dd23b..7fea0a2192f7 100644
>> --- a/net/vmw_vsock/virtio_transport_common.c
>> +++ b/net/vmw_vsock/virtio_transport_common.c
>> @@ -165,6 +165,14 @@ void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
>> }
>> EXPORT_SYMBOL_GPL(virtio_transport_deliver_tap_pkt);
>>
>> +static u16 virtio_transport_get_type(struct sock *sk)
>> +{
>> + if (sk->sk_type == SOCK_STREAM)
>> + return VIRTIO_VSOCK_TYPE_STREAM;
>> + else
>> + return VIRTIO_VSOCK_TYPE_SEQPACKET;
>> +}
>> +
>> /* This function can only be used on connecting/connected sockets,
>> * since a socket assigned to a transport is required.
>> *
>> @@ -980,11 +988,15 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
>> /* If there is space in the last packet queued, we copy the
>> * new packet in its buffer.
>> */
>> - if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>> + if ((pkt->len <= last_pkt->buf_len - last_pkt->len) &&
>> + !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)) {
> Maybe we should update the comment above.
>
>> memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>> pkt->len);
>> last_pkt->len += pkt->len;
>> free_pkt = true;
>> +
>> + if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
>> + last_pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
> What about doing the following in any case?
>
> last_pkt->hdr.flags |= pkt->hdr.flags;
Ack
>
>> goto out;
>> }
>> }
>
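A simplified sketch of the enqueue merge rule under review, including the suggested `last_pkt->hdr.flags |= pkt->hdr.flags;`: a new packet is appended to the last queued one only if it fits and the last one does not already carry the end-of-record bit, so a record boundary is never merged away. struct pkt and try_merge() are hypothetical, SEQ_EOR stands in for VIRTIO_VSOCK_SEQ_EOR, and flags are kept in host byte order (the real code uses le32_to_cpu()/cpu_to_le32()).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SEQ_EOR 1u /* hypothetical stand-in for VIRTIO_VSOCK_SEQ_EOR */

struct pkt {
	uint32_t flags; /* host byte order in this sketch */
	size_t len;     /* bytes used in buf */
	size_t buf_len; /* usable capacity of buf */
	char buf[64];
};

/* Append 'pkt' to 'last' if it fits and 'last' does not already end a
 * record. Returns true if merged (the caller would then free 'pkt'). */
static bool try_merge(struct pkt *last, const struct pkt *pkt)
{
	assert(last->len <= last->buf_len && last->buf_len <= sizeof(last->buf));

	if (pkt->len > last->buf_len - last->len || (last->flags & SEQ_EOR))
		return false;

	memcpy(last->buf + last->len, pkt->buf, pkt->len);
	last->len += pkt->len;
	/* Suggested in review: propagate all flags, SEQ_EOR included. */
	last->flags |= pkt->flags;
	return true;
}
```

Once a merged packet carries SEQ_EOR, any later packet starts a new queue entry, which keeps the next record separate.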

2021-05-13 22:35:47

by Stefano Garzarella

Subject: Re: [RFC PATCH v9 19/19] af_vsock: serialize writes to shared socket

On Thu, May 13, 2021 at 05:48:19PM +0300, Arseny Krasnov wrote:
>
>On 13.05.2021 17:46, Stefano Garzarella wrote:
>> On Thu, May 13, 2021 at 04:01:50PM +0200, Stefano Garzarella wrote:
>>> On Sat, May 08, 2021 at 07:37:35PM +0300, Arseny Krasnov wrote:
>>>> This adds logic that serializes write access to a single socket
>>>> by multiple threads. It is implemented by adding a field with the TID
>>>> of the current writer. When a writer tries to send something, it checks
>>>> that the field is -1 (free); otherwise it sleeps in the same way as when
>>>> waiting for free space on the peer's side.
>>>>
>>>> Signed-off-by: Arseny Krasnov <[email protected]>
>>>> ---
>>>> include/net/af_vsock.h | 1 +
>>>> net/vmw_vsock/af_vsock.c | 10 +++++++++-
>>>> 2 files changed, 10 insertions(+), 1 deletion(-)
>>> I think you forgot to move this patch to the beginning of the series.
>>> It's important because that way we can backport it to stable branches
>>> easily.
>>>
>>> About the implementation, can't we just add a mutex that we hold until
>>> we have sent all the payload?
>> Re-thinking, I guess we can't because we have the timeout to deal
>> with...
>Yes, I forgot why I implemented it using 'tid_owner' :)

It is not clear to me whether we need to do this for stream sockets as well.

I think it would be better to follow af_inet/af_unix, but I need to check
their implementations.

Thanks,
Stefano