2021-01-15 05:55:27

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 00/13] virtio/vsock: introduce SOCK_SEQPACKET support.

This patchset implements support of SOCK_SEQPACKET for virtio
transport.
As SOCK_SEQPACKET guarantees to preserve record boundaries, a new
packet operation was added: it marks the start of a record (with the
record length in the header); such a packet doesn't carry any data. To
send a record, a packet with the start marker is sent first, then all
data is sent as usual 'RW' packets. On the receiver's side, the length
of the record is known from the packet with the start record marker.
As packets of one socket are not reordered on either the vsock or the
vhost transport layer, this marker allows the original record to be
restored on the receiver's side. If the user's buffer is smaller than
the record length, then all data that does not fit is dropped.
The maximum length of a datagram is not limited, just as in a stream
socket, because the same credit logic is used. The difference from a
stream socket is that the user is not woken up until the whole record is
received or an error occurs. The implementation also supports the
'MSG_EOR' and 'MSG_TRUNC' flags. Tests are also implemented.

Arseny Krasnov (13):
af_vsock: implement 'vsock_wait_data()'.
af_vsock: separate rx loops for STREAM/SEQPACKET.
af_vsock: implement rx loops entry point
af_vsock: replace previous stream rx loop.
af_vsock: implement send logic for SOCK_SEQPACKET
af_vsock: general support of SOCK_SEQPACKET type.
af_vsock: update comments for stream sockets.
virtio/vsock: dequeue callback for SOCK_SEQPACKET.
virtio/vsock: implement fetch of record length
virtio/vsock: update receive logic
virtio/vsock: rest of SOCK_SEQPACKET support
vhost/vsock: support for SOCK_SEQPACKET socket.
vsock_test: add SOCK_SEQPACKET tests.

drivers/vhost/vsock.c | 7 +-
include/linux/virtio_vsock.h | 12 +
include/net/af_vsock.h | 6 +
include/uapi/linux/virtio_vsock.h | 9 +
net/vmw_vsock/af_vsock.c | 483 ++++++++++++++++------
net/vmw_vsock/virtio_transport.c | 4 +
net/vmw_vsock/virtio_transport_common.c | 294 +++++++++++--
tools/testing/vsock/util.c | 32 +-
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 ++++++
10 files changed, 824 insertions(+), 152 deletions(-)

v1 -> v2:
- patches reordered: af_vsock.c changes now before virtio vsock
- patches reorganized: more small patches, where +/- are not mixed
- tests for SOCK_SEQPACKET added
- all commit messages updated
- af_vsock.c: 'vsock_pre_recv_check()' inlined to
'vsock_connectible_recvmsg()'
- af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
was not found
- virtio_transport_common.c: transport callback for seqpacket dequeue
- virtio_transport_common.c: simplified
'virtio_transport_recv_connected()'
- virtio_transport_common.c: send reset on socket and packet type
mismatch.

Signed-off-by: Arseny Krasnov <[email protected]>

--
2.25.1


2021-01-15 05:56:42

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 02/13] af_vsock: separate rx loops for STREAM/SEQPACKET.

This adds two receive loops: for SOCK_STREAM and SOCK_SEQPACKET. Both
look like twins, but SEQPACKET is a little bit different from STREAM:
1) It doesn't call notify callbacks.
2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
these values make no sense in the SEQPACKET case.
3) It waits until the whole record is received or an error is found
during receiving.
4) It processes and sets the 'MSG_TRUNC' flag.

So to avoid extra conditions for two types of socket inside one loop, two
independent functions were created.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 5 +
net/vmw_vsock/af_vsock.c | 202 +++++++++++++++++++++++++++++++++++++++
2 files changed, 207 insertions(+)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index b1c717286993..46073842d489 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -135,6 +135,11 @@ struct vsock_transport {
bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port);

+ /* SEQ_PACKET. */
+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
+ size_t len, int flags);
+
/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index af716f5a93a4..afacbe9f4231 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1870,6 +1870,208 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
return err;
}

+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ int err = 0;
+ size_t record_len; /* assigned below whenever dequeued_total == 0 */
+ struct vsock_sock *vsk;
+ const struct vsock_transport *transport;
+ long timeout;
+ ssize_t dequeued_total = 0;
+ unsigned long orig_nr_segs;
+ const struct iovec *orig_iov;
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ msg->msg_flags &= ~MSG_EOR;
+ orig_nr_segs = msg->msg_iter.nr_segs; /* saved to rebuild the iterator on -EAGAIN */
+ orig_iov = msg->msg_iter.iov;
+
+ while (1) {
+ s64 ready;
+
+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
+ ready = vsock_stream_has_data(vsk);
+
+ if (ready == 0) {
+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
+ /* On any loop break (timeout, signal interrupt or shutdown)
+ * report that nothing was copied. NOTE(review): the return
+ * value of vsock_wait_data() is discarded here - confirm.
+ */
+ dequeued_total = 0;
+ break;
+ }
+ } else {
+ ssize_t dequeued;
+
+ finish_wait(sk_sleep(sk), &wait);
+
+ if (ready < 0) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ if (dequeued_total == 0) {
+ record_len =
+ transport->seqpacket_seq_get_len(vsk);
+
+ if (record_len == 0)
+ continue;
+ }
+
+ /* 'msg_iter.count' is the number of unused bytes in iov.
+ * On every copy to the iov iterator it is decremented by
+ * the size of the copied data.
+ */
+ dequeued = transport->seqpacket_dequeue(vsk, msg,
+ msg->msg_iter.count, flags);
+
+ if (dequeued < 0) {
+ dequeued_total = 0;
+
+ if (dequeued == -EAGAIN) { /* restart: rewind the iov iterator to its saved state */
+ iov_iter_init(&msg->msg_iter, READ,
+ orig_iov, orig_nr_segs,
+ len);
+ msg->msg_flags &= ~MSG_EOR;
+ continue;
+ }
+
+ err = -ENOMEM;
+ break;
+ }
+
+ dequeued_total += dequeued;
+
+ if (dequeued_total >= record_len)
+ break;
+ }
+ }
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ err = 0;
+
+ if (dequeued_total > 0) {
+ /* User set MSG_TRUNC, so return the real length of
+ * the record.
+ */
+ if (flags & MSG_TRUNC)
+ err = record_len;
+ else
+ err = len - msg->msg_iter.count;
+
+ /* Always set MSG_TRUNC if the real length of the record
+ * is bigger than the user buffer.
+ */
+ if (record_len > len)
+ msg->msg_flags |= MSG_TRUNC;
+ }
+out:
+ return err;
+}
+
+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ int err;
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ size_t target;
+ struct vsock_transport_recv_notify_data recv_data;
+ long timeout;
+ ssize_t copied;
+
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ /* We must not copy less than target bytes into the user's buffer
+ * before returning successfully, so we wait for the consume queue to
+ * have that much data to consume before dequeueing. Note that this
+ * makes it impossible to handle cases where target is greater than the
+ * queue size.
+ */
+ target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
+ if (target >= transport->stream_rcvhiwat(vsk)) {
+ err = -ENOMEM;
+ goto out;
+ }
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ copied = 0;
+
+ err = transport->notify_recv_init(vsk, target, &recv_data);
+ if (err < 0)
+ goto out;
+
+ while (1) {
+ s64 ready;
+
+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
+ ready = vsock_stream_has_data(vsk);
+
+ if (ready == 0) {
+ if (vsock_wait_data(sk, &wait, timeout, &recv_data, target))
+ break; /* NOTE(review): vsock_wait_data() result is not stored in 'err' - confirm */
+ } else {
+ ssize_t read;
+
+ finish_wait(sk_sleep(sk), &wait);
+
+ if (ready < 0) {
+ /* Invalid queue pair content. XXX This should
+ * be changed to a connection reset in a later
+ * change.
+ */
+
+ err = -ENOMEM;
+ goto out;
+ }
+
+ err = transport->notify_recv_pre_dequeue(vsk,
+ target, &recv_data);
+ if (err < 0)
+ break;
+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
+
+ if (read < 0) {
+ err = -ENOMEM;
+ break;
+ }
+
+ copied += read;
+
+ err = transport->notify_recv_post_dequeue(vsk,
+ target, read,
+ !(flags & MSG_PEEK), &recv_data);
+ if (err < 0)
+ goto out;
+
+ if (read >= target || flags & MSG_PEEK)
+ break;
+
+ target -= read; /* remaining bytes still needed before returning */
+ }
+ }
+
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ err = 0;
+ if (copied > 0) /* bytes already copied take precedence over the error code */
+ err = copied;
+
+out:
+ release_sock(sk); /* NOTE(review): no lock_sock() in this function and __vsock_seqpacket_recvmsg() does not release - confirm caller's locking */
+ return err;
+}
+
static int
vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
int flags)
--
2.25.1

2021-01-15 05:57:36

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 04/13] af_vsock: replace previous stream rx loop.

This removes previous 'vsock_stream_recvmsg()' and uses newly implemented
receive loops. Moved to separate patch to make review easier.

Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 184 +++------------------------------------
1 file changed, 12 insertions(+), 172 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 38afaa90d141..5bf887190881 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -2072,178 +2072,6 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
return err;
}

-static int
-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
-{
- struct sock *sk;
- struct vsock_sock *vsk;
- const struct vsock_transport *transport;
- int err;
- size_t target;
- ssize_t copied;
- long timeout;
- struct vsock_transport_recv_notify_data recv_data;
-
- DEFINE_WAIT(wait);
-
- sk = sock->sk;
- vsk = vsock_sk(sk);
- transport = vsk->transport;
- err = 0;
-
- lock_sock(sk);
-
- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
- /* Recvmsg is supposed to return 0 if a peer performs an
- * orderly shutdown. Differentiate between that case and when a
- * peer has not connected or a local shutdown occured with the
- * SOCK_DONE flag.
- */
- if (sock_flag(sk, SOCK_DONE))
- err = 0;
- else
- err = -ENOTCONN;
-
- goto out;
- }
-
- if (flags & MSG_OOB) {
- err = -EOPNOTSUPP;
- goto out;
- }
-
- /* We don't check peer_shutdown flag here since peer may actually shut
- * down, but there can be data in the queue that a local socket can
- * receive.
- */
- if (sk->sk_shutdown & RCV_SHUTDOWN) {
- err = 0;
- goto out;
- }
-
- /* It is valid on Linux to pass in a zero-length receive buffer. This
- * is not an error. We may as well bail out now.
- */
- if (!len) {
- err = 0;
- goto out;
- }
-
- /* We must not copy less than target bytes into the user's buffer
- * before returning successfully, so we wait for the consume queue to
- * have that much data to consume before dequeueing. Note that this
- * makes it impossible to handle cases where target is greater than the
- * queue size.
- */
- target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- if (target >= transport->stream_rcvhiwat(vsk)) {
- err = -ENOMEM;
- goto out;
- }
- timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
- copied = 0;
-
- err = transport->notify_recv_init(vsk, target, &recv_data);
- if (err < 0)
- goto out;
-
-
- while (1) {
- s64 ready;
-
- prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- ready = vsock_stream_has_data(vsk);
-
- if (ready == 0) {
- if (sk->sk_err != 0 ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- (vsk->peer_shutdown & SEND_SHUTDOWN)) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- /* Don't wait for non-blocking sockets. */
- if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
-
- err = transport->notify_recv_pre_block(
- vsk, target, &recv_data);
- if (err < 0) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- release_sock(sk);
- timeout = schedule_timeout(timeout);
- lock_sock(sk);
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeout);
- finish_wait(sk_sleep(sk), &wait);
- break;
- } else if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- } else {
- ssize_t read;
-
- finish_wait(sk_sleep(sk), &wait);
-
- if (ready < 0) {
- /* Invalid queue pair content. XXX This should
- * be changed to a connection reset in a later
- * change.
- */
-
- err = -ENOMEM;
- goto out;
- }
-
- err = transport->notify_recv_pre_dequeue(
- vsk, target, &recv_data);
- if (err < 0)
- break;
-
- read = transport->stream_dequeue(
- vsk, msg,
- len - copied, flags);
- if (read < 0) {
- err = -ENOMEM;
- break;
- }
-
- copied += read;
-
- err = transport->notify_recv_post_dequeue(
- vsk, target, read,
- !(flags & MSG_PEEK), &recv_data);
- if (err < 0)
- goto out;
-
- if (read >= target || flags & MSG_PEEK)
- break;
-
- target -= read;
- }
- }
-
- if (sk->sk_err)
- err = -sk->sk_err;
- else if (sk->sk_shutdown & RCV_SHUTDOWN)
- err = 0;
-
- if (copied > 0)
- err = copied;
-
-out:
- release_sock(sk);
- return err;
-}
-
static int vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg,
size_t len, int flags)
{
@@ -2299,6 +2127,18 @@ static int vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg,
return err;
}

+static int vsock_seqpacket_recvmsg(struct socket *sock, struct msghdr *msg,
+ size_t len, int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
+static int vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg,
+ size_t len, int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK,
.owner = THIS_MODULE,
--
2.25.1

2021-01-15 05:57:51

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 05/13] af_vsock: implement send logic for SOCK_SEQPACKET

This adds some logic to the current stream enqueue function for SEQPACKET
support:
1) Send record begin marker with length of record.
2) Return value from enqueue function is whole record length or error for
SOCK_SEQPACKET.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 1 +
net/vmw_vsock/af_vsock.c | 32 ++++++++++++++++++++++++++++----
2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 46073842d489..ec6bf4600ef8 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -136,6 +136,7 @@ struct vsock_transport {
bool (*stream_allow)(u32 cid, u32 port);

/* SEQ_PACKET. */
+ bool (*seqpacket_seq_send_len)(struct vsock_sock *, size_t len);
size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
size_t len, int flags);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 5bf887190881..4a7cdf7756c0 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1683,8 +1683,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}

-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
- size_t len)
+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1737,6 +1737,12 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
if (err < 0)
goto out;

+ if (sk->sk_type == SOCK_SEQPACKET) {
+ err = transport->seqpacket_seq_send_len(vsk, len);
+ if (err < 0)
+ goto out;
+ }
+
while (total_written < len) {
ssize_t written;

@@ -1815,13 +1821,31 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
}

out_err:
- if (total_written > 0)
- err = total_written;
+ if (total_written > 0) {
+ /* Return number of written bytes only if:
+ * 1) SOCK_STREAM socket.
+ * 2) SOCK_SEQPACKET socket when whole buffer is sent.
+ */
+ if (sk->sk_type == SOCK_STREAM || total_written == len)
+ err = total_written;
+ }
out:
release_sock(sk);
return err;
}

+static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
+
+static int vsock_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
+
static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
long timeout,
struct vsock_transport_recv_notify_data *recv_data,
--
2.25.1

2021-01-15 05:58:09

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 06/13] af_vsock: general support of SOCK_SEQPACKET type.

This adds socket operations for SOCK_SEQPACKET and adds this type of
socket to the conditions where SOCK_STREAM is involved, because both types
of sockets are connection oriented.

Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 108 +++++++++++++++++++++++++++++++++------
1 file changed, 92 insertions(+), 16 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 4a7cdf7756c0..d0ef066e9352 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_dgram;
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
if (vsock_use_local_transport(remote_cid))
new_transport = transport_local;
else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
@@ -459,6 +460,13 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_g2h;
else
new_transport = transport_h2g;
+
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ if (!new_transport->seqpacket_seq_send_len ||
+ !new_transport->seqpacket_seq_get_len ||
+ !new_transport->seqpacket_dequeue)
+ return -ENODEV;
+ }
break;
default:
return -ESOCKTNOSUPPORT;
@@ -604,8 +612,8 @@ static void vsock_pending_work(struct work_struct *work)

/**** SOCKET OPERATIONS ****/

-static int __vsock_bind_stream(struct vsock_sock *vsk,
- struct sockaddr_vm *addr)
+static int __vsock_bind_connectible(struct vsock_sock *vsk,
+ struct sockaddr_vm *addr)
{
static u32 port;
struct sockaddr_vm new_addr;
@@ -684,8 +692,9 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)

switch (sk->sk_socket->type) {
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
spin_lock_bh(&vsock_table_lock);
- retval = __vsock_bind_stream(vsk, addr);
+ retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
break;

@@ -767,6 +776,11 @@ static struct sock *__vsock_create(struct net *net,
return sk;
}

+static bool sock_type_connectible(u16 type)
+{
+ return (type == SOCK_STREAM || type == SOCK_SEQPACKET);
+}
+
static void __vsock_release(struct sock *sk, int level)
{
if (sk) {
@@ -785,7 +799,7 @@ static void __vsock_release(struct sock *sk, int level)

if (vsk->transport)
vsk->transport->release(vsk);
- else if (sk->sk_type == SOCK_STREAM)
+ else if (sock_type_connectible(sk->sk_type))
vsock_remove_sock(vsk);

sock_orphan(sk);
@@ -945,7 +959,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk = sock->sk;
if (sock->state == SS_UNCONNECTED) {
err = -ENOTCONN;
- if (sk->sk_type == SOCK_STREAM)
+ if (sock_type_connectible(sk->sk_type))
return err;
} else {
sock->state = SS_DISCONNECTING;
@@ -960,7 +974,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk->sk_state_change(sk);
release_sock(sk);

- if (sk->sk_type == SOCK_STREAM) {
+ if (sock_type_connectible(sk->sk_type)) {
sock_reset_flag(sk, SOCK_DONE);
vsock_send_shutdown(sk, mode);
}
@@ -1013,7 +1027,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (!(sk->sk_shutdown & SEND_SHUTDOWN))
mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;

- } else if (sock->type == SOCK_STREAM) {
+ } else if (sock_type_connectible(sk->sk_type)) {
const struct vsock_transport *transport = vsk->transport;
lock_sock(sk);

@@ -1259,8 +1273,8 @@ static void vsock_connect_timeout(struct work_struct *work)
sock_put(sk);
}

-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
- int addr_len, int flags)
+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
{
int err;
struct sock *sk;
@@ -1410,7 +1424,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,

lock_sock(listener);

- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sock->type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1477,6 +1491,18 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
return err;
}

+static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
+static int vsock_seqpacket_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
static int vsock_listen(struct socket *sock, int backlog)
{
int err;
@@ -1487,7 +1513,7 @@ static int vsock_listen(struct socket *sock, int backlog)

lock_sock(sk);

- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sk->sk_type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1531,11 +1557,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
vsk->buffer_size = val;
}

-static int vsock_stream_setsockopt(struct socket *sock,
- int level,
- int optname,
- sockptr_t optval,
- unsigned int optlen)
+static int vsock_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
{
int err;
struct sock *sk;
@@ -1612,6 +1638,24 @@ static int vsock_stream_setsockopt(struct socket *sock,
return err;
}

+static int vsock_seqpacket_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_setsockopt(sock, level, optname, optval, optlen);
+}
+
+static int vsock_stream_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_setsockopt(sock, level, optname, optval, optlen);
+}
+
static int vsock_stream_getsockopt(struct socket *sock,
int level, int optname,
char __user *optval,
@@ -1683,6 +1727,14 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}

+static int vsock_seqpacket_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
+{
+ return vsock_stream_getsockopt(sock, level, optname, optval, optlen);
+}
+
static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
size_t len)
{
@@ -2184,6 +2236,27 @@ static const struct proto_ops vsock_stream_ops = {
.sendpage = sock_no_sendpage,
};

+static const struct proto_ops vsock_seqpacket_ops = {
+ .family = PF_VSOCK,
+ .owner = THIS_MODULE,
+ .release = vsock_release,
+ .bind = vsock_bind,
+ .connect = vsock_seqpacket_connect,
+ .socketpair = sock_no_socketpair,
+ .accept = vsock_accept,
+ .getname = vsock_getname,
+ .poll = vsock_poll,
+ .ioctl = sock_no_ioctl,
+ .listen = vsock_listen,
+ .shutdown = vsock_shutdown,
+ .setsockopt = vsock_seqpacket_setsockopt,
+ .getsockopt = vsock_seqpacket_getsockopt,
+ .sendmsg = vsock_seqpacket_sendmsg,
+ .recvmsg = vsock_seqpacket_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
static int vsock_create(struct net *net, struct socket *sock,
int protocol, int kern)
{
@@ -2204,6 +2277,9 @@ static int vsock_create(struct net *net, struct socket *sock,
case SOCK_STREAM:
sock->ops = &vsock_stream_ops;
break;
+ case SOCK_SEQPACKET:
+ sock->ops = &vsock_seqpacket_ops;
+ break;
default:
return -ESOCKTNOSUPPORT;
}
--
2.25.1

2021-01-15 05:58:12

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 07/13] af_vsock: update comments for stream sockets.

This replaces 'stream' with 'connect oriented' in comments, as SEQPACKET is
also connection oriented.

Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index d0ef066e9352..97dcaa410ee4 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)

/* Assign a transport to a socket and call the .init transport callback.
*
- * Note: for stream socket this must be called when vsk->remote_addr is set
- * (e.g. during the connect() or when a connection request on a listener
+ * Note: for connect oriented socket this must be called when vsk->remote_addr
+ * is set (e.g. during the connect() or when a connection request on a listener
* socket is received).
* The vsk->remote_addr is used to decide which transport to use:
* - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
@@ -477,10 +477,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
return 0;

/* transport->release() must be called with sock lock acquired.
- * This path can only be taken during vsock_stream_connect(),
- * where we have already held the sock lock.
- * In the other cases, this function is called on a new socket
- * which is not assigned to any transport.
+ * This path can only be taken during vsock_connect(), where we
+ * have already held the sock lock. In the other cases, this
+ * function is called on a new socket which is not assigned to
+ * any transport.
*/
vsk->transport->release(vsk);
vsock_deassign_transport(vsk);
@@ -657,9 +657,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,

vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);

- /* Remove stream sockets from the unbound list and add them to the hash
- * table for easy lookup by its address. The unbound list is simply an
- * extra entry at the end of the hash table, a trick used by AF_UNIX.
+ /* Remove connect oriented sockets from the unbound list and add them
+ * to the hash table for easy lookup by its address. The unbound list
+ * is simply an extra entry at the end of the hash table, a trick used
+ * by AF_UNIX.
*/
__vsock_remove_bound(vsk);
__vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
@@ -950,10 +951,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
if ((mode & ~SHUTDOWN_MASK) || !mode)
return -EINVAL;

- /* If this is a STREAM socket and it is not connected then bail out
- * immediately. If it is a DGRAM socket then we must first kick the
- * socket so that it wakes up from any sleeping calls, for example
- * recv(), and then afterwards return the error.
+ /* If this is a connect oriented socket and it is not connected then
+ * bail out immediately. If it is a DGRAM socket then we must first
+ * kick the socket so that it wakes up from any sleeping calls, for
+ * example recv(), and then afterwards return the error.
*/

sk = sock->sk;
@@ -1758,7 +1759,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,

lock_sock(sk);

- /* Callers should not provide a destination with stream sockets. */
+ /* Callers should not provide a destination with connect oriented
+ * sockets.
+ */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out;
--
2.25.1

2021-01-15 05:58:35

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 08/13] virtio/vsock: dequeue callback for SOCK_SEQPACKET.

This adds a transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the rx queue of the socket until the
whole record is copied (if the user's buffer is full, the user is not woken
up). This is done to not stall the sender: if we woke up the user and it
left the syscall, nobody would send a credit update for the rest of the
record, and the sender would wait for the next entry into the read syscall
on the receiver's side. So if the user buffer is full, we just send a credit
update and drop the data. If SEQ_BEGIN is found during the copy (and not all
data was copied), copying is restarted by resetting the user's iov iterator
(the previous unfinished data is dropped).

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 4 +
include/uapi/linux/virtio_vsock.h | 9 ++
net/vmw_vsock/virtio_transport_common.c | 128 ++++++++++++++++++++++++
3 files changed, 141 insertions(+)

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..7f0ef5204e33 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -36,6 +36,10 @@ struct virtio_vsock_sock {
u32 rx_bytes;
u32 buf_alloc;
struct list_head rx_queue;
+
+ /* For SOCK_SEQPACKET */
+ u32 user_read_seq_len;
+ u32 user_read_copied;
};

struct virtio_vsock_pkt {
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 1d57ed3d84d2..058908bc19fc 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -65,6 +65,7 @@ struct virtio_vsock_hdr {

enum virtio_vsock_type {
VIRTIO_VSOCK_TYPE_STREAM = 1,
+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};

enum virtio_vsock_op {
@@ -83,6 +84,9 @@ enum virtio_vsock_op {
VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
/* Request the peer to send the credit info to us */
VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
+
+ /* Record begin for SOCK_SEQPACKET */
+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
};

/* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
};

+/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
+enum virtio_vsock_rw_seqpacket {
+ VIRTIO_VSOCK_RW_EOR = 1,
+};
+
#endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 5956939eebb7..4328f653a477 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -397,6 +397,132 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}

+static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
+{
+ list_del(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+}
+
+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
+{
+ struct virtio_vsock_pkt *pkt, *n;
+ size_t bytes_dropped = 0;
+
+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
+ break;
+
+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+
+ return bytes_dropped;
+}
+
+static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t user_buf_len)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_handled = 0;
+ int err = 0;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ if (user_buf_len == 0) {
+ /* User's buffer is full: process the rest of the
+ * record and drop it. If 'SEQ_BEGIN' is found
+ * while iterating, the user will be woken up,
+ * because the record is already copied and we
+ * don't care about the absence of some of its tail
+ * RW packets. Return the number of bytes (rest of the
+ * record), but ignore credit update for absent bytes.
+ */
+ bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
+ vvs->user_read_copied += bytes_handled;
+
+ if (!list_empty(&vvs->rx_queue) &&
+ vvs->user_read_copied < vvs->user_read_seq_len) {
+ /* 'SEQ_BEGIN' found, but the record isn't complete.
+ * Set the number of copied bytes to the record size
+ * to force the counters to finish receiving.
+ */
+ bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
+ vvs->user_read_copied = vvs->user_read_seq_len;
+ }
+ }
+
+ /* Now start copying. */
+ while (vvs->user_read_copied < vvs->user_read_seq_len &&
+ vvs->rx_bytes &&
+ user_buf_len &&
+ !err) {
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ /* Unexpected 'SEQ_BEGIN' during record copy: leave the
+ * receive loop; 'EAGAIN' restarts it from the outer
+ * receive loop. The packet stays in the queue, so on the
+ * next entry 'SEQ_BEGIN' is dequeued first; the user's iov
+ * iterator is reset in the outer loop. A credit update is
+ * still sent, as some bytes could have been copied. The
+ * user never sees an unfinished record. NOTE(review):
+ * counters are not cleared here - presumably done later.
+ */
+ err = -EAGAIN;
+ break;
+ }
+ case VIRTIO_VSOCK_OP_RW: {
+ size_t bytes_to_copy;
+ size_t pkt_len;
+
+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+ bytes_to_copy = min(user_buf_len, pkt_len);
+
+ /* sk_lock is held by caller so no one else can dequeue.
+ * Unlock rx_lock since memcpy_to_msg() may sleep.
+ */
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+ spin_lock_bh(&vvs->rx_lock);
+ err = -EINVAL; /* leaves the switch; pkt is still dropped below */
+ break;
+ }
+
+ spin_lock_bh(&vvs->rx_lock);
+ user_buf_len -= bytes_to_copy;
+ bytes_handled += pkt->len; /* NOTE(review): pkt->len here vs. pkt_len (hdr.len) above - confirm */
+ vvs->user_read_copied += bytes_to_copy;
+
+ if (le16_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
+ msg->msg_flags |= MSG_EOR;
+ break;
+ }
+ default:
+ ;
+ }
+
+ /* For an unexpected 'SEQ_BEGIN', keep the packet in the queue,
+ * but drop any other type of packet.
+ */
+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+ }
+
+ spin_unlock_bh(&vvs->rx_lock);
+
+ virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
+ NULL);
+
+ return err ?: bytes_handled; /* negative errno if set, otherwise bytes handled */
+}
+
ssize_t
virtio_transport_stream_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -481,6 +607,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
spin_lock_init(&vvs->rx_lock);
spin_lock_init(&vvs->tx_lock);
INIT_LIST_HEAD(&vvs->rx_queue);
+ vvs->user_read_copied = 0;
+ vvs->user_read_seq_len = 0;

return 0;
}
--
2.25.1

2021-01-15 05:58:50

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 09/13] virtio/vsock: implement fetch of record length

This adds a transport callback which tries to fetch the record begin marker
from the socket's rx queue. It is called from af_vsock.c before reading the
data packets of a record.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 1 +
net/vmw_vsock/virtio_transport_common.c | 33 +++++++++++++++++++++++++
2 files changed, 34 insertions(+)

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 7f0ef5204e33..af8705ea8b95 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -84,6 +84,7 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);

+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 4328f653a477..fe1272e74517 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -420,6 +420,39 @@ static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vv
return bytes_dropped;
}

+/* Look for the record begin marker at the head of the socket's rx queue.
+ *
+ * Whatever virtio_transport_drop_until_seq_begin() discards is followed by a
+ * credit update to the peer. If a packet is left at the head of the queue
+ * afterwards, its header 'flags' field is stored as the length of the new
+ * record, the copy counter is reset and the packet is taken off the queue.
+ *
+ * Returns vvs->user_read_seq_len; when the queue is empty this is whatever
+ * value was stored there before the call.
+ */
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
+{
+	struct virtio_vsock_sock *vvs = vsk->trans;
+	size_t orphaned_bytes;
+
+	spin_lock_bh(&vvs->rx_lock);
+
+	/* Get rid of orphaned 'RW' packets queued ahead of the marker. */
+	orphaned_bytes = virtio_transport_drop_until_seq_begin(vvs);
+
+	if (!list_empty(&vvs->rx_queue)) {
+		struct virtio_vsock_pkt *marker;
+
+		/* NOTE(review): the head packet is presumed to be 'SEQ_BEGIN',
+		 * its op is not re-checked here - confirm against
+		 * virtio_transport_drop_until_seq_begin().
+		 */
+		marker = list_first_entry(&vvs->rx_queue,
+					  struct virtio_vsock_pkt, list);
+
+		vvs->user_read_copied = 0;
+		vvs->user_read_seq_len = le32_to_cpu(marker->hdr.flags);
+		virtio_transport_del_n_free_pkt(marker);
+	}
+
+	spin_unlock_bh(&vvs->rx_lock);
+
+	/* A credit update is only sent when something was dropped. */
+	if (orphaned_bytes)
+		virtio_transport_send_credit_update(vsk,
+						    VIRTIO_VSOCK_TYPE_SEQPACKET,
+						    NULL);
+
+	return vvs->user_read_seq_len;
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
+
static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t user_buf_len)
--
2.25.1

2021-01-15 05:58:57

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 03/13] af_vsock: implement rx loops entry point

This adds an entry point for the STREAM/SEQPACKET rx loops. As both types
are connection oriented, the same checks apply before reading data from the
socket. All these checks are performed in this entry point, then the
specific rx loop is called.

Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 55 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 55 insertions(+)

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index afacbe9f4231..38afaa90d141 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -2244,6 +2244,61 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
return err;
}

+/* Common recvmsg() entry point for connection oriented vsock sockets.
+ *
+ * Runs, under the socket lock, the checks shared by SOCK_STREAM and
+ * SOCK_SEQPACKET and then hands over to the type specific receive loop.
+ *
+ * Returns the result of the receive loop, 0 when there is nothing to do
+ * (zero length buffer, receive side shut down, SOCK_DONE set on a socket
+ * that is not established), -ENOTCONN when the socket is not connected and
+ * -EOPNOTSUPP for MSG_OOB.
+ */
+static int vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg,
+				     size_t len, int flags)
+{
+	const struct vsock_transport *transport;
+	struct sock *sk = sock->sk;
+	struct vsock_sock *vsk;
+	int err = 0;
+
+	lock_sock(sk);
+
+	vsk = vsock_sk(sk);
+	transport = vsk->transport;
+
+	if (!transport || sk->sk_state != TCP_ESTABLISHED) {
+		/* recvmsg() is supposed to return 0 after an orderly shutdown
+		 * by the peer. The SOCK_DONE flag tells that case apart from
+		 * a peer that never connected or a local shutdown.
+		 */
+		err = sock_flag(sk, SOCK_DONE) ? 0 : -ENOTCONN;
+	} else if (flags & MSG_OOB) {
+		err = -EOPNOTSUPP;
+	} else if (!(sk->sk_shutdown & RCV_SHUTDOWN) && len) {
+		/* The peer_shutdown flag is deliberately not checked: the
+		 * peer may have shut down while data is still queued for the
+		 * local socket. A zero length buffer is valid on Linux and
+		 * simply yields 0, as does a receive side shutdown.
+		 */
+		if (sk->sk_type == SOCK_STREAM)
+			err = __vsock_stream_recvmsg(sk, msg, len, flags);
+		else
+			err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
+	}
+
+	release_sock(sk);
+	return err;
+}
+
static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK,
.owner = THIS_MODULE,
--
2.25.1

2021-01-15 05:59:00

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 10/13] virtio/vsock: update receive logic

This modifies the current receive logic for SEQPACKET support:
1) Add the 'SEQ_BEGIN' packet to the socket's rx queue.
2) Add 'RW' packets to the rx queue, but without merging them into the
buffer of the last packet in the queue.
3) Check the packet type against the socket type on receive (on mismatch,
reset the connection).

Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 79 ++++++++++++++++++-------
1 file changed, 58 insertions(+), 21 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index fe1272e74517..c3e07eb1c666 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -397,6 +397,14 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}

+/* Map the socket type to the virtio vsock packet type: SOCK_STREAM gives
+ * VIRTIO_VSOCK_TYPE_STREAM, every other socket type gives
+ * VIRTIO_VSOCK_TYPE_SEQPACKET.
+ */
+static u16 virtio_transport_get_type(struct sock *sk)
+{
+ if (sk->sk_type == SOCK_STREAM)
+ return VIRTIO_VSOCK_TYPE_STREAM;
+ else
+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
+}
+
static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
{
list_del(&pkt->list);
@@ -1050,39 +1058,49 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt)
{
struct virtio_vsock_sock *vvs = vsk->trans;
- bool can_enqueue, free_pkt = false;
+ bool free_pkt = false;

pkt->len = le32_to_cpu(pkt->hdr.len);
pkt->off = 0;

spin_lock_bh(&vvs->rx_lock);

- can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
- if (!can_enqueue) {
+ if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
free_pkt = true;
goto out;
}

- /* Try to copy small packets into the buffer of last packet queued,
- * to avoid wasting memory queueing the entire buffer with a small
- * payload.
- */
- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
- struct virtio_vsock_pkt *last_pkt;
+ /* hdr.type is read with le16_to_cpu() everywhere else in this file. */
+ switch (le16_to_cpu(pkt->hdr.type)) {
+ case VIRTIO_VSOCK_TYPE_STREAM: {
+ /* Try to copy small packets into the buffer of last packet queued,
+ * to avoid wasting memory queueing the entire buffer with a small
+ * payload.
+ */
+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
+ struct virtio_vsock_pkt *last_pkt;

- last_pkt = list_last_entry(&vvs->rx_queue,
- struct virtio_vsock_pkt, list);
+ last_pkt = list_last_entry(&vvs->rx_queue,
+ struct virtio_vsock_pkt, list);

- /* If there is space in the last packet queued, we copy the
- * new packet in its buffer.
- */
- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
- pkt->len);
- last_pkt->len += pkt->len;
- free_pkt = true;
- goto out;
+ /* If there is space in the last packet queued, we copy the
+ * new packet in its buffer.
+ */
+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
+ pkt->len);
+ last_pkt->len += pkt->len;
+ free_pkt = true;
+ goto out;
+ }
}
+
+ break;
+ }
+ case VIRTIO_VSOCK_TYPE_SEQPACKET: {
+ break;
+ }
+ default:
+ goto out;
}

list_add_tail(&pkt->list, &vvs->rx_queue);
@@ -1101,6 +1119,14 @@ virtio_transport_recv_connected(struct sock *sk,
int err = 0;

switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ struct virtio_vsock_sock *vvs = vsk->trans;
+
+ spin_lock_bh(&vvs->rx_lock);
+ list_add_tail(&pkt->list, &vvs->rx_queue);
+ spin_unlock_bh(&vvs->rx_lock);
+ return err;
+ }
case VIRTIO_VSOCK_OP_RW:
virtio_transport_recv_enqueue(vsk, pkt);
sk->sk_data_ready(sk);
@@ -1247,6 +1273,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
return 0;
}

+/* Tell whether 'type' is one of the packet types accepted on receive:
+ * VIRTIO_VSOCK_TYPE_STREAM or VIRTIO_VSOCK_TYPE_SEQPACKET.
+ */
+static bool virtio_transport_valid_type(u16 type)
+{
+	switch (type) {
+	case VIRTIO_VSOCK_TYPE_STREAM:
+	case VIRTIO_VSOCK_TYPE_SEQPACKET:
+		return true;
+	default:
+		return false;
+	}
+}
+
/* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
* lock.
*/
@@ -1272,7 +1304,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le32_to_cpu(pkt->hdr.buf_alloc),
le32_to_cpu(pkt->hdr.fwd_cnt));

- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
goto free_pkt;
}
@@ -1289,6 +1321,11 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
}
}

+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type)) {
+ (void)virtio_transport_reset_no_sock(t, pkt);
+ goto free_pkt;
+ }
+
vsk = vsock_sk(sk);

space_available = virtio_transport_space_update(sk, pkt);
--
2.25.1

2021-01-15 06:00:18

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 11/13] virtio/vsock: rest of SOCK_SEQPACKET support

This adds the rest of the logic for SEQPACKET:
1) Shared functions for packet sending now set the packet type
according to the socket type.
2) SEQPACKET specific functions: sending of 'SEQ_BEGIN' and data dequeue.
3) Ops for the virtio transport.
4) TAP support for SEQPACKET is not so easy if it is necessary to send the
whole record to the TAP interface. This could be done by allocating a
new packet when the whole record is received; the data of the record must
then be copied to the TAP packet.

Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 7 ++++
net/vmw_vsock/virtio_transport.c | 4 ++
net/vmw_vsock/virtio_transport_common.c | 54 ++++++++++++++++++++++---
3 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index af8705ea8b95..ad9783df97c9 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -84,7 +84,14 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);

+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
+/* 'flags' carries the MSG_* receive flags (the definition rejects MSG_PEEK). */
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t len,
+ int flags);
+
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);

diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 2700a63ab095..5a7ab1befee8 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -469,6 +469,10 @@ static struct virtio_transport virtio_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,

+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index c3e07eb1c666..5fdf1adfdaab 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
break;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
break;
default:
@@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)

void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
{
+ /* TODO: implement tap support for SOCK_SEQPACKET.
+ * hdr.type is read with le16_to_cpu(), as in virtio_transport_recv_pkt().
+ */
+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
+ return;
+
if (pkt->tap_delivered)
return;

@@ -405,6 +410,19 @@ static u16 virtio_transport_get_type(struct sock *sk)
return VIRTIO_VSOCK_TYPE_SEQPACKET;
}

+/* Send the 'SEQ_BEGIN' packet of a SOCK_SEQPACKET record; the record length
+ * 'len' travels in the 'flags' field of the packet info.
+ *
+ * NOTE(review): the result of virtio_transport_send_pkt_info() is converted
+ * to bool here, while other functions in this file return it as 'int' -
+ * confirm what 'true' is supposed to mean for the caller.
+ * NOTE(review): 'len' is a size_t stored into 'flags' - confirm that the
+ * field is wide enough for every record length the caller may pass.
+ */
+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
+{
+	struct virtio_vsock_pkt_info seq_begin = {
+		.op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
+		.type = VIRTIO_VSOCK_TYPE_SEQPACKET,
+		.flags = len,
+		.vsk = vsk,
+	};
+
+	return virtio_transport_send_pkt_info(vsk, &seq_begin);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
+
static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
{
list_del(&pkt->list);
@@ -576,6 +594,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);

+/* SOCK_SEQPACKET receive callback: MSG_PEEK is refused with -EOPNOTSUPP,
+ * every other request is handed to virtio_transport_seqpacket_do_dequeue().
+ */
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   size_t len, int flags)
+{
+	const bool peek_requested = flags & MSG_PEEK;
+
+	if (!peek_requested)
+		return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
+
+	return -EOPNOTSUPP;
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
int
virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -659,13 +689,15 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
{
struct virtio_vsock_sock *vvs = vsk->trans;
+ int type;

if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
*val = VIRTIO_VSOCK_MAX_BUF_SIZE;

vvs->buf_alloc = *val;

- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
+ type = virtio_transport_get_type(sk_vsock(vsk));
+ virtio_transport_send_credit_update(vsk, type,
NULL);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
@@ -793,10 +825,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_REQUEST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.vsk = vsk,
};

+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_connect);
@@ -805,7 +838,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_SHUTDOWN,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.flags = (mode & RCV_SHUTDOWN ?
VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
(mode & SEND_SHUTDOWN ?
@@ -813,6 +845,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
.vsk = vsk,
};

+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
@@ -834,12 +868,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RW,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.msg = msg,
.pkt_len = len,
.vsk = vsk,
+ .flags = 0,
};

+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
+ if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
+ msg->msg_flags & MSG_EOR)
+ info.flags |= VIRTIO_VSOCK_RW_EOR;
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
@@ -857,7 +897,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.reply = !!pkt,
.vsk = vsk,
};
@@ -866,6 +905,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
return 0;

+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}

@@ -1177,13 +1218,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RESPONSE,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.remote_cid = le64_to_cpu(pkt->hdr.src_cid),
.remote_port = le32_to_cpu(pkt->hdr.src_port),
.reply = true,
.vsk = vsk,
};

+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}

--
2.25.1

2021-01-15 06:00:22

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 13/13] vsock_test: add SOCK_SEQPACKET tests.

This adds two tests for SOCK_SEQPACKET sockets: both transfer data and then
test the MSG_EOR and MSG_TRUNC flags. Cases for connect(), bind(), etc. are
not tested, because they are the same as for stream sockets.

Signed-off-by: Arseny Krasnov <[email protected]>
---
tools/testing/vsock/util.c | 32 ++++++--
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 +++++++++++++++++++++++++++++++
3 files changed, 156 insertions(+), 5 deletions(-)

diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
index 93cbd6f603f9..2acbb7703c6a 100644
--- a/tools/testing/vsock/util.c
+++ b/tools/testing/vsock/util.c
@@ -84,7 +84,7 @@ void vsock_wait_remote_close(int fd)
}

/* Connect to <cid, port> and return the file descriptor. */
-int vsock_stream_connect(unsigned int cid, unsigned int port)
+static int vsock_connect(unsigned int cid, unsigned int port, int type)
{
union {
struct sockaddr sa;
@@ -101,7 +101,7 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)

control_expectln("LISTENING");

- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);

timeout_begin(TIMEOUT);
do {
@@ -120,11 +120,21 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
return fd;
}

+/* Connect a SOCK_STREAM socket to <cid, port>; returns vsock_connect()'s fd. */
+int vsock_stream_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_STREAM);
+}
+
+/* Connect a SOCK_SEQPACKET socket to <cid, port>; returns vsock_connect()'s fd. */
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_SEQPACKET);
+}
+
/* Listen on <cid, port> and return the first incoming connection. The remote
* address is stored to clientaddrp. clientaddrp may be NULL.
*/
-int vsock_stream_accept(unsigned int cid, unsigned int port,
- struct sockaddr_vm *clientaddrp)
+static int vsock_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp, int type)
{
union {
struct sockaddr sa;
@@ -145,7 +155,7 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
int client_fd;
int old_errno;

- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);

if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
perror("bind");
@@ -189,6 +199,18 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
return client_fd;
}

+/* Listen on <cid, port> with a SOCK_STREAM socket and return the connection
+ * obtained by vsock_accept(). clientaddrp is passed through unchanged.
+ */
+int vsock_stream_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_STREAM);
+}
+
+/* Listen on <cid, port> with a SOCK_SEQPACKET socket and return the
+ * connection obtained by vsock_accept(). clientaddrp is passed through
+ * unchanged.
+ */
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_SEQPACKET);
+}
+
/* Transmit one byte and check the return value.
*
* expected_ret:
diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
index e53dd09d26d9..a3375ad2fb7f 100644
--- a/tools/testing/vsock/util.h
+++ b/tools/testing/vsock/util.h
@@ -36,8 +36,11 @@ struct test_case {
void init_signals(void);
unsigned int parse_cid(const char *str);
int vsock_stream_connect(unsigned int cid, unsigned int port);
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port);
int vsock_stream_accept(unsigned int cid, unsigned int port,
struct sockaddr_vm *clientaddrp);
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp);
void vsock_wait_remote_close(int fd);
void send_byte(int fd, int expected_ret, int flags);
void recv_byte(int fd, int expected_ret, int flags);
diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
index 5a4fb80fa832..db6cc49fa5e4 100644
--- a/tools/testing/vsock/vsock_test.c
+++ b/tools/testing/vsock/vsock_test.c
@@ -14,6 +14,8 @@
#include <errno.h>
#include <unistd.h>
#include <linux/kernel.h>
+#include <sys/types.h>
+#include <sys/socket.h>

#include "timeout.h"
#include "control.h"
@@ -279,6 +281,120 @@ static void test_stream_msg_peek_server(const struct test_opts *opts)
close(fd);
}

+#define MESSAGES_CNT 7
+#define MESSAGE_EOR_IDX (MESSAGES_CNT / 2)
+/* Client side of the MSG_EOR test: connect with SOCK_SEQPACKET and send
+ * MESSAGES_CNT one byte messages, setting MSG_EOR only on the message with
+ * index MESSAGE_EOR_IDX.
+ */
+static void test_seqpacket_msg_send_client(const struct test_opts *opts)
+{
+	int sock_fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+	int msg_idx;
+
+	if (sock_fd < 0) {
+		perror("connect");
+		exit(EXIT_FAILURE);
+	}
+
+	for (msg_idx = 0; msg_idx < MESSAGES_CNT; msg_idx++) {
+		int send_flags = 0;
+
+		if (msg_idx == MESSAGE_EOR_IDX)
+			send_flags = MSG_EOR;
+
+		send_byte(sock_fd, 1, send_flags);
+	}
+
+	/* Let the server know that everything has been sent. */
+	control_writeln("SENDDONE");
+	close(sock_fd);
+}
+
+/* Server side of the MSG_EOR test: accept a SOCK_SEQPACKET connection, wait
+ * for the client's "SENDDONE" and receive MESSAGES_CNT messages. Every
+ * recvmsg() must return exactly one byte, and MSG_EOR must be reported for
+ * the message with index MESSAGE_EOR_IDX and for no other message.
+ */
+static void test_seqpacket_msg_send_server(const struct test_opts *opts)
+{
+	char rx_buf[16];
+	struct iovec rx_iov = {
+		.iov_base = rx_buf,
+		.iov_len = sizeof(rx_buf),
+	};
+	struct msghdr rx_msg = {
+		.msg_iov = &rx_iov,
+		.msg_iovlen = 1,
+	};
+	int conn_fd;
+	int idx;
+
+	conn_fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+	if (conn_fd < 0) {
+		perror("accept");
+		exit(EXIT_FAILURE);
+	}
+
+	control_expectln("SENDDONE");
+
+	for (idx = 0; idx < MESSAGES_CNT; idx++) {
+		int eor_seen;
+
+		if (recvmsg(conn_fd, &rx_msg, 0) != 1) {
+			perror("message bound violated");
+			exit(EXIT_FAILURE);
+		}
+
+		eor_seen = !!(rx_msg.msg_flags & MSG_EOR);
+
+		if (idx == MESSAGE_EOR_IDX && !eor_seen) {
+			fprintf(stderr, "MSG_EOR flag expected\n");
+			exit(EXIT_FAILURE);
+		}
+
+		if (idx != MESSAGE_EOR_IDX && eor_seen) {
+			fprintf(stderr, "unexpected MSG_EOR flag\n");
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	close(conn_fd);
+}
+
+#define MESSAGE_TRUNC_SZ 32
+/* Client side of the MSG_TRUNC test: connect with SOCK_SEQPACKET and send a
+ * single message of MESSAGE_TRUNC_SZ bytes, then tell the server that the
+ * data is on its way.
+ */
+static void test_seqpacket_msg_trunc_client(const struct test_opts *opts)
+{
+	int fd;
+	/* Zero the payload so that no indeterminate stack bytes are sent. */
+	char buf[MESSAGE_TRUNC_SZ] = {0};
+
+	fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+	if (fd < 0) {
+		perror("connect");
+		exit(EXIT_FAILURE);
+	}
+
+	if (send(fd, buf, sizeof(buf), 0) != sizeof(buf)) {
+		perror("send failed");
+		exit(EXIT_FAILURE);
+	}
+
+	control_writeln("SENDDONE");
+	close(fd);
+}
+
+/* Server side of the MSG_TRUNC test: the receive buffer is half as large as
+ * the message sent by the client. recvmsg() called with MSG_TRUNC must
+ * return MESSAGE_TRUNC_SZ and must set MSG_TRUNC in msg_flags.
+ */
+static void test_seqpacket_msg_trunc_server(const struct test_opts *opts)
+{
+	char short_buf[MESSAGE_TRUNC_SZ / 2];
+	struct iovec rx_iov = {
+		.iov_base = short_buf,
+		.iov_len = sizeof(short_buf),
+	};
+	struct msghdr rx_msg = {
+		.msg_iov = &rx_iov,
+		.msg_iovlen = 1,
+	};
+	int conn_fd;
+
+	conn_fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+	if (conn_fd < 0) {
+		perror("accept");
+		exit(EXIT_FAILURE);
+	}
+
+	control_expectln("SENDDONE");
+
+	if (recvmsg(conn_fd, &rx_msg, MSG_TRUNC) != MESSAGE_TRUNC_SZ) {
+		perror("MSG_TRUNC doesn't work");
+		exit(EXIT_FAILURE);
+	}
+
+	if ((rx_msg.msg_flags & MSG_TRUNC) == 0) {
+		fprintf(stderr, "MSG_TRUNC expected\n");
+		exit(EXIT_FAILURE);
+	}
+
+	close(conn_fd);
+}
+
static struct test_case test_cases[] = {
{
.name = "SOCK_STREAM connection reset",
@@ -309,6 +425,16 @@ static struct test_case test_cases[] = {
.run_client = test_stream_msg_peek_client,
.run_server = test_stream_msg_peek_server,
},
+ {
+ .name = "SOCK_SEQPACKET send data MSG_EOR",
+ .run_client = test_seqpacket_msg_send_client,
+ .run_server = test_seqpacket_msg_send_server,
+ },
+ {
+ .name = "SOCK_SEQPACKET send data MSG_TRUNC",
+ .run_client = test_seqpacket_msg_trunc_client,
+ .run_server = test_seqpacket_msg_trunc_server,
+ },
{},
};

--
2.25.1

2021-01-15 06:00:27

by Arseny Krasnov

[permalink] [raw]
Subject: [RFC PATCH v2 12/13] vhost/vsock: support for SOCK_SEQPACKET socket.

This adds the transport ops and stops ignoring non-stream packet types.

Signed-off-by: Arseny Krasnov <[email protected]>
---
drivers/vhost/vsock.c | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index 5e78fb719602..4d60a99aed14 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -354,8 +354,7 @@ vhost_vsock_alloc_pkt(struct vhost_virtqueue *vq,
return NULL;
}

- if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM)
- pkt->len = le32_to_cpu(pkt->hdr.len);
+ pkt->len = le32_to_cpu(pkt->hdr.len);

/* No payload */
if (!pkt->len)
@@ -424,6 +423,10 @@ static struct virtio_transport vhost_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,

+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1

2021-01-15 10:02:33

by stsp

[permalink] [raw]
Subject: Re: [RFC PATCH v2 00/13] virtio/vsock: introduce SOCK_SEQPACKET support.

15.01.2021 08:35, Arseny Krasnov пишет:
> This patchset impelements support of SOCK_SEQPACKET for virtio
> transport.
> As SOCK_SEQPACKET guarantees to save record boundaries, so to
> do it, new packet operation was added: it marks start of record (with
> record length in header), such packet doesn't carry any data. To send
> record, packet with start marker is sent first, then all data is sent
> as usual 'RW' packets. On receiver's side, length of record is known
> from packet with start record marker. Now as packets of one socket
> are not reordered neither on vsock nor on vhost transport layers, such
> marker allows to restore original record on receiver's side. If user's
> buffer is smaller that

than


> record length, when

then


> v1 -> v2:
> - patches reordered: af_vsock.c changes now before virtio vsock
> - patches reorganized: more small patches, where +/- are not mixed

If you did this because I asked, then this
is not what I asked. :)
You can't just add some static func in a
separate patch, as it will just produce the
compilation warning of an unused function.
I only asked to separate the refactoring from
the new code. I.e. if you move some code
block to a separate function, you shouldn't
split that into 2 patches, one that adds a
code block and another one that removes it.
It should be in one patch, so that it is clear
what was moved, and no new warnings are
introduced.
What I asked to separate is the old code
moves from the new code additions. Such
things can definitely go in separate patches.

NB: just trying to help, as I already played
with your code a bit. I am neither a
maintainer nor a contributor here, but
it would be cool to have the vsock SEQPACKET
support.

2021-01-18 15:25:54

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 06/13] af_vsock: general support of SOCK_SEQPACKET type.

On Fri, Jan 15, 2021 at 08:42:43AM +0300, Arseny Krasnov wrote:
>This adds socket operations for SOCK_SEQPACKET and adds this type of
>socket for conditions where SOCK_STREAM is involved because both type of
>sockets are connect oriented.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 108 +++++++++++++++++++++++++++++++++------
> 1 file changed, 92 insertions(+), 16 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 4a7cdf7756c0..d0ef066e9352 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_dgram;
> break;
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> if (vsock_use_local_transport(remote_cid))
> new_transport = transport_local;
> else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
>@@ -459,6 +460,13 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_g2h;
> else
> new_transport = transport_h2g;
>+
>+ if (sk->sk_type == SOCK_SEQPACKET) {
>+ if (!new_transport->seqpacket_seq_send_len ||
>+ !new_transport->seqpacket_seq_get_len ||
>+ !new_transport->seqpacket_dequeue)
>+ return -ENODEV;
>+ }
> break;
> default:
> return -ESOCKTNOSUPPORT;
>@@ -604,8 +612,8 @@ static void vsock_pending_work(struct work_struct *work)
>
> /**** SOCKET OPERATIONS ****/
>
>-static int __vsock_bind_stream(struct vsock_sock *vsk,
>- struct sockaddr_vm *addr)
>+static int __vsock_bind_connectible(struct vsock_sock *vsk,
>+ struct sockaddr_vm *addr)
> {
> static u32 port;
> struct sockaddr_vm new_addr;
>@@ -684,8 +692,9 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
>
> switch (sk->sk_socket->type) {
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> spin_lock_bh(&vsock_table_lock);
>- retval = __vsock_bind_stream(vsk, addr);
>+ retval = __vsock_bind_connectible(vsk, addr);
> spin_unlock_bh(&vsock_table_lock);
> break;
>
>@@ -767,6 +776,11 @@ static struct sock *__vsock_create(struct net *net,
> return sk;
> }
>
>+static bool sock_type_connectible(u16 type)
>+{
>+ return (type == SOCK_STREAM || type == SOCK_SEQPACKET);
>+}
>+
> static void __vsock_release(struct sock *sk, int level)
> {
> if (sk) {
>@@ -785,7 +799,7 @@ static void __vsock_release(struct sock *sk, int level)
>
> if (vsk->transport)
> vsk->transport->release(vsk);
>- else if (sk->sk_type == SOCK_STREAM)
>+ else if (sock_type_connectible(sk->sk_type))
> vsock_remove_sock(vsk);
>
> sock_orphan(sk);
>@@ -945,7 +959,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> sk = sock->sk;
> if (sock->state == SS_UNCONNECTED) {
> err = -ENOTCONN;
>- if (sk->sk_type == SOCK_STREAM)
>+ if (sock_type_connectible(sk->sk_type))
> return err;
> } else {
> sock->state = SS_DISCONNECTING;
>@@ -960,7 +974,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> sk->sk_state_change(sk);
> release_sock(sk);
>
>- if (sk->sk_type == SOCK_STREAM) {
>+ if (sock_type_connectible(sk->sk_type)) {
> sock_reset_flag(sk, SOCK_DONE);
> vsock_send_shutdown(sk, mode);
> }
>@@ -1013,7 +1027,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
> if (!(sk->sk_shutdown & SEND_SHUTDOWN))
> mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
>
>- } else if (sock->type == SOCK_STREAM) {
>+ } else if (sock_type_connectible(sk->sk_type)) {
> const struct vsock_transport *transport = vsk->transport;
> lock_sock(sk);
>
>@@ -1259,8 +1273,8 @@ static void vsock_connect_timeout(struct work_struct *work)
> sock_put(sk);
> }
>
>-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
>- int addr_len, int flags)
>+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
> {
> int err;
> struct sock *sk;
>@@ -1410,7 +1424,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
>
> lock_sock(listener);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sock->type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1477,6 +1491,18 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
> return err;
> }
>
>+static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
>+{
>+ return vsock_connect(sock, addr, addr_len, flags);
>+}
>+
>+static int vsock_seqpacket_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
>+{
>+ return vsock_connect(sock, addr, addr_len, flags);
>+}
>+
> static int vsock_listen(struct socket *sock, int backlog)
> {
> int err;
>@@ -1487,7 +1513,7 @@ static int vsock_listen(struct socket *sock, int backlog)
>
> lock_sock(sk);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sk->sk_type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1531,11 +1557,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
> vsk->buffer_size = val;
> }
>
>-static int vsock_stream_setsockopt(struct socket *sock,
>- int level,
>- int optname,
>- sockptr_t optval,
>- unsigned int optlen)
>+static int vsock_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
> {
> int err;
> struct sock *sk;
>@@ -1612,6 +1638,24 @@ static int vsock_stream_setsockopt(struct socket *sock,
> return err;
> }
>
>+static int vsock_seqpacket_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
>+{
>+ return vsock_setsockopt(sock, level, optname, optval, optlen);
>+}
>+
>+static int vsock_stream_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
>+{
>+ return vsock_setsockopt(sock, level, optname, optval, optlen);
>+}
>+
> static int vsock_stream_getsockopt(struct socket *sock,
> int level, int optname,
> char __user *optval,
>@@ -1683,6 +1727,14 @@ static int vsock_stream_getsockopt(struct socket *sock,
> return 0;
> }
>
>+static int vsock_seqpacket_getsockopt(struct socket *sock,
>+ int level, int optname,
>+ char __user *optval,
>+ int __user *optlen)
>+{
>+ return vsock_stream_getsockopt(sock, level, optname, optval, optlen);
>+}
>+

Why didn't you do the same thing you did with setsockopt?

Both approaches are fine with me, but I'd like getsockopt and setsockopt
to be handled the same way.

If you opt to create new functions, it would be better to call them
vsock_connectible_*sockopt().

> static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> size_t len)
> {
>@@ -2184,6 +2236,27 @@ static const struct proto_ops vsock_stream_ops = {
> .sendpage = sock_no_sendpage,
> };
>
>+static const struct proto_ops vsock_seqpacket_ops = {
>+ .family = PF_VSOCK,
>+ .owner = THIS_MODULE,
>+ .release = vsock_release,
>+ .bind = vsock_bind,
>+ .connect = vsock_seqpacket_connect,
>+ .socketpair = sock_no_socketpair,
>+ .accept = vsock_accept,
>+ .getname = vsock_getname,
>+ .poll = vsock_poll,
>+ .ioctl = sock_no_ioctl,
>+ .listen = vsock_listen,
>+ .shutdown = vsock_shutdown,
>+ .setsockopt = vsock_seqpacket_setsockopt,
>+ .getsockopt = vsock_seqpacket_getsockopt,
>+ .sendmsg = vsock_seqpacket_sendmsg,
>+ .recvmsg = vsock_seqpacket_recvmsg,
>+ .mmap = sock_no_mmap,
>+ .sendpage = sock_no_sendpage,
>+};
>+
> static int vsock_create(struct net *net, struct socket *sock,
> int protocol, int kern)
> {
>@@ -2204,6 +2277,9 @@ static int vsock_create(struct net *net, struct socket *sock,
> case SOCK_STREAM:
> sock->ops = &vsock_stream_ops;
> break;
>+ case SOCK_SEQPACKET:
>+ sock->ops = &vsock_seqpacket_ops;
>+ break;
> default:
> return -ESOCKTNOSUPPORT;
> }
>--
>2.25.1
>

2021-01-18 16:47:39

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 00/13] virtio/vsock: introduce SOCK_SEQPACKET support.

On Fri, Jan 15, 2021 at 12:59:30PM +0300, stsp wrote:
>15.01.2021 08:35, Arseny Krasnov пишет:
>> This patchset impelements support of SOCK_SEQPACKET for virtio
>>transport.
>> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>>do it, new packet operation was added: it marks start of record (with
>>record length in header), such packet doesn't carry any data. To send
>>record, packet with start marker is sent first, then all data is sent
>>as usual 'RW' packets. On receiver's side, length of record is known
>>from packet with start record marker. Now as packets of one socket
>>are not reordered neither on vsock nor on vhost transport layers, such
>>marker allows to restore original record on receiver's side. If user's
>>buffer is smaller that
>
>than
>
>
>> record length, when
>
>then
>
>
>> v1 -> v2:
>> - patches reordered: af_vsock.c changes now before virtio vsock
>> - patches reorganized: more small patches, where +/- are not mixed
>
>If you did this because I asked, then this
>is not what I asked. :)
>You can't just add some static func in a
>separate patch, as it will just produce the
>compilation warning of an unused function.
>I only asked to separate the refactoring from
>the new code. I.e. if you move some code
>block to a separate function, you shouldn't
>split that into 2 patches, one that adds a
>code block and another one that removes it.
>It should be in one patch, so that it is clear
>what was moved, and no new warnings are
>introduced.
>What I asked to separate, is the old code
>moves with the new code additions. Such
>things can definitely go in a separate patches.

Arseny, thanks for the v2.
I appreciate that you moved the af_vsock changes before the transport
and also the test, but I agree with stsp about how the patches are split.

As stsp suggested, you can have some "preparation" patches that touch
the already existing code (e.g. rename vsock_stream_sendmsg() to
vsock_connectible_sendmsg() and call it from the new
vsock_stream_sendmsg(), etc.), then a patch that adds the seqpacket
support to af_vsock.

Also, for the virtio/vhost transports, you can have some patches that add
support in virtio_transport_common, then a patch that enables it in
virtio_transport and a patch for vhost_vsock, as you rightly did in
patch 12.

So, I'd suggest moving out the code that touches virtio_transport.c
from patch 11.

These changes should simplify the review.

In addition, you can also remove the trailing '.' from the commit titles.


I left other comments on the individual patches.

Thanks,
Stefano

2021-01-18 16:54:46

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 08/13] virtio/vsock: dequeue callback for SOCK_SEQPACKET.

On Fri, Jan 15, 2021 at 08:43:24AM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.
>Callback fetches RW packets from rx queue of socket until whole record
>is copied(if user's buffer is full, user is not woken up). This is done
>to not stall sender, because if we wake up user and it leaves syscall,
>nobody will send credit update for rest of record, and sender will wait
>for next enter of read syscall at receiver's side. So if user buffer is
>full, we just send credit update and drop data. If during copy SEQ_BEGIN
>was found(and not all data was copied), copying is restarted by reset
>user's iov iterator(previous unfinished data is dropped).
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/linux/virtio_vsock.h | 4 +
> include/uapi/linux/virtio_vsock.h | 9 ++
> net/vmw_vsock/virtio_transport_common.c | 128 ++++++++++++++++++++++++
> 3 files changed, 141 insertions(+)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..7f0ef5204e33 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -36,6 +36,10 @@ struct virtio_vsock_sock {
> u32 rx_bytes;
> u32 buf_alloc;
> struct list_head rx_queue;
>+
>+ /* For SOCK_SEQPACKET */
>+ u32 user_read_seq_len;
>+ u32 user_read_copied;
> };
>
> struct virtio_vsock_pkt {
>diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>index 1d57ed3d84d2..058908bc19fc 100644
>--- a/include/uapi/linux/virtio_vsock.h
>+++ b/include/uapi/linux/virtio_vsock.h
>@@ -65,6 +65,7 @@ struct virtio_vsock_hdr {
>
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
>+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
>@@ -83,6 +84,9 @@ enum virtio_vsock_op {
> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> /* Request the peer to send the credit info to us */
> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>+
>+ /* Record begin for SOCK_SEQPACKET */
>+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
> };
>
> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
>+/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
>+enum virtio_vsock_rw_seqpacket {
>+ VIRTIO_VSOCK_RW_EOR = 1,
>+};
>+
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 5956939eebb7..4328f653a477 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -397,6 +397,132 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
>+{
>+ list_del(&pkt->list);
>+ virtio_transport_free_pkt(pkt);
>+}
>+
>+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
>+{
>+ struct virtio_vsock_pkt *pkt, *n;
>+ size_t bytes_dropped = 0;
>+
>+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
>+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
>+ break;
>+
>+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
>+ virtio_transport_dec_rx_pkt(vvs, pkt);
>+ virtio_transport_del_n_free_pkt(pkt);
>+ }
>+
>+ return bytes_dropped;
>+}
>+
>+static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ size_t user_buf_len)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ size_t bytes_handled = 0;
>+ int err = 0;
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+
>+ if (user_buf_len == 0) {
>+ /* User's buffer is full, we processing rest of
>+ * record and drop it. If 'SEQ_BEGIN' is found
>+ * while iterating, user will be woken up,
>+ * because record is already copied, and we
>+ * don't care about absent of some tail RW packets
>+ * of it. Return number of bytes(rest of record),
>+ * but ignore credit update for such absent bytes.
>+ */
>+ bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
>+ vvs->user_read_copied += bytes_handled;
>+
>+ if (!list_empty(&vvs->rx_queue) &&
>+ vvs->user_read_copied < vvs->user_read_seq_len) {
>+ /* 'SEQ_BEGIN' found, but record isn't complete.
>+ * Set number of copied bytes to fit record size
>+ * and force counters to finish receiving.
>+ */
>+ bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
>+ vvs->user_read_copied = vvs->user_read_seq_len;
>+ }
>+ }
>+
>+ /* Now start copying. */
>+ while (vvs->user_read_copied < vvs->user_read_seq_len &&
>+ vvs->rx_bytes &&
>+ user_buf_len &&
>+ !err) {
>+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+ switch (le16_to_cpu(pkt->hdr.op)) {
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
>+ /* Unexpected 'SEQ_BEGIN' during record copy:
>+ * Leave receive loop, 'EAGAIN' will restart it from
>+ * outer receive loop, packet is still in queue and
>+ * counters are cleared. So in next loop enter,
>+ * 'SEQ_BEGIN' will be dequeued first. User's iov
>+ * iterator will be reset in outer loop. Also
>+ * send credit update, because some bytes could be
>+ * copied. User will never see unfinished record.
>+ */
>+ err = -EAGAIN;
>+ break;
>+ }
>+ case VIRTIO_VSOCK_OP_RW: {
>+ size_t bytes_to_copy;
>+ size_t pkt_len;
>+
>+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+ bytes_to_copy = min(user_buf_len, pkt_len);
>+
>+ /* sk_lock is held by caller so no one else can dequeue.
>+ * Unlock rx_lock since memcpy_to_msg() may sleep.
>+ */
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>+ spin_lock_bh(&vvs->rx_lock);
>+ err = -EINVAL;
>+ break;
>+ }
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+ user_buf_len -= bytes_to_copy;
>+ bytes_handled += pkt->len;
>+ vvs->user_read_copied += bytes_to_copy;
>+
>+ if (le16_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
^
hdr.flags is __le32, so please use le32_to_cpu()

>+ msg->msg_flags |= MSG_EOR;
>+ break;
>+ }
>+ default:
>+ ;
>+ }
>+
>+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
>+ * but drop any other type of packet.
>+ */
>+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
>+ virtio_transport_dec_rx_pkt(vvs, pkt);
>+ virtio_transport_del_n_free_pkt(pkt);
>+ }
>+ }
>+
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
>+ NULL);
>+
>+ return err ?: bytes_handled;
>+}
>+
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
>@@ -481,6 +607,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
> spin_lock_init(&vvs->rx_lock);
> spin_lock_init(&vvs->tx_lock);
> INIT_LIST_HEAD(&vvs->rx_queue);
>+ vvs->user_read_copied = 0;
>+ vvs->user_read_seq_len = 0;
>
> return 0;
> }
>--
>2.25.1
>

2021-01-18 16:56:28

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 10/13] virtio/vsock: update receive logic

On Fri, Jan 15, 2021 at 08:44:07AM +0300, Arseny Krasnov wrote:
>This modifies current receive logic for SEQPACKET support:
>1) Add 'SEQ_BEGIN' packet to socket's rx queue.
>2) Add 'RW' packet to rx queue, but without merging inside buffer of last
> packet in queue.
>3) Perform check for packet type and socket type on receive(if mismatch,
> then reset connection).
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/virtio_transport_common.c | 79 ++++++++++++++++++-------
> 1 file changed, 58 insertions(+), 21 deletions(-)
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index fe1272e74517..c3e07eb1c666 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -397,6 +397,14 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static u16 virtio_transport_get_type(struct sock *sk)
>+{
>+ if (sk->sk_type == SOCK_STREAM)
>+ return VIRTIO_VSOCK_TYPE_STREAM;
>+ else
>+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
>+}
>+
> static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
> {
> list_del(&pkt->list);
>@@ -1050,39 +1058,49 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> struct virtio_vsock_pkt *pkt)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
>- bool can_enqueue, free_pkt = false;
>+ bool free_pkt = false;
>
> pkt->len = le32_to_cpu(pkt->hdr.len);
> pkt->off = 0;
>
> spin_lock_bh(&vvs->rx_lock);
>
>- can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
>- if (!can_enqueue) {
>+ if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
> free_pkt = true;
> goto out;
> }
>
>- /* Try to copy small packets into the buffer of last packet queued,
>- * to avoid wasting memory queueing the entire buffer with a small
>- * payload.
>- */
>- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
>- struct virtio_vsock_pkt *last_pkt;
>+ switch (le32_to_cpu(pkt->hdr.type)) {
^
hdr.type is __le16, so please use le16_to_cpu()

>+ case VIRTIO_VSOCK_TYPE_STREAM: {
>+ /* Try to copy small packets into the buffer of last
>packet queued,
>+ * to avoid wasting memory queueing the entire buffer with a small
>+ * payload.
>+ */
>+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
>+ struct virtio_vsock_pkt *last_pkt;
>
>- last_pkt = list_last_entry(&vvs->rx_queue,
>- struct virtio_vsock_pkt, list);
>+ last_pkt = list_last_entry(&vvs->rx_queue,
>+ struct virtio_vsock_pkt, list);
>
>- /* If there is space in the last packet queued, we copy the
>- * new packet in its buffer.
>- */
>- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>- pkt->len);
>- last_pkt->len += pkt->len;
>- free_pkt = true;
>- goto out;
>+ /* If there is space in the last packet queued, we copy the
>+ * new packet in its buffer.
>+ */
>+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>+ pkt->len);
>+ last_pkt->len += pkt->len;
>+ free_pkt = true;
>+ goto out;
>+ }
> }
>+
>+ break;
>+ }
>+ case VIRTIO_VSOCK_TYPE_SEQPACKET: {
>+ break;
>+ }
>+ default:
>+ goto out;
> }
>
> list_add_tail(&pkt->list, &vvs->rx_queue);
>@@ -1101,6 +1119,14 @@ virtio_transport_recv_connected(struct sock *sk,
> int err = 0;
>
> switch (le16_to_cpu(pkt->hdr.op)) {
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+ list_add_tail(&pkt->list, &vvs->rx_queue);
>+ spin_unlock_bh(&vvs->rx_lock);
>+ return err;
>+ }
> case VIRTIO_VSOCK_OP_RW:
> virtio_transport_recv_enqueue(vsk, pkt);
> sk->sk_data_ready(sk);
>@@ -1247,6 +1273,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
> return 0;
> }
>
>+static bool virtio_transport_valid_type(u16 type)
>+{
>+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
>+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
>+}
>+
> /* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
> * lock.
> */
>@@ -1272,7 +1304,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> le32_to_cpu(pkt->hdr.buf_alloc),
> le32_to_cpu(pkt->hdr.fwd_cnt));
>
>- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
>+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
> (void)virtio_transport_reset_no_sock(t, pkt);
> goto free_pkt;
> }
>@@ -1289,6 +1321,11 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> }
> }
>
>+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type)) {
>+ (void)virtio_transport_reset_no_sock(t, pkt);
>+ goto free_pkt;
>+ }
>+
> vsk = vsock_sk(sk);
>
> space_available = virtio_transport_space_update(sk, pkt);
>--
>2.25.1
>

2021-01-19 04:37:45

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 02/13] af_vsock: separate rx loops for STREAM/SEQPACKET.

On Fri, Jan 15, 2021 at 08:40:50AM +0300, Arseny Krasnov wrote:
>This adds two receive loops: for SOCK_STREAM and SOCK_SEQPACKET. Both are
>look like twins, but SEQPACKET is a little bit different from STREAM:
>1) It doesn't call notify callbacks.
>2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
> there is no sense for these values in SEQPACKET case.
>3) It waits until whole record is received or error is found during
> receiving.
>4) It processes and sets 'MSG_TRUNC' flag.
>
>So to avoid extra conditions for two types of socket inside on loop, two
>independent functions were created.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/net/af_vsock.h | 5 +
> net/vmw_vsock/af_vsock.c | 202 +++++++++++++++++++++++++++++++++++++++
> 2 files changed, 207 insertions(+)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..46073842d489 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,11 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
>+ size_t len, int flags);
>+
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index af716f5a93a4..afacbe9f4231 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1870,6 +1870,208 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> return err;
> }
>
>+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ int err = 0;
>+ size_t record_len;
>+ struct vsock_sock *vsk;
>+ const struct vsock_transport *transport;
>+ long timeout;
>+ ssize_t dequeued_total = 0;
>+ unsigned long orig_nr_segs;
>+ const struct iovec *orig_iov;
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ msg->msg_flags &= ~MSG_EOR;
>+ orig_nr_segs = msg->msg_iter.nr_segs;
>+ orig_iov = msg->msg_iter.iov;
>+
>+ while (1) {
>+ s64 ready;
>+
>+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>+ ready = vsock_stream_has_data(vsk);
>+
>+ if (ready == 0) {
>+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
>+ /* In case of any loop break(timeout, signal
>+ * interrupt or shutdown), we report user that
>+ * nothing was copied.
>+ */
>+ dequeued_total = 0;
>+ break;
>+ }

Maybe we can 'continue' here, remove the '} else {' on the next line, and
reduce the indentation of the block that follows.

>+ } else {
>+ ssize_t dequeued;
>+
>+ finish_wait(sk_sleep(sk), &wait);
>+
>+ if (ready < 0) {
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+
>+ if (dequeued_total == 0) {
>+ record_len =
>+ transport->seqpacket_seq_get_len(vsk);
>+
>+ if (record_len == 0)
>+ continue;
>+ }
>+
>+ /* 'msg_iter.count' is number of unused bytes in iov.
>+ * On every copy to iov iterator it is decremented at
>+ * size of data.
>+ */
>+ dequeued = transport->seqpacket_dequeue(vsk, msg,
>+ msg->msg_iter.count, flags);
>+
>+ if (dequeued < 0) {
>+ dequeued_total = 0;
>+
>+ if (dequeued == -EAGAIN) {
>+ iov_iter_init(&msg->msg_iter, READ,
>+ orig_iov, orig_nr_segs,
>+ len);
>+ msg->msg_flags &= ~MSG_EOR;
>+ continue;
>+ }
>+
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ dequeued_total += dequeued;
>+
>+ if (dequeued_total >= record_len)
>+ break;
>+ }
>+ }
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+
>+ if (dequeued_total > 0) {
>+ /* User sets MSG_TRUNC, so return real length of
>+ * packet.
>+ */
>+ if (flags & MSG_TRUNC)
>+ err = record_len;
>+ else
>+ err = len - msg->msg_iter.count;
>+
>+ /* Always set MSG_TRUNC if real length of packet is
>+ * bigger that user buffer.
>+ */
>+ if (record_len > len)
>+ msg->msg_flags |= MSG_TRUNC;
>+ }
>+out:
>+ return err;
>+}
>+
>+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ int err;
>+ const struct vsock_transport *transport;
>+ struct vsock_sock *vsk;
>+ size_t target;
>+ struct vsock_transport_recv_notify_data recv_data;
>+ long timeout;
>+ ssize_t copied;
>+
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ /* We must not copy less than target bytes into the user's buffer
>+ * before returning successfully, so we wait for the consume queue to
>+ * have that much data to consume before dequeueing. Note that this
>+ * makes it impossible to handle cases where target is greater than the
>+ * queue size.
>+ */
>+ target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
>+ if (target >= transport->stream_rcvhiwat(vsk)) {
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ copied = 0;
>+
>+ err = transport->notify_recv_init(vsk, target, &recv_data);
>+ if (err < 0)
>+ goto out;
>+
>+ while (1) {
>+ s64 ready;
>+
>+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>+ ready = vsock_stream_has_data(vsk);
>+
>+ if (ready == 0) {
>+ if (vsock_wait_data(sk, &wait, timeout,
>&recv_data, target))
>+ break;

The same applies here.

>+ } else {
>+ ssize_t read;
>+
>+ finish_wait(sk_sleep(sk), &wait);
>+
>+ if (ready < 0) {
>+ /* Invalid queue pair content. XXX This should
>+ * be changed to a connection reset in a later
>+ * change.
>+ */
>+
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+
>+ err = transport->notify_recv_pre_dequeue(vsk,
>+ target, &recv_data);
>+ if (err < 0)
>+ break;
>+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>+
>+ if (read < 0) {
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ copied += read;
>+
>+ err = transport->notify_recv_post_dequeue(vsk,
>+ target, read,
>+ !(flags & MSG_PEEK), &recv_data);
>+ if (err < 0)
>+ goto out;
>+
>+ if (read >= target || flags & MSG_PEEK)
>+ break;
>+
>+ target -= read;
>+ }
>+ }
>+
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+ if (copied > 0)
>+ err = copied;
>+
>+out:
>+ release_sock(sk);
>+ return err;
>+}
>+
> static int
> vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
>--
>2.25.1
>

2021-01-19 04:46:46

by Stefano Garzarella

[permalink] [raw]
Subject: Re: [RFC PATCH v2 11/13] virtio/vsock: rest of SOCK_SEQPACKET support

On Fri, Jan 15, 2021 at 08:44:22AM +0300, Arseny Krasnov wrote:
>This adds rest of logic for SEQPACKET:
>1) Shared functions for packet sending now set valid type of packet
> according socket type.
>2) SEQPACKET specific function like SEQ_BEGIN send and data dequeue.
>3) Ops for virtio transport.
>4) TAP support for SEQPACKET is not so easy if it is necessary to send
> whole record to TAP interface. This could be done by allocating
> new packet when whole record is received, data of record must be
> copied to TAP packet.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/linux/virtio_vsock.h | 7 ++++
> net/vmw_vsock/virtio_transport.c | 4 ++
> net/vmw_vsock/virtio_transport_common.c | 54 ++++++++++++++++++++++---
> 3 files changed, 59 insertions(+), 6 deletions(-)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index af8705ea8b95..ad9783df97c9 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -84,7 +84,14 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
> size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ size_t len,
>+ int type);
>+
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
>index 2700a63ab095..5a7ab1befee8 100644
>--- a/net/vmw_vsock/virtio_transport.c
>+++ b/net/vmw_vsock/virtio_transport.c
>@@ -469,6 +469,10 @@ static struct virtio_transport virtio_transport = {
> .stream_is_active = virtio_transport_stream_is_active,
> .stream_allow = virtio_transport_stream_allow,
>
>+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
>+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
>+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
>+
> .notify_poll_in = virtio_transport_notify_poll_in,
> .notify_poll_out = virtio_transport_notify_poll_out,
> .notify_recv_init = virtio_transport_notify_recv_init,
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index c3e07eb1c666..5fdf1adfdaab 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
> break;
> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
> case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
> hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
> break;
> default:
>@@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
>
> void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
> {
>+ /* TODO: implement tap support for SOCK_SEQPACKET. */
>+ if (le32_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
^
hdr.type is __le16, so please use le16_to_cpu()

>+ return;
>+
> if (pkt->tap_delivered)
> return;
>
>@@ -405,6 +410,19 @@ static u16 virtio_transport_get_type(struct sock *sk)
> return VIRTIO_VSOCK_TYPE_SEQPACKET;
> }
>
>+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
>+{
>+ struct virtio_vsock_pkt_info info = {
>+ .type = VIRTIO_VSOCK_TYPE_SEQPACKET,
>+ .op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
>+ .vsk = vsk,
>+ .flags = len
>+ };
>+
>+ return virtio_transport_send_pkt_info(vsk, &info);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
>+
> static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
> {
> list_del(&pkt->list);
>@@ -576,6 +594,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ if (flags & MSG_PEEK)
>+ return -EOPNOTSUPP;
>+
>+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>+
> int
> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
>@@ -659,13 +689,15 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
> void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
>+ int type;
>
> if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
> *val = VIRTIO_VSOCK_MAX_BUF_SIZE;
>
> vvs->buf_alloc = *val;
>
>- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
>+ type = virtio_transport_get_type(sk_vsock(vsk));
>+ virtio_transport_send_credit_update(vsk, type,
> NULL);

With this change, you can move 'NULL' to the previous line.

> }
> EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
>@@ -793,10 +825,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_REQUEST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_connect);
>@@ -805,7 +838,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_SHUTDOWN,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .flags = (mode & RCV_SHUTDOWN ?
> VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
> (mode & SEND_SHUTDOWN ?
>@@ -813,6 +845,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
>@@ -834,12 +868,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RW,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .msg = msg,
> .pkt_len = len,
> .vsk = vsk,
>+ .flags = 0,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
>+ if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
>+ msg->msg_flags & MSG_EOR)
>+ info.flags |= VIRTIO_VSOCK_RW_EOR;
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
>@@ -857,7 +897,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .reply = !!pkt,
> .vsk = vsk,
> };
>@@ -866,6 +905,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
> return 0;
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
>@@ -1177,13 +1218,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RESPONSE,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .remote_cid = le64_to_cpu(pkt->hdr.src_cid),
> .remote_port = le32_to_cpu(pkt->hdr.src_port),
> .reply = true,
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
>--
>2.25.1
>