This patchset implements support for SOCK_SEQPACKET in the virtio
transport.
Because SOCK_SEQPACKET must preserve record boundaries, two new packet
operations were added: one marks the start of a record and the other
marks its end (called SEQ_BEGIN and SEQ_END below). Both operations
carry metadata used to maintain record boundaries and payload
integrity. The metadata is a special header with two fields, message
count and message length:
struct virtio_vsock_seq_hdr {
__le32 msg_cnt;
__le32 msg_len;
} __attribute__((packed));
This header is transmitted as the payload of SEQ_BEGIN and SEQ_END
packets (the buffer of the second virtio descriptor in the chain), in
the same way as data is transmitted in RW packets. The payload was
chosen for this header to avoid touching the first virtio buffer, which
carries the packet header, because other code may check that the size
of that buffer equals the size of the packet header. To send a record,
the packet with the start marker is sent first (its header contains
the record length and the counter). Then the counter is incremented,
all data is sent as ordinary 'RW' packets, and finally SEQ_END is sent.
SEQ_END also carries the message counter, which equals the SEQ_BEGIN
counter + 1; after SEQ_END is sent, the counter is incremented again.
On the receiver's side, the record length is known from the packet
with the start marker. To check that the transport dropped no packets,
the counters of each consecutive SEQ_BEGIN/SEQ_END pair are compared
(the SEQ_END counter must be exactly 1 greater than the SEQ_BEGIN
counter), and the amount of data between the two markers is compared
to the length in the SEQ_BEGIN header.
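A minimal user-space C sketch of the integrity check described above, assuming the header is serialized as two little-endian 32-bit fields (as '__le32' implies). The names 'seq_hdr', 'seq_rx' and the 'rx_on_*' helpers are hypothetical and are not part of the patchset; the real checks live in the virtio transport code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical host-order mirror of struct virtio_vsock_seq_hdr. */
struct seq_hdr {
	uint32_t msg_cnt;
	uint32_t msg_len;
};

/* Serialize into the 8-byte little-endian wire layout. */
static void seq_hdr_encode(const struct seq_hdr *h, uint8_t out[8])
{
	for (int i = 0; i < 4; i++) {
		out[i] = (uint8_t)(h->msg_cnt >> (8 * i));
		out[4 + i] = (uint8_t)(h->msg_len >> (8 * i));
	}
}

static struct seq_hdr seq_hdr_decode(const uint8_t in[8])
{
	struct seq_hdr h = { 0, 0 };

	for (int i = 0; i < 4; i++) {
		h.msg_cnt |= (uint32_t)in[i] << (8 * i);
		h.msg_len |= (uint32_t)in[4 + i] << (8 * i);
	}
	return h;
}

/* Hypothetical receiver state between a SEQ_BEGIN and its SEQ_END. */
struct seq_rx {
	bool in_record;
	uint32_t begin_cnt;
	uint32_t expected_len;
	uint32_t received;
};

static void rx_on_begin(struct seq_rx *rx, const struct seq_hdr *h)
{
	assert(rx != NULL && h != NULL);
	rx->in_record = true;
	rx->begin_cnt = h->msg_cnt;
	rx->expected_len = h->msg_len;
	rx->received = 0;
}

/* Count payload bytes of RW packets that arrive inside a record. */
static void rx_on_data(struct seq_rx *rx, uint32_t bytes)
{
	if (rx->in_record)
		rx->received += bytes;
}

/* The record is intact only if SEQ_END's counter is SEQ_BEGIN's + 1
 * and the data between the markers matches msg_len from SEQ_BEGIN. */
static bool rx_on_end(struct seq_rx *rx, const struct seq_hdr *h)
{
	bool ok = rx->in_record &&
		  h->msg_cnt == rx->begin_cnt + 1 &&
		  rx->received == rx->expected_len;

	rx->in_record = false;
	return ok;
}
```

A dropped RW packet shows up as a length mismatch, while a dropped marker shows up as a counter mismatch, which is why both values are checked.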
Since packets of one socket are reordered neither on the vsock nor on
the vhost transport layer, these markers allow the original record to
be restored on the receiver's side. If the user's buffer is smaller
than the record length, all data that does not fit is dropped.
As with stream sockets, the maximum record length is not limited,
because the same credit logic is used. The difference from stream
sockets is that the user is not woken up until the whole record has
been received or an error has occurred. The implementation also
supports the 'MSG_EOR' and 'MSG_TRUNC' flags.
Tests are also implemented.
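A hedged model of the truncation rule in plain C: it mirrors the 'MSG_TRUNC' handling at the end of '__vsock_seqpacket_recvmsg()' in this series, but 'seqpacket_deliver()' is a hypothetical helper operating on an in-memory record, not a kernel or libc API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h> /* MSG_TRUNC */

/* Hypothetical: deliver one complete record into a 'len'-byte buffer,
 * dropping whatever does not fit. */
static long seqpacket_deliver(const char *rec, size_t rec_len,
			      char *buf, size_t len,
			      int flags, int *msg_flags)
{
	size_t copied = rec_len < len ? rec_len : len;

	assert(buf != NULL || len == 0);
	if (copied)
		memcpy(buf, rec, copied);

	/* Always report truncation when the record did not fit. */
	if (rec_len > len)
		*msg_flags |= MSG_TRUNC;

	/* With MSG_TRUNC in the request, return the real record length;
	 * otherwise return the number of bytes actually copied. */
	return (flags & MSG_TRUNC) ? (long)rec_len : (long)copied;
}
```

For an 8-byte record and a 4-byte buffer, a plain receive returns 4 with 'MSG_TRUNC' set in the output flags, while passing 'MSG_TRUNC' as a request flag returns 8.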
Arseny Krasnov (19):
af_vsock: update functions for connectible socket
af_vsock: separate wait data loop
af_vsock: separate receive data loop
af_vsock: implement SEQPACKET receive loop
af_vsock: separate wait space loop
af_vsock: implement send logic for SEQPACKET
af_vsock: rest of SEQPACKET support
af_vsock: update comments for stream sockets
virtio/vsock: set packet's type in send
virtio/vsock: simplify credit update function API
virtio/vsock: dequeue callback for SOCK_SEQPACKET
virtio/vsock: fetch length for SEQPACKET record
virtio/vsock: add SEQPACKET receive logic
virtio/vsock: rest of SOCK_SEQPACKET support
virtio/vsock: setup SEQPACKET ops for transport
vhost/vsock: setup SEQPACKET ops for transport
vsock/loopback: setup SEQPACKET ops for transport
vsock_test: add SOCK_SEQPACKET tests
virtio/vsock: update trace event for SEQPACKET
drivers/vhost/vsock.c | 8 +-
include/linux/virtio_vsock.h | 14 +
include/net/af_vsock.h | 9 +
.../events/vsock_virtio_transport_common.h | 48 +-
include/uapi/linux/virtio_vsock.h | 16 +
net/vmw_vsock/af_vsock.c | 590 +++++++++++------
net/vmw_vsock/virtio_transport.c | 5 +
net/vmw_vsock/virtio_transport_common.c | 342 ++++++++--
net/vmw_vsock/vsock_loopback.c | 5 +
tools/testing/vsock/util.c | 32 +-
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 ++++
12 files changed, 951 insertions(+), 247 deletions(-)
v4 -> v5:
- patches reorganized:
1) Setting the packet's type in 'virtio_transport_send_pkt_info()'
is moved to a separate patch.
2) Simplification of 'virtio_transport_send_credit_update()' is
moved to a separate patch placed before the main virtio/vsock patches.
- style problem fixed
- in 'af_vsock: separate receive data loop' extra 'release_sock()'
removed
- added trace event fields for SEQPACKET
- in 'af_vsock: separate wait data loop':
1) 'goto out;' removed from 'vsock_wait_data()'.
2) Comment for invalid data amount changed.
- in 'af_vsock: rest of SEQPACKET support', 'new_transport' pointer
check is moved after 'try_module_get()'
- in 'af_vsock: update comments for stream sockets', 'connect-oriented'
replaced with 'connection-oriented'
- in 'loopback/vsock: setup SEQPACKET ops for transport',
'loopback/vsock' replaced with 'vsock/loopback'
v3 -> v4:
- SEQPACKET specific metadata moved from packet header to payload
and called 'virtio_vsock_seq_hdr'
- record integrity check:
1) SEQ_END operation was added, which marks end of record.
2) Both SEQ_BEGIN and SEQ_END carry a counter which is incremented
on every marker sent.
- af_vsock.c: socket operations for STREAM and SEQPACKET now call the
same functions instead of separate wrappers that differ only by name:
'vsock_seqpacket/stream_getsockopt()' are now replaced with
'vsock_connectible_getsockopt()'.
- af_vsock.c: the 'seqpacket_dequeue' callback returns an error code and
a flag indicating that the record is ready. There is no need to return
the number of copied bytes, because successful reception of a record
is checked at the virtio transport layer when SEQ_END is processed.
The user also doesn't need the number of copied bytes, because
'recv()' on a SEQPACKET socket returns either an error, the length of
the user's buffer or the length of the whole record (both lengths are
known in af_vsock.c).
- af_vsock.c: both wait loops in af_vsock.c (for data and for space) moved
to separate functions, because both are now called from several places.
- af_vsock.c: 'vsock_assign_transport()' checks that the 'new_transport'
pointer is not NULL and returns 'ESOCKTNOSUPPORT' instead of 'ENODEV'
if the transport cannot be used.
- tools/testing/vsock/vsock_test.c: rename tests
v2 -> v3:
- patches reorganized: split for prepare and implementation patches
- local variables are declared in "Reverse Christmas tree" manner
- virtio_transport_common.c: use proper leXX_to_cpu() when accessing
vsock header fields
- af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
between stream and seqpacket sockets.
- af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
- af_vsock.c: 'vsock_wait_data()' refactored.
v1 -> v2:
- patches reordered: af_vsock.c related changes now before virtio vsock
- patches reorganized: more small patches, where +/- are not mixed
- tests for SOCK_SEQPACKET added
- all commit messages updated
- af_vsock.c: 'vsock_pre_recv_check()' inlined to
'vsock_connectible_recvmsg()'
- af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
was not found
- virtio_transport_common.c: transport callback for seqpacket dequeue
- virtio_transport_common.c: simplified
'virtio_transport_recv_connected()'
- virtio_transport_common.c: send reset on socket and packet type
mismatch.
Signed-off-by: Arseny Krasnov <[email protected]>
--
2.25.1
This moves the loop that waits for free space on send into a separate
function, because it will also be used to send SEQ_BEGIN/SEQ_END
before and after data transmission. Waiting is needed for
SEQ_BEGIN/SEQ_END because these packets carry a SEQPACKET header that
must not be fragmented by the credit mechanism; to avoid that, the
sender waits until enough space is available.
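A sketch, assuming free space follows the usual virtio-vsock credit formula (peer buffer size minus bytes sent but not yet forwarded by the peer), of why 'vsock_wait_space()' takes a 'space' argument instead of the old '== 0' test. The helpers 'peer_free_space()' and 'may_send()' are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: bytes the peer can still accept. tx_cnt and
 * peer_fwd_cnt are free-running u32 counters, so the subtraction wraps. */
static uint32_t peer_free_space(uint32_t peer_buf_alloc, uint32_t tx_cnt,
				uint32_t peer_fwd_cnt)
{
	uint32_t in_flight = tx_cnt - peer_fwd_cnt;

	/* The peer may have shrunk its buffer below what is in flight. */
	if (in_flight > peer_buf_alloc)
		return 0;
	return peer_buf_alloc - in_flight;
}

/* Hypothetical helper: stream data may go out once 1 byte fits, but an
 * 8-byte SEQPACKET header must fit as a whole or it would be split. */
static bool may_send(uint32_t free_space, uint32_t needed)
{
	assert(needed > 0);
	return free_space >= needed;
}
```

With 4 bytes of credit left, a stream send could proceed with a 4-byte chunk, but an 8-byte header has to wait, which is what passing the header size as 'space' achieves.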
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 2 +
net/vmw_vsock/af_vsock.c | 99 +++++++++++++++++++++++++---------------
2 files changed, 63 insertions(+), 38 deletions(-)
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 01563338cc03..6fbe88306403 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -205,6 +205,8 @@ void vsock_remove_sock(struct vsock_sock *vsk);
void vsock_for_each_connected_socket(void (*fn)(struct sock *sk));
int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk);
bool vsock_find_cid(unsigned int cid);
+int vsock_wait_space(struct sock *sk, size_t space, int flags,
+ struct vsock_transport_send_notify_data *send_data);
/**** TAP ****/
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index b754927a556a..09b377422b1e 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1692,6 +1692,65 @@ static int vsock_connectible_getsockopt(struct socket *sock,
return 0;
}
+int vsock_wait_space(struct sock *sk, size_t space, int flags,
+ struct vsock_transport_send_notify_data *send_data)
+{
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ long timeout;
+ int err;
+
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+ timeout = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
+ err = 0;
+
+ add_wait_queue(sk_sleep(sk), &wait);
+
+ while (vsock_stream_has_space(vsk) < space &&
+ sk->sk_err == 0 &&
+ !(sk->sk_shutdown & SEND_SHUTDOWN) &&
+ !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
+
+ /* Don't wait for non-blocking sockets. */
+ if (timeout == 0) {
+ err = -EAGAIN;
+ goto out_err;
+ }
+
+ if (send_data) {
+ err = transport->notify_send_pre_block(vsk, send_data);
+ if (err < 0)
+ goto out_err;
+ }
+
+ release_sock(sk);
+ timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
+ lock_sock(sk);
+ if (signal_pending(current)) {
+ err = sock_intr_errno(timeout);
+ goto out_err;
+ } else if (timeout == 0) {
+ err = -EAGAIN;
+ goto out_err;
+ }
+ }
+
+ if (sk->sk_err) {
+ err = -sk->sk_err;
+ } else if ((sk->sk_shutdown & SEND_SHUTDOWN) ||
+ (vsk->peer_shutdown & RCV_SHUTDOWN)) {
+ err = -EPIPE;
+ }
+
+out_err:
+ remove_wait_queue(sk_sleep(sk), &wait);
+ return err;
+}
+EXPORT_SYMBOL_GPL(vsock_wait_space);
+
static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
size_t len)
{
@@ -1699,10 +1758,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
struct vsock_sock *vsk;
const struct vsock_transport *transport;
ssize_t total_written;
- long timeout;
int err;
struct vsock_transport_send_notify_data send_data;
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
sk = sock->sk;
vsk = vsock_sk(sk);
@@ -1740,9 +1797,6 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
goto out;
}
- /* Wait for room in the produce queue to enqueue our user's data. */
- timeout = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
-
err = transport->notify_send_init(vsk, &send_data);
if (err < 0)
goto out;
@@ -1750,39 +1804,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
while (total_written < len) {
ssize_t written;
- add_wait_queue(sk_sleep(sk), &wait);
- while (vsock_stream_has_space(vsk) == 0 &&
- sk->sk_err == 0 &&
- !(sk->sk_shutdown & SEND_SHUTDOWN) &&
- !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
-
- /* Don't wait for non-blocking sockets. */
- if (timeout == 0) {
- err = -EAGAIN;
- remove_wait_queue(sk_sleep(sk), &wait);
- goto out_err;
- }
-
- err = transport->notify_send_pre_block(vsk, &send_data);
- if (err < 0) {
- remove_wait_queue(sk_sleep(sk), &wait);
- goto out_err;
- }
-
- release_sock(sk);
- timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
- lock_sock(sk);
- if (signal_pending(current)) {
- err = sock_intr_errno(timeout);
- remove_wait_queue(sk_sleep(sk), &wait);
- goto out_err;
- } else if (timeout == 0) {
- err = -EAGAIN;
- remove_wait_queue(sk_sleep(sk), &wait);
- goto out_err;
- }
- }
- remove_wait_queue(sk_sleep(sk), &wait);
+ if (vsock_wait_space(sk, 1, msg->msg_flags, &send_data))
+ goto out_err;
/* These checks occur both as part of and after the loop
* conditional since we need to check before and after
--
2.25.1
This moves the STREAM-specific data receive logic into a dedicated
function, '__vsock_stream_recvmsg()', while the checks common to both
socket types stay in the shared function 'vsock_connectible_recvmsg()'.
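As a rough model of the split, the shared checks that stay in 'vsock_connectible_recvmsg()' can be written as a pure function over a snapshot of socket state, in the same order as the patch. 'struct rx_state' and 'recv_precheck()' are hypothetical; returning 1 stands for calling the type-specific '__vsock_*_recvmsg()'.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/socket.h> /* MSG_OOB */

/* Hypothetical snapshot of the state consulted by the shared checks. */
struct rx_state {
	bool has_transport;
	bool established;   /* sk_state == TCP_ESTABLISHED */
	bool sock_done;     /* SOCK_DONE flag set */
	bool rcv_shutdown;  /* sk_shutdown & RCV_SHUTDOWN */
};

/* Returns 1 to continue into the type-specific receive function,
 * otherwise the value recvmsg() would return (0 or -errno). */
static int recv_precheck(const struct rx_state *s, size_t len, int flags)
{
	assert(s != NULL);
	if (!s->has_transport || !s->established)
		return s->sock_done ? 0 : -ENOTCONN;
	if (flags & MSG_OOB)
		return -EOPNOTSUPP;
	if (s->rcv_shutdown)
		return 0;
	if (len == 0)
		return 0;	/* zero-length buffer is valid, not an error */
	return 1;
}
```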
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 116 ++++++++++++++++++++++-----------------
1 file changed, 67 insertions(+), 49 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 6cf7bb977aa1..d277dc1cdbdf 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1894,65 +1894,22 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
return data;
}
-static int
-vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
{
- struct sock *sk;
- struct vsock_sock *vsk;
+ struct vsock_transport_recv_notify_data recv_data;
const struct vsock_transport *transport;
- int err;
- size_t target;
+ struct vsock_sock *vsk;
ssize_t copied;
+ size_t target;
long timeout;
- struct vsock_transport_recv_notify_data recv_data;
+ int err;
DEFINE_WAIT(wait);
- sk = sock->sk;
vsk = vsock_sk(sk);
- err = 0;
-
- lock_sock(sk);
-
transport = vsk->transport;
- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
- /* Recvmsg is supposed to return 0 if a peer performs an
- * orderly shutdown. Differentiate between that case and when a
- * peer has not connected or a local shutdown occured with the
- * SOCK_DONE flag.
- */
- if (sock_flag(sk, SOCK_DONE))
- err = 0;
- else
- err = -ENOTCONN;
-
- goto out;
- }
-
- if (flags & MSG_OOB) {
- err = -EOPNOTSUPP;
- goto out;
- }
-
- /* We don't check peer_shutdown flag here since peer may actually shut
- * down, but there can be data in the queue that a local socket can
- * receive.
- */
- if (sk->sk_shutdown & RCV_SHUTDOWN) {
- err = 0;
- goto out;
- }
-
- /* It is valid on Linux to pass in a zero-length receive buffer. This
- * is not an error. We may as well bail out now.
- */
- if (!len) {
- err = 0;
- goto out;
- }
-
/* We must not copy less than target bytes into the user's buffer
* before returning successfully, so we wait for the consume queue to
* have that much data to consume before dequeueing. Note that this
@@ -2011,6 +1968,67 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (copied > 0)
err = copied;
+out:
+ return err;
+}
+
+static int
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ struct sock *sk;
+ struct vsock_sock *vsk;
+ const struct vsock_transport *transport;
+ int err;
+
+ DEFINE_WAIT(wait);
+
+ sk = sock->sk;
+ vsk = vsock_sk(sk);
+ err = 0;
+
+ lock_sock(sk);
+
+ transport = vsk->transport;
+
+ if (!transport || sk->sk_state != TCP_ESTABLISHED) {
+ /* Recvmsg is supposed to return 0 if a peer performs an
+ * orderly shutdown. Differentiate between that case and when a
+ * peer has not connected or a local shutdown occurred with the
+ * SOCK_DONE flag.
+ */
+ if (sock_flag(sk, SOCK_DONE))
+ err = 0;
+ else
+ err = -ENOTCONN;
+
+ goto out;
+ }
+
+ if (flags & MSG_OOB) {
+ err = -EOPNOTSUPP;
+ goto out;
+ }
+
+ /* We don't check peer_shutdown flag here since peer may actually shut
+ * down, but there can be data in the queue that a local socket can
+ * receive.
+ */
+ if (sk->sk_shutdown & RCV_SHUTDOWN) {
+ err = 0;
+ goto out;
+ }
+
+ /* It is valid on Linux to pass in a zero-length receive buffer. This
+ * is not an error. We may as well bail out now.
+ */
+ if (!len) {
+ err = 0;
+ goto out;
+ }
+
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+
out:
release_sock(sk);
return err;
--
2.25.1
This prepares af_vsock.c for SEQPACKET support: some functions, such
as setsockopt(), getsockopt(), connect(), recvmsg() and sendmsg(), will
be shared between both socket types, so rename them generically and
add a 'sock_type_connectible()' helper for socket type checks.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 64 +++++++++++++++++++++-------------------
1 file changed, 34 insertions(+), 30 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 5546710d8ac1..656370e11707 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -604,8 +604,8 @@ static void vsock_pending_work(struct work_struct *work)
/**** SOCKET OPERATIONS ****/
-static int __vsock_bind_stream(struct vsock_sock *vsk,
- struct sockaddr_vm *addr)
+static int __vsock_bind_connectible(struct vsock_sock *vsk,
+ struct sockaddr_vm *addr)
{
static u32 port;
struct sockaddr_vm new_addr;
@@ -685,7 +685,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
spin_lock_bh(&vsock_table_lock);
- retval = __vsock_bind_stream(vsk, addr);
+ retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
break;
@@ -767,6 +767,11 @@ static struct sock *__vsock_create(struct net *net,
return sk;
}
+static bool sock_type_connectible(u16 type)
+{
+ return type == SOCK_STREAM;
+}
+
static void __vsock_release(struct sock *sk, int level)
{
if (sk) {
@@ -785,7 +790,7 @@ static void __vsock_release(struct sock *sk, int level)
if (vsk->transport)
vsk->transport->release(vsk);
- else if (sk->sk_type == SOCK_STREAM)
+ else if (sock_type_connectible(sk->sk_type))
vsock_remove_sock(vsk);
sock_orphan(sk);
@@ -947,7 +952,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
lock_sock(sk);
if (sock->state == SS_UNCONNECTED) {
err = -ENOTCONN;
- if (sk->sk_type == SOCK_STREAM)
+ if (sock_type_connectible(sk->sk_type))
goto out;
} else {
sock->state = SS_DISCONNECTING;
@@ -960,7 +965,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk->sk_shutdown |= mode;
sk->sk_state_change(sk);
- if (sk->sk_type == SOCK_STREAM) {
+ if (sock_type_connectible(sk->sk_type)) {
sock_reset_flag(sk, SOCK_DONE);
vsock_send_shutdown(sk, mode);
}
@@ -1015,7 +1020,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (!(sk->sk_shutdown & SEND_SHUTDOWN))
mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
- } else if (sock->type == SOCK_STREAM) {
+ } else if (sock_type_connectible(sk->sk_type)) {
const struct vsock_transport *transport;
lock_sock(sk);
@@ -1262,8 +1267,8 @@ static void vsock_connect_timeout(struct work_struct *work)
sock_put(sk);
}
-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
- int addr_len, int flags)
+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
{
int err;
struct sock *sk;
@@ -1413,7 +1418,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
lock_sock(listener);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sock->type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1490,7 +1495,7 @@ static int vsock_listen(struct socket *sock, int backlog)
lock_sock(sk);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sk->sk_type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1534,11 +1539,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
vsk->buffer_size = val;
}
-static int vsock_stream_setsockopt(struct socket *sock,
- int level,
- int optname,
- sockptr_t optval,
- unsigned int optlen)
+static int vsock_connectible_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
{
int err;
struct sock *sk;
@@ -1616,10 +1621,10 @@ static int vsock_stream_setsockopt(struct socket *sock,
return err;
}
-static int vsock_stream_getsockopt(struct socket *sock,
- int level, int optname,
- char __user *optval,
- int __user *optlen)
+static int vsock_connectible_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
{
int err;
int len;
@@ -1687,8 +1692,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}
-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
- size_t len)
+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1827,10 +1832,9 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
return err;
}
-
static int
-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -2006,7 +2010,7 @@ static const struct proto_ops vsock_stream_ops = {
.owner = THIS_MODULE,
.release = vsock_release,
.bind = vsock_bind,
- .connect = vsock_stream_connect,
+ .connect = vsock_connect,
.socketpair = sock_no_socketpair,
.accept = vsock_accept,
.getname = vsock_getname,
@@ -2014,10 +2018,10 @@ static const struct proto_ops vsock_stream_ops = {
.ioctl = sock_no_ioctl,
.listen = vsock_listen,
.shutdown = vsock_shutdown,
- .setsockopt = vsock_stream_setsockopt,
- .getsockopt = vsock_stream_getsockopt,
- .sendmsg = vsock_stream_sendmsg,
- .recvmsg = vsock_stream_recvmsg,
+ .setsockopt = vsock_connectible_setsockopt,
+ .getsockopt = vsock_connectible_getsockopt,
+ .sendmsg = vsock_connectible_sendmsg,
+ .recvmsg = vsock_connectible_recvmsg,
.mmap = sock_no_mmap,
.sendpage = sock_no_sendpage,
};
--
2.25.1
This adds the rest of SOCK_SEQPACKET support:
1) Adds socket ops for the SEQPACKET type.
2) Allows creating sockets of the SEQPACKET type.
3) Makes 'vsock_assign_transport()' reject transports that lack the
SEQPACKET callbacks.
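The 'vsock_assign_transport()' hunk in this patch refuses SEQPACKET sockets on transports that lack any of the four SEQPACKET callbacks. The sketch below models that check with a hypothetical 'struct mini_transport' whose 'module_refs' counter stands in for the module reference. It is an illustration, not the kernel code: it also drops the reference on the error path, whereas in the hunk as posted there is no 'module_put()' between the successful 'try_module_get()' and the '-ESOCKTNOSUPPORT' return.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

typedef int (*mini_op)(void);

/* Hypothetical stand-in for struct vsock_transport: only the four
 * SEQPACKET callbacks plus a counter modelling the module reference. */
struct mini_transport {
	int module_refs;
	mini_op seq_send_len;
	mini_op seq_send_eor;
	mini_op seq_get_len;
	mini_op dequeue;
};

static int mini_assign_seqpacket(struct mini_transport *t)
{
	assert(t != NULL);
	t->module_refs++;			/* like try_module_get() */

	if (!t->seq_send_len || !t->seq_send_eor ||
	    !t->seq_get_len || !t->dequeue) {
		t->module_refs--;		/* like module_put() */
		return -ESOCKTNOSUPPORT;
	}
	return 0;
}

/* Placeholder callback used to populate the table. */
static int mini_noop(void)
{
	return 0;
}
```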
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 36 +++++++++++++++++++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index f352cd9d91ce..f4b02c6d35d1 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_dgram;
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
if (vsock_use_local_transport(remote_cid))
new_transport = transport_local;
else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
@@ -484,6 +485,14 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
if (!new_transport || !try_module_get(new_transport->module))
return -ENODEV;
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ if (!new_transport->seqpacket_seq_send_len ||
+ !new_transport->seqpacket_seq_send_eor ||
+ !new_transport->seqpacket_seq_get_len ||
+ !new_transport->seqpacket_dequeue)
+ return -ESOCKTNOSUPPORT;
+ }
+
ret = new_transport->init(vsk, psk);
if (ret) {
module_put(new_transport->module);
@@ -684,6 +693,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
spin_lock_bh(&vsock_table_lock);
retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
@@ -769,7 +779,7 @@ static struct sock *__vsock_create(struct net *net,
static bool sock_type_connectible(u16 type)
{
- return type == SOCK_STREAM;
+ return (type == SOCK_STREAM) || (type == SOCK_SEQPACKET);
}
static void __vsock_release(struct sock *sk, int level)
@@ -2191,6 +2201,27 @@ static const struct proto_ops vsock_stream_ops = {
.sendpage = sock_no_sendpage,
};
+static const struct proto_ops vsock_seqpacket_ops = {
+ .family = PF_VSOCK,
+ .owner = THIS_MODULE,
+ .release = vsock_release,
+ .bind = vsock_bind,
+ .connect = vsock_connect,
+ .socketpair = sock_no_socketpair,
+ .accept = vsock_accept,
+ .getname = vsock_getname,
+ .poll = vsock_poll,
+ .ioctl = sock_no_ioctl,
+ .listen = vsock_listen,
+ .shutdown = vsock_shutdown,
+ .setsockopt = vsock_connectible_setsockopt,
+ .getsockopt = vsock_connectible_getsockopt,
+ .sendmsg = vsock_connectible_sendmsg,
+ .recvmsg = vsock_connectible_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
static int vsock_create(struct net *net, struct socket *sock,
int protocol, int kern)
{
@@ -2211,6 +2242,9 @@ static int vsock_create(struct net *net, struct socket *sock,
case SOCK_STREAM:
sock->ops = &vsock_stream_ops;
break;
+ case SOCK_SEQPACKET:
+ sock->ops = &vsock_seqpacket_ops;
+ break;
default:
return -ESOCKTNOSUPPORT;
}
--
2.25.1
This adds a receive loop for SEQPACKET. It resembles the receive loop
for STREAM, with a few differences:
1) It doesn't call notify callbacks.
2) It ignores 'SO_SNDLOWAT' and 'SO_RCVLOWAT', because these values
make no sense for SEQPACKET.
3) It waits until the whole record is received or an error occurs
during reception.
4) It processes and sets the 'MSG_TRUNC' flag.
To avoid extra conditions for the two socket types inside one loop,
two independent functions were created.
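The control flow of the new loop can be modelled without any socket machinery. In this hypothetical sketch each 'struct dq_step' stands for one 'seqpacket_dequeue' call; '-EAGAIN' (a dropped record) discards what was copied, much as the patch re-initializes 'msg_iter'. The model propagates other errors unchanged, whereas the hunk reports them as '-ENOMEM'.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical outcome of one dequeue attempt. */
struct dq_step {
	int err;          /* 0, -EAGAIN (record dropped) or another -errno */
	size_t bytes;     /* bytes copied by this step */
	bool msg_ready;   /* SEQ_END of an intact record was processed */
};

/* Accumulate bytes until a record is ready; restart from an empty
 * buffer on -EAGAIN. Returns the delivered length, a negative errno,
 * or 0 when no complete record arrived (the wait loop gave up). */
static long seqpacket_recv_model(const struct dq_step *steps, size_t n)
{
	size_t copied = 0;

	assert(steps != NULL || n == 0);
	for (size_t i = 0; i < n; i++) {
		if (steps[i].err == -EAGAIN) {
			copied = 0;	/* like re-initialising msg_iter */
			continue;
		}
		if (steps[i].err < 0)
			return steps[i].err;
		copied += steps[i].bytes;
		if (steps[i].msg_ready)
			return (long)copied;
	}
	return 0;
}
```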
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 5 +++
net/vmw_vsock/af_vsock.c | 97 +++++++++++++++++++++++++++++++++++++++-
2 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index b1c717286993..01563338cc03 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -135,6 +135,11 @@ struct vsock_transport {
bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port);
+ /* SEQ_PACKET. */
+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *vsk);
+ int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
+ int flags, bool *msg_ready);
+
/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index d277dc1cdbdf..b754927a556a 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1972,6 +1972,98 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
return err;
}
+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ const struct vsock_transport *transport;
+ const struct iovec *orig_iov;
+ unsigned long orig_nr_segs;
+ bool msg_ready;
+ struct vsock_sock *vsk;
+ size_t record_len;
+ long timeout;
+ int err = 0;
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ orig_nr_segs = msg->msg_iter.nr_segs;
+ orig_iov = msg->msg_iter.iov;
+ msg_ready = false;
+ record_len = 0;
+
+ while (1) {
+ err = vsock_wait_data(sk, &wait, timeout, NULL, 0);
+
+ if (err <= 0) {
+ /* In case of any loop break(timeout, signal
+ * interrupt or shutdown), we report user that
+ * nothing was copied.
+ */
+ err = 0;
+ break;
+ }
+
+ if (record_len == 0) {
+ record_len =
+ transport->seqpacket_seq_get_len(vsk);
+
+ if (record_len == 0)
+ continue;
+ }
+
+ err = transport->seqpacket_dequeue(vsk, msg,
+ flags, &msg_ready);
+
+ if (err < 0) {
+ if (err == -EAGAIN) {
+ iov_iter_init(&msg->msg_iter, READ,
+ orig_iov, orig_nr_segs,
+ len);
+ /* Clear 'MSG_EOR' here, because dequeue
+ * callback above set it again if it was
+ * set by sender. This 'MSG_EOR' is from
+ * dropped record.
+ */
+ msg->msg_flags &= ~MSG_EOR;
+ record_len = 0;
+ continue;
+ }
+
+ err = -ENOMEM;
+ break;
+ }
+
+ if (msg_ready)
+ break;
+ }
+
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ err = 0;
+
+ if (msg_ready) {
+ /* User sets MSG_TRUNC, so return real length of
+ * packet.
+ */
+ if (flags & MSG_TRUNC)
+ err = record_len;
+ else
+ err = len - msg->msg_iter.count;
+
+ /* Always set MSG_TRUNC if real length of packet is
+ * bigger than user's buffer.
+ */
+ if (record_len > len)
+ msg->msg_flags |= MSG_TRUNC;
+ }
+
+ return err;
+}
+
static int
vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
int flags)
@@ -2027,7 +2119,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
goto out;
}
- err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ if (sk->sk_type == SOCK_STREAM)
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ else
+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
out:
release_sock(sk);
--
2.25.1
This adds SEQPACKET support to the current stream enqueue function:
1) Send the record's begin/end markers.
2) For SOCK_SEQPACKET, return either the whole record length or an
error.
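A hypothetical sketch of the two sender-side rules: the counter values each record carries, as described in the cover letter, and the return-value rule added at 'out_err'. 'struct seq_tx', 'tx_record_counters()' and 'sendmsg_result()' are illustrative names, not part of the patchset.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical sender state: one counter shared by both markers. */
struct seq_tx {
	uint32_t msg_cnt;
};

/* SEQ_BEGIN carries the current value, SEQ_END carries that value + 1,
 * and the counter ends up 2 ahead after the record. */
static void tx_record_counters(struct seq_tx *tx, uint32_t *begin_cnt,
			       uint32_t *end_cnt)
{
	*begin_cnt = tx->msg_cnt++;
	*end_cnt = tx->msg_cnt++;
}

/* STREAM reports partial progress; SEQPACKET reports a byte count only
 * when the whole record was written, otherwise the error. */
static long sendmsg_result(bool seqpacket, size_t total_written,
			   size_t len, int err)
{
	assert(total_written <= len);
	if (total_written > 0 && (!seqpacket || total_written == len))
		return (long)total_written;
	return err;
}
```

The asymmetry exists because a partially written record is useless to a SEQPACKET receiver, so reporting a short count would suggest a success that the peer cannot observe.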
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 2 ++
net/vmw_vsock/af_vsock.c | 22 ++++++++++++++++++++--
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 6fbe88306403..0d4dd9386fb1 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -136,6 +136,8 @@ struct vsock_transport {
bool (*stream_allow)(u32 cid, u32 port);
/* SEQ_PACKET. */
+ int (*seqpacket_seq_send_len)(struct vsock_sock *vsk, size_t len, int flags);
+ int (*seqpacket_seq_send_eor)(struct vsock_sock *vsk, int flags);
size_t (*seqpacket_seq_get_len)(struct vsock_sock *vsk);
int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
int flags, bool *msg_ready);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 09b377422b1e..f352cd9d91ce 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1801,6 +1801,12 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
if (err < 0)
goto out;
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ err = transport->seqpacket_seq_send_len(vsk, len, msg->msg_flags);
+ if (err < 0)
+ goto out;
+ }
+
while (total_written < len) {
ssize_t written;
@@ -1847,9 +1853,21 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
}
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ err = transport->seqpacket_seq_send_eor(vsk, msg->msg_flags);
+ if (err < 0)
+ goto out;
+ }
+
out_err:
- if (total_written > 0)
- err = total_written;
+ if (total_written > 0) {
+ /* Return number of written bytes only if:
+ * 1) SOCK_STREAM socket.
+ * 2) SOCK_SEQPACKET socket when whole buffer is sent.
+ */
+ if (sk->sk_type == SOCK_STREAM || total_written == len)
+ err = total_written;
+ }
out:
release_sock(sk);
return err;
--
2.25.1
This replaces 'stream' with 'connection oriented' in comments, as
SEQPACKET is also connection oriented.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
1 file changed, 17 insertions(+), 14 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index f4b02c6d35d1..f1bf6a5ad15e 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)
/* Assign a transport to a socket and call the .init transport callback.
*
- * Note: for stream socket this must be called when vsk->remote_addr is set
- * (e.g. during the connect() or when a connection request on a listener
+ * Note: for connection oriented socket this must be called when vsk->remote_addr
+ * is set (e.g. during the connect() or when a connection request on a listener
* socket is received).
* The vsk->remote_addr is used to decide which transport to use:
* - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
@@ -470,10 +470,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
return 0;
/* transport->release() must be called with sock lock acquired.
- * This path can only be taken during vsock_stream_connect(),
- * where we have already held the sock lock.
- * In the other cases, this function is called on a new socket
- * which is not assigned to any transport.
+ * This path can only be taken during vsock_connect(), where we
+ * have already held the sock lock. In the other cases, this
+ * function is called on a new socket which is not assigned to
+ * any transport.
*/
vsk->transport->release(vsk);
vsock_deassign_transport(vsk);
@@ -658,9 +658,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);
- /* Remove stream sockets from the unbound list and add them to the hash
- * table for easy lookup by its address. The unbound list is simply an
- * extra entry at the end of the hash table, a trick used by AF_UNIX.
+ /* Remove connection oriented sockets from the unbound list and add them
+ * to the hash table for easy lookup by its address. The unbound list
+ * is simply an extra entry at the end of the hash table, a trick used
+ * by AF_UNIX.
*/
__vsock_remove_bound(vsk);
__vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
@@ -951,10 +952,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
if ((mode & ~SHUTDOWN_MASK) || !mode)
return -EINVAL;
- /* If this is a STREAM socket and it is not connected then bail out
- * immediately. If it is a DGRAM socket then we must first kick the
- * socket so that it wakes up from any sleeping calls, for example
- * recv(), and then afterwards return the error.
+ /* If this is a connection oriented socket and it is not connected then
+ * bail out immediately. If it is a DGRAM socket then we must first
+ * kick the socket so that it wakes up from any sleeping calls, for
+ * example recv(), and then afterwards return the error.
*/
sk = sock->sk;
@@ -1783,7 +1784,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
transport = vsk->transport;
- /* Callers should not provide a destination with stream sockets. */
+ /* Callers should not provide a destination with connection oriented
+ * sockets.
+ */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out;
--
2.25.1
'virtio_transport_send_credit_update()' has some unneeded arguments:
1) 'type' can be set in 'virtio_transport_send_pkt_info()' from the
socket's type.
2) The function is static and the 'hdr' argument was always NULL.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 15 ++++-----------
1 file changed, 4 insertions(+), 11 deletions(-)
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 1c9d71ca5e8e..833104b71a1c 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -271,13 +271,10 @@ void virtio_transport_put_credit(struct virtio_vsock_sock *vvs, u32 credit)
}
EXPORT_SYMBOL_GPL(virtio_transport_put_credit);
-static int virtio_transport_send_credit_update(struct vsock_sock *vsk,
- int type,
- struct virtio_vsock_hdr *hdr)
+static int virtio_transport_send_credit_update(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_CREDIT_UPDATE,
- .type = type,
.vsk = vsk,
};
@@ -385,11 +382,8 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
* messages, we set the limit to a high value. TODO: experiment
* with different values.
*/
- if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE) {
- virtio_transport_send_credit_update(vsk,
- VIRTIO_VSOCK_TYPE_STREAM,
- NULL);
- }
+ if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
+ virtio_transport_send_credit_update(vsk);
return total;
@@ -498,8 +492,7 @@ void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
vvs->buf_alloc = *val;
- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
- NULL);
+ virtio_transport_send_credit_update(vsk);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
--
2.25.1
This adds the transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the socket's rx queue until the whole
record is copied (if the user's buffer is full, the user is not woken up).
This avoids stalling the sender: if we woke the user up and it left the
syscall, nobody would send a credit update for the rest of the record, and
the sender would wait until the receiver's next read syscall. So if the
user's buffer is full, we just send a credit update and drop the data. If
SEQ_BEGIN is found during the copy (and not all data was copied), copying
is restarted by resetting the user's iov iterator (the previous unfinished
data is dropped).
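A simplified userspace model of this dequeue loop is sketched below. It is an illustration under stated assumptions, not the kernel code: the rx queue is a plain array, locking and credit updates are omitted, the per-socket state that virtio_transport_seqpacket_seq_get_len() would fill in is set by the caller, and all `model_*` names are hypothetical. It mirrors the control flow of virtio_transport_seqpacket_do_dequeue(): copy at most the user's buffer size, count (but drop) the excess, accept the record on a SEQ_END whose counter is exactly one higher, and return -EAGAIN on an unexpected SEQ_BEGIN, leaving that packet queued.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical model types; op values copied from enum virtio_vsock_op. */
enum model_op { MODEL_OP_RW = 5, MODEL_OP_SEQ_BEGIN = 8, MODEL_OP_SEQ_END = 9 };

struct model_pkt {
	enum model_op op;
	uint32_t len;      /* RW payload length (hdr.len) */
	uint32_t msg_cnt;  /* seq_hdr.msg_cnt, used by SEQ_END */
	const char *buf;   /* RW payload */
};

/* Mirrors user_read_seq_len, user_read_copied and curr_rx_msg_cnt. */
struct model_rx {
	uint32_t seq_len;
	uint32_t copied;
	uint32_t msg_cnt;
};

/*
 * Consumes packets from q[*pos]. Returns 0 with *ready set once a whole
 * record has been seen, or -EAGAIN if the record must be restarted.
 * *out_len is the number of bytes actually stored in ubuf.
 */
static int model_seqpacket_dequeue(const struct model_pkt *q, size_t n,
				   size_t *pos, struct model_rx *rx,
				   char *ubuf, size_t ubuf_len,
				   size_t *out_len, bool *ready)
{
	size_t room = ubuf_len;
	int err = 0;

	assert(ubuf != NULL || ubuf_len == 0);
	*ready = false;
	*out_len = 0;

	while (!*ready && *pos < n && !err) {
		const struct model_pkt *p = &q[*pos];
		bool consume = true;

		switch (p->op) {
		case MODEL_OP_SEQ_BEGIN:
			/* Next record started early: keep it queued, restart. */
			err = -EAGAIN;
			consume = false;
			break;
		case MODEL_OP_SEQ_END:
			/* Length and counter must both match, else data was lost. */
			if (rx->copied != rx->seq_len || p->msg_cnt - rx->msg_cnt != 1)
				err = -EAGAIN;
			else
				*ready = true;
			break;
		case MODEL_OP_RW: {
			size_t copy = p->len < room ? p->len : room;

			if (copy)
				memcpy(ubuf + *out_len, p->buf, copy);
			*out_len += copy;
			room -= copy;
			/* The whole packet counts, including the dropped part. */
			rx->copied += p->len;
			break;
		}
		}

		if (consume)
			(*pos)++;
	}
	return err;
}
```

For example, with a 4-byte user buffer and two 3-byte RW packets followed by a matching SEQ_END, the model stores "abcd", drops the final byte, and still reports the record as complete because the copied counter covers all 6 bytes.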
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 10 +++
include/uapi/linux/virtio_vsock.h | 16 ++++
net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
3 files changed, 140 insertions(+)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..003d06ae4a85 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -36,6 +36,11 @@ struct virtio_vsock_sock {
u32 rx_bytes;
u32 buf_alloc;
struct list_head rx_queue;
+
+ /* For SOCK_SEQPACKET */
+ u32 user_read_seq_len;
+ u32 user_read_copied;
+ u32 curr_rx_msg_cnt;
};
struct virtio_vsock_pkt {
@@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+int
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ int flags,
+ bool *msg_ready);
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 1d57ed3d84d2..cf9c165e5cca 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
__le32 fwd_cnt;
} __attribute__((packed));
+struct virtio_vsock_seq_hdr {
+ __le32 msg_cnt;
+ __le32 msg_len;
+} __attribute__((packed));
+
enum virtio_vsock_type {
VIRTIO_VSOCK_TYPE_STREAM = 1,
+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};
enum virtio_vsock_op {
@@ -83,6 +89,11 @@ enum virtio_vsock_op {
VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
/* Request the peer to send the credit info to us */
VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
+
+ /* Record begin for SOCK_SEQPACKET */
+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
+ /* Record end for SOCK_SEQPACKET */
+ VIRTIO_VSOCK_OP_SEQ_END = 9,
};
/* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
@@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
};
+/* VIRTIO_VSOCK_OP_RW flags values */
+enum virtio_vsock_rw {
+ VIRTIO_VSOCK_RW_EOR = 1,
+};
+
#endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 833104b71a1c..d8ec2dfa2315 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -393,6 +393,108 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}
+static inline void virtio_transport_remove_pkt(struct virtio_vsock_pkt *pkt)
+{
+ list_del(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+}
+
+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ bool *msg_ready)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ int err = 0;
+ size_t user_buf_len = msg->msg_iter.count;
+
+ *msg_ready = false;
+ spin_lock_bh(&vvs->rx_lock);
+
+ while (!*msg_ready && !list_empty(&vvs->rx_queue) && !err) {
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ /* Unexpected 'SEQ_BEGIN' during record copy:
+ * Leave receive loop, 'EAGAIN' will restart it from
+ * outer receive loop, packet is still in queue and
+ * counters are cleared. So in next loop enter,
+ * 'SEQ_BEGIN' will be dequeued first. User's iov
+ * iterator will be reset in outer loop. Also
+ * send credit update, because some bytes could be
+ * copied. User will never see unfinished record.
+ */
+ err = -EAGAIN;
+ break;
+ }
+ case VIRTIO_VSOCK_OP_SEQ_END: {
+ struct virtio_vsock_seq_hdr *seq_hdr;
+
+ seq_hdr = (struct virtio_vsock_seq_hdr *)pkt->buf;
+ /* First check that whole record is received. */
+
+ if (vvs->user_read_copied != vvs->user_read_seq_len ||
+ (le32_to_cpu(seq_hdr->msg_cnt) - vvs->curr_rx_msg_cnt) != 1) {
+ /* Tail of current record and head of next missed,
+ * so this EOR is from next record. Restart receive.
+ * Current record will be dropped, next headless will
+ * be dropped on next attempt to get record length.
+ */
+ err = -EAGAIN;
+ } else {
+ /* Success. */
+ *msg_ready = true;
+ }
+
+ break;
+ }
+ case VIRTIO_VSOCK_OP_RW: {
+ size_t bytes_to_copy;
+ size_t pkt_len;
+
+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+ bytes_to_copy = min(user_buf_len, pkt_len);
+
+ /* sk_lock is held by caller so no one else can dequeue.
+ * Unlock rx_lock since memcpy_to_msg() may sleep.
+ */
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+ spin_lock_bh(&vvs->rx_lock);
+ err = -EINVAL;
+ break;
+ }
+
+ spin_lock_bh(&vvs->rx_lock);
+ user_buf_len -= bytes_to_copy;
+ vvs->user_read_copied += pkt_len;
+
+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
+ msg->msg_flags |= MSG_EOR;
+ break;
+ }
+ default:
+ ;
+ }
+
+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
+ * but drop any other type of packet.
+ */
+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_remove_pkt(pkt);
+ }
+ }
+
+ spin_unlock_bh(&vvs->rx_lock);
+
+ virtio_transport_send_credit_update(vsk);
+
+ return err;
+}
+
ssize_t
virtio_transport_stream_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -405,6 +507,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
+int
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ int flags, bool *msg_ready)
+{
+ if (flags & MSG_PEEK)
+ return -EOPNOTSUPP;
+
+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, msg_ready);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
int
virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
--
2.25.1
This adds the rest of the logic for SEQPACKET:
1) SEQPACKET-specific functions which send SEQ_BEGIN/SEQ_END.
Note that both functions may sleep while waiting for enough space for
the SEQPACKET header.
2) SEQ_BEGIN/SEQ_END in TAP packet capture.
3) Send SHUTDOWN on socket close for the SEQPACKET type.
4) Set the SEQPACKET packet type during send.
5) Set VIRTIO_VSOCK_RW_EOR in the packet flags when the sender passes
MSG_EOR on a SEQPACKET socket.
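The send side can be modelled in userspace to show how the counters line up. This is a hypothetical sketch: `model_send_record()` and its packet struct are illustrative names, the RW chunk size is a parameter (an assumption standing in for the transport's maximum packet size), and space/credit waiting is omitted. As in virtio_transport_seqpacket_send_ctrl(), each control packet takes the current next_tx_msg_cnt and then increments it, so SEQ_END always carries the SEQ_BEGIN counter plus one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model types; op values copied from enum virtio_vsock_op. */
enum model_op { MODEL_OP_RW = 5, MODEL_OP_SEQ_BEGIN = 8, MODEL_OP_SEQ_END = 9 };

struct model_seq_pkt {
	enum model_op op;
	uint32_t len;      /* RW payload length */
	uint32_t msg_cnt;  /* seq_hdr.msg_cnt (control packets only) */
	uint32_t msg_len;  /* seq_hdr.msg_len (SEQ_BEGIN only; 0 for SEQ_END) */
};

/* Splits a record of rec_len bytes into packets; returns the packet count. */
static size_t model_send_record(uint32_t *next_tx_msg_cnt, uint32_t rec_len,
				uint32_t max_pkt, struct model_seq_pkt *out,
				size_t cap)
{
	size_t n = 0;

	assert(max_pkt > 0);

	/* SEQ_BEGIN: current counter and record length, then counter++. */
	assert(n < cap);
	out[n++] = (struct model_seq_pkt){ MODEL_OP_SEQ_BEGIN, 0,
					   (*next_tx_msg_cnt)++, rec_len };

	/* Data goes out as ordinary RW packets of at most max_pkt bytes. */
	for (uint32_t sent = 0; sent < rec_len; ) {
		uint32_t chunk = rec_len - sent < max_pkt ? rec_len - sent : max_pkt;

		assert(n < cap);
		out[n++] = (struct model_seq_pkt){ MODEL_OP_RW, chunk, 0, 0 };
		sent += chunk;
	}

	/* SEQ_END: SEQ_BEGIN counter + 1, length 0, then counter++ again. */
	assert(n < cap);
	out[n++] = (struct model_seq_pkt){ MODEL_OP_SEQ_END, 0,
					   (*next_tx_msg_cnt)++, 0 };
	return n;
}
```

With a counter starting at 7, a 10-byte record and 4-byte chunks, the model emits SEQ_BEGIN (cnt 7, len 10), RW 4, RW 4, RW 2, SEQ_END (cnt 8) and leaves the counter at 9.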
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 3 ++
net/vmw_vsock/virtio_transport_common.c | 67 ++++++++++++++++++++++++-
2 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 022667d57884..bf09d9aafa20 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -41,6 +41,7 @@ struct virtio_vsock_sock {
u32 user_read_seq_len;
u32 user_read_copied;
u32 curr_rx_msg_cnt;
+ u32 next_tx_msg_cnt;
};
struct virtio_vsock_pkt {
@@ -85,6 +86,8 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+int virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len, int flags);
+int virtio_transport_seqpacket_seq_send_eor(struct vsock_sock *vsk, int flags);
size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
int
virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 3ca0009c553e..8431d0a891ed 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -139,6 +139,8 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
break;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
+ case VIRTIO_VSOCK_OP_SEQ_END:
hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
break;
default:
@@ -187,7 +189,12 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt;
u32 pkt_len = info->pkt_len;
- info->type = VIRTIO_VSOCK_TYPE_STREAM;
+ info->type = virtio_transport_get_type(sk_vsock(vsk));
+
+ if (info->type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
+ info->msg &&
+ info->msg->msg_flags & MSG_EOR)
+ info->flags |= VIRTIO_VSOCK_RW_EOR;
t_ops = virtio_transport_get_ops(vsk);
if (unlikely(!t_ops))
@@ -401,6 +408,62 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}
+static int virtio_transport_seqpacket_send_ctrl(struct vsock_sock *vsk,
+ int type,
+ size_t len,
+ int flags)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt_info info = {
+ .op = type,
+ .vsk = vsk,
+ .pkt_len = sizeof(struct virtio_vsock_seq_hdr)
+ };
+
+ struct virtio_vsock_seq_hdr seq_hdr = {
+ .msg_cnt = cpu_to_le32(vvs->next_tx_msg_cnt),
+ .msg_len = cpu_to_le32(len)
+ };
+
+ struct kvec seq_hdr_kiov = {
+ .iov_base = (void *)&seq_hdr,
+ .iov_len = sizeof(struct virtio_vsock_seq_hdr)
+ };
+
+ struct msghdr msg = {0};
+
+ //XXX: do we need 'vsock_transport_send_notify_data' pointer?
+ if (vsock_wait_space(sk_vsock(vsk),
+ sizeof(struct virtio_vsock_seq_hdr),
+ flags, NULL))
+ return -1;
+
+ iov_iter_kvec(&msg.msg_iter, WRITE, &seq_hdr_kiov, 1, sizeof(seq_hdr));
+
+ info.msg = &msg;
+ vvs->next_tx_msg_cnt++;
+
+ return virtio_transport_send_pkt_info(vsk, &info);
+}
+
+int virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len, int flags)
+{
+ return virtio_transport_seqpacket_send_ctrl(vsk,
+ VIRTIO_VSOCK_OP_SEQ_BEGIN,
+ len,
+ flags);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
+
+int virtio_transport_seqpacket_seq_send_eor(struct vsock_sock *vsk, int flags)
+{
+ return virtio_transport_seqpacket_send_ctrl(vsk,
+ VIRTIO_VSOCK_OP_SEQ_END,
+ 0,
+ flags);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_eor);
+
static inline void virtio_transport_remove_pkt(struct virtio_vsock_pkt *pkt)
{
list_del(&pkt->list);
@@ -999,7 +1062,7 @@ void virtio_transport_release(struct vsock_sock *vsk)
struct sock *sk = &vsk->sk;
bool remove_sock = true;
- if (sk->sk_type == SOCK_STREAM)
+ if (sk->sk_type == SOCK_STREAM || sk->sk_type == SOCK_SEQPACKET)
remove_sock = virtio_transport_close(vsk);
list_for_each_entry_safe(pkt, tmp, &vvs->rx_queue, list) {
--
2.25.1
This moves setting the packet type from the 'info' structure to the send
function. It makes no sense to set a packet type which differs from the
socket's type, and since currently only the stream type is supported,
this type is forced.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index e4370b1b7494..1c9d71ca5e8e 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -179,6 +179,8 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt;
u32 pkt_len = info->pkt_len;
+ info->type = VIRTIO_VSOCK_TYPE_STREAM;
+
t_ops = virtio_transport_get_ops(vsk);
if (unlikely(!t_ops))
return -EFAULT;
@@ -624,7 +626,6 @@ int virtio_transport_connect(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_REQUEST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.vsk = vsk,
};
@@ -636,7 +637,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_SHUTDOWN,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.flags = (mode & RCV_SHUTDOWN ?
VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
(mode & SEND_SHUTDOWN ?
@@ -665,7 +665,6 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RW,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.msg = msg,
.pkt_len = len,
.vsk = vsk,
@@ -688,7 +687,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.reply = !!pkt,
.vsk = vsk,
};
@@ -990,7 +988,6 @@ virtio_transport_send_response(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RESPONSE,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.remote_cid = le64_to_cpu(pkt->hdr.src_cid),
.remote_port = le32_to_cpu(pkt->hdr.src_port),
.reply = true,
--
2.25.1
This modifies the current receive logic for SEQPACKET support:
1) Inserts 'SEQ_BEGIN' and 'SEQ_END' packets into the socket's rx queue.
2) Inserts 'RW' packets into the socket's rx queue, but without merging
them with the buffer of the last packet in the queue.
3) Checks packet and socket types on receive (on mismatch, the
connection is reset).
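The receive-side type rules can be condensed into one decision function. This is a hypothetical sketch: `model_rx_action()` and the `MODEL_*` constants are illustrative, with numeric values copied from the uapi enum in this series, and the GOOD_COPY_LEN and free-space conditions for merging are omitted. An unknown type, or a type that differs from the socket's type, leads to a reset; only STREAM packets are candidates for merging into the last queued buffer, which keeps SEQPACKET packet boundaries intact in the rx queue.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/socket.h>   /* SOCK_STREAM, SOCK_SEQPACKET */

/* Values of enum virtio_vsock_type as extended by this series. */
#define MODEL_TYPE_STREAM    1
#define MODEL_TYPE_SEQPACKET 2

enum model_rx_action { MODEL_RESET, MODEL_ENQUEUE, MODEL_ENQUEUE_MAY_MERGE };

/* Mirrors virtio_transport_get_type(): any non-stream socket is SEQPACKET. */
static uint16_t model_get_type(int sk_type)
{
	return sk_type == SOCK_STREAM ? MODEL_TYPE_STREAM : MODEL_TYPE_SEQPACKET;
}

/* Mirrors virtio_transport_valid_type(). */
static bool model_valid_type(uint16_t type)
{
	return type == MODEL_TYPE_STREAM || type == MODEL_TYPE_SEQPACKET;
}

static enum model_rx_action model_rx_action(int sk_type, uint16_t pkt_type)
{
	if (!model_valid_type(pkt_type))
		return MODEL_RESET;   /* unknown packet type */
	if (model_get_type(sk_type) != pkt_type)
		return MODEL_RESET;   /* packet type does not match the socket */
	/* Only stream data may be coalesced into the previous buffer. */
	return pkt_type == MODEL_TYPE_STREAM ? MODEL_ENQUEUE_MAY_MERGE : MODEL_ENQUEUE;
}
```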
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport_common.c | 63 +++++++++++++++++--------
1 file changed, 44 insertions(+), 19 deletions(-)
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index e9a2de72ebbf..3ca0009c553e 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -165,6 +165,14 @@ void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
}
EXPORT_SYMBOL_GPL(virtio_transport_deliver_tap_pkt);
+static u16 virtio_transport_get_type(struct sock *sk)
+{
+ if (sk->sk_type == SOCK_STREAM)
+ return VIRTIO_VSOCK_TYPE_STREAM;
+ else
+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
+}
+
/* This function can only be used on connecting/connected sockets,
* since a socket assigned to a transport is required.
*
@@ -1060,25 +1068,27 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
goto out;
}
- /* Try to copy small packets into the buffer of last packet queued,
- * to avoid wasting memory queueing the entire buffer with a small
- * payload.
- */
- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
- struct virtio_vsock_pkt *last_pkt;
+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM) {
+ /* Try to copy small packets into the buffer of last packet queued,
+ * to avoid wasting memory queueing the entire buffer with a small
+ * payload.
+ */
+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
+ struct virtio_vsock_pkt *last_pkt;
- last_pkt = list_last_entry(&vvs->rx_queue,
- struct virtio_vsock_pkt, list);
+ last_pkt = list_last_entry(&vvs->rx_queue,
+ struct virtio_vsock_pkt, list);
- /* If there is space in the last packet queued, we copy the
- * new packet in its buffer.
- */
- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
- pkt->len);
- last_pkt->len += pkt->len;
- free_pkt = true;
- goto out;
+ /* If there is space in the last packet queued, we copy the
+ * new packet in its buffer.
+ */
+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
+ pkt->len);
+ last_pkt->len += pkt->len;
+ free_pkt = true;
+ goto out;
+ }
}
}
@@ -1098,9 +1108,13 @@ virtio_transport_recv_connected(struct sock *sk,
int err = 0;
switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
+ case VIRTIO_VSOCK_OP_SEQ_END:
case VIRTIO_VSOCK_OP_RW:
virtio_transport_recv_enqueue(vsk, pkt);
- sk->sk_data_ready(sk);
+
+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN)
+ sk->sk_data_ready(sk);
return err;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
sk->sk_write_space(sk);
@@ -1243,6 +1257,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
return 0;
}
+static bool virtio_transport_valid_type(u16 type)
+{
+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
+}
+
/* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
* lock.
*/
@@ -1268,7 +1288,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le32_to_cpu(pkt->hdr.buf_alloc),
le32_to_cpu(pkt->hdr.fwd_cnt));
- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
goto free_pkt;
}
@@ -1285,6 +1305,11 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
}
}
+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type)) {
+ (void)virtio_transport_reset_no_sock(t, pkt);
+ goto free_pkt;
+ }
+
vsk = vsock_sk(sk);
lock_sock(sk);
--
2.25.1
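This adds a transport callback which tries to fetch the record begin
marker from the socket's rx queue. Any orphaned packets queued before the
marker are dropped, and a credit update is sent for them. It is called
from af_vsock.c before reading the data packets of a record.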
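What this callback does with the rx queue can be sketched as follows. The model is hypothetical (an array instead of a list, illustrative `model_*` names) and decodes the packed, little-endian struct virtio_vsock_seq_hdr byte by byte: msg_cnt at offset 0, msg_len at offset 4. One deliberate difference: when no SEQ_BEGIN is queued, the patch returns the previously stored vvs->user_read_seq_len, whereas the sketch returns 0.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model types; op values copied from enum virtio_vsock_op. */
enum model_op { MODEL_OP_RW = 5, MODEL_OP_SEQ_BEGIN = 8, MODEL_OP_SEQ_END = 9 };

struct model_pkt {
	enum model_op op;
	uint32_t len;                /* hdr.len */
	const unsigned char *buf;    /* payload; 8-byte seq_hdr for SEQ_* ops */
};

/* Reads a __le32 from an unaligned buffer, like le32_to_cpu() on a packed field. */
static uint32_t model_get_le32(const unsigned char *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

/*
 * Skips orphaned packets at the head of the queue (counting their bytes),
 * then consumes SEQ_BEGIN and returns its msg_len. Returns 0 with
 * *pos == n if no SEQ_BEGIN was found.
 */
static uint32_t model_seq_get_len(const struct model_pkt *q, size_t n,
				  size_t *pos, uint32_t *msg_cnt,
				  size_t *bytes_dropped)
{
	uint32_t msg_len;

	*bytes_dropped = 0;
	while (*pos < n && q[*pos].op != MODEL_OP_SEQ_BEGIN) {
		*bytes_dropped += q[*pos].len;
		(*pos)++;
	}
	if (*pos == n)
		return 0;

	assert(q[*pos].len >= 8);   /* struct virtio_vsock_seq_hdr is 8 bytes */
	*msg_cnt = model_get_le32(q[*pos].buf);       /* offset 0: msg_cnt */
	msg_len = model_get_le32(q[*pos].buf + 4);    /* offset 4: msg_len */
	(*pos)++;                   /* SEQ_BEGIN is removed from the queue */
	return msg_len;
}
```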
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 1 +
net/vmw_vsock/virtio_transport_common.c | 53 +++++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 003d06ae4a85..022667d57884 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -85,6 +85,7 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
int
virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index d8ec2dfa2315..e9a2de72ebbf 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -399,6 +399,59 @@ static inline void virtio_transport_remove_pkt(struct virtio_vsock_pkt *pkt)
virtio_transport_free_pkt(pkt);
}
+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
+{
+ struct virtio_vsock_pkt *pkt, *n;
+ size_t bytes_dropped = 0;
+
+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
+ break;
+
+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_remove_pkt(pkt);
+ }
+
+ return bytes_dropped;
+}
+
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
+{
+ struct virtio_vsock_seq_hdr *seq_hdr;
+ struct virtio_vsock_sock *vvs;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_dropped;
+
+ vvs = vsk->trans;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ /* Fetch all orphaned 'RW' packets and send credit update. */
+ bytes_dropped = virtio_transport_drop_until_seq_begin(vvs);
+
+ if (list_empty(&vvs->rx_queue))
+ goto out;
+
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ vvs->user_read_copied = 0;
+
+ seq_hdr = (struct virtio_vsock_seq_hdr *)pkt->buf;
+ vvs->user_read_seq_len = le32_to_cpu(seq_hdr->msg_len);
+ vvs->curr_rx_msg_cnt = le32_to_cpu(seq_hdr->msg_cnt);
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_remove_pkt(pkt);
+out:
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (bytes_dropped)
+ virtio_transport_send_credit_update(vsk);
+
+ return vvs->user_read_seq_len;
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
+
static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
bool *msg_ready)
--
2.25.1
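This adds SEQPACKET ops for the vhost transport. It also removes the
check which ignored the length of non-stream packets, so the payload of
SEQPACKET packets is read as well.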
Signed-off-by: Arseny Krasnov <[email protected]>
---
drivers/vhost/vsock.c | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index 5e78fb719602..5c86d09e36d9 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -354,8 +354,7 @@ vhost_vsock_alloc_pkt(struct vhost_virtqueue *vq,
return NULL;
}
- if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM)
- pkt->len = le32_to_cpu(pkt->hdr.len);
+ pkt->len = le32_to_cpu(pkt->hdr.len);
/* No payload */
if (!pkt->len)
@@ -424,6 +423,11 @@ static struct virtio_transport vhost_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_send_eor = virtio_transport_seqpacket_seq_send_eor,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
This updates the virtio vsock trace events: the SEQPACKET socket type,
the SEQPACKET-specific ops and the SEQPACKET 'msg_len' and 'msg_cnt'
fields are added.
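Both tracepoints decode msg_len and msg_cnt from the payload only for SEQ_BEGIN and SEQ_END; every other op reports zero. A hypothetical userspace sketch of that rule, together with the tail of the recv_pkt format string, follows. The `model_*` names are illustrative, and PRIu32/PRIx32 stand in for the kernel's %u/%x for portable userspace printing.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical model op values, copied from enum virtio_vsock_op. */
enum model_op { MODEL_OP_RW = 5, MODEL_OP_SEQ_BEGIN = 8, MODEL_OP_SEQ_END = 9 };

static uint32_t model_le32(const unsigned char *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

/* msg_len/msg_cnt are only decoded for SEQ_BEGIN/SEQ_END; other ops trace 0. */
static void model_trace_seq_fields(uint16_t op, const unsigned char *payload,
				   uint32_t *msg_len, uint32_t *msg_cnt)
{
	*msg_len = 0;
	*msg_cnt = 0;
	if (op == MODEL_OP_SEQ_BEGIN || op == MODEL_OP_SEQ_END) {
		assert(payload != NULL);
		*msg_cnt = model_le32(payload);       /* seq_hdr.msg_cnt */
		*msg_len = model_le32(payload + 4);   /* seq_hdr.msg_len */
	}
}

/* Formats the tail of the recv_pkt trace line in the TP_printk layout. */
static int model_format_tail(char *out, size_t size, uint32_t flags,
			     uint32_t buf_alloc, uint32_t fwd_cnt,
			     uint32_t msg_len, uint32_t msg_cnt)
{
	return snprintf(out, size,
			"flags=%#" PRIx32 " buf_alloc=%" PRIu32 " fwd_cnt=%" PRIu32
			" msg_len=%" PRIu32 " msg_cnt=%" PRIu32,
			flags, buf_alloc, fwd_cnt, msg_len, msg_cnt);
}
```

For a SEQ_BEGIN carrying msg_cnt 3 and msg_len 64, the formatted tail reads `flags=0 buf_alloc=262144 fwd_cnt=0 msg_len=64 msg_cnt=3`; note that `%#x` prints a zero value as `0`, not `0x0`.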
Signed-off-by: Arseny Krasnov <[email protected]>
---
.../events/vsock_virtio_transport_common.h | 48 +++++++++++++++----
net/vmw_vsock/virtio_transport_common.c | 25 +++++++++-
2 files changed, 61 insertions(+), 12 deletions(-)
diff --git a/include/trace/events/vsock_virtio_transport_common.h b/include/trace/events/vsock_virtio_transport_common.h
index 6782213778be..23bc0f39c450 100644
--- a/include/trace/events/vsock_virtio_transport_common.h
+++ b/include/trace/events/vsock_virtio_transport_common.h
@@ -9,9 +9,12 @@
#include <linux/tracepoint.h>
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_STREAM);
+TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_SEQPACKET);
#define show_type(val) \
- __print_symbolic(val, { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" })
+ __print_symbolic(val, \
+ { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" }, \
+ { VIRTIO_VSOCK_TYPE_SEQPACKET, "SEQPACKET" })
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_INVALID);
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_REQUEST);
@@ -21,6 +24,8 @@ TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_SHUTDOWN);
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_RW);
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_CREDIT_UPDATE);
TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_CREDIT_REQUEST);
+TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_SEQ_BEGIN);
+TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_SEQ_END);
#define show_op(val) \
__print_symbolic(val, \
@@ -31,7 +36,9 @@ TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_CREDIT_REQUEST);
{ VIRTIO_VSOCK_OP_SHUTDOWN, "SHUTDOWN" }, \
{ VIRTIO_VSOCK_OP_RW, "RW" }, \
{ VIRTIO_VSOCK_OP_CREDIT_UPDATE, "CREDIT_UPDATE" }, \
- { VIRTIO_VSOCK_OP_CREDIT_REQUEST, "CREDIT_REQUEST" })
+ { VIRTIO_VSOCK_OP_CREDIT_REQUEST, "CREDIT_REQUEST" }, \
+ { VIRTIO_VSOCK_OP_SEQ_BEGIN, "SEQ_BEGIN" }, \
+ { VIRTIO_VSOCK_OP_SEQ_END, "SEQ_END" })
TRACE_EVENT(virtio_transport_alloc_pkt,
TP_PROTO(
@@ -40,7 +47,9 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
__u32 len,
__u16 type,
__u16 op,
- __u32 flags
+ __u32 flags,
+ __u32 msg_len,
+ __u32 msg_cnt
),
TP_ARGS(
src_cid, src_port,
@@ -48,7 +57,9 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
len,
type,
op,
- flags
+ flags,
+ msg_len,
+ msg_cnt
),
TP_STRUCT__entry(
__field(__u32, src_cid)
@@ -59,6 +70,8 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
__field(__u16, type)
__field(__u16, op)
__field(__u32, flags)
+ __field(__u32, msg_len)
+ __field(__u32, msg_cnt)
),
TP_fast_assign(
__entry->src_cid = src_cid;
@@ -69,14 +82,19 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
__entry->type = type;
__entry->op = op;
__entry->flags = flags;
+ __entry->msg_len = msg_len;
+ __entry->msg_cnt = msg_cnt;
),
- TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x",
+ TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x "
+ "msg_len=%u msg_cnt=%u",
__entry->src_cid, __entry->src_port,
__entry->dst_cid, __entry->dst_port,
__entry->len,
show_type(__entry->type),
show_op(__entry->op),
- __entry->flags)
+ __entry->flags,
+ __entry->msg_len,
+ __entry->msg_cnt)
);
TRACE_EVENT(virtio_transport_recv_pkt,
@@ -88,7 +106,9 @@ TRACE_EVENT(virtio_transport_recv_pkt,
__u16 op,
__u32 flags,
__u32 buf_alloc,
- __u32 fwd_cnt
+ __u32 fwd_cnt,
+ __u32 msg_len,
+ __u32 msg_cnt
),
TP_ARGS(
src_cid, src_port,
@@ -98,7 +118,9 @@ TRACE_EVENT(virtio_transport_recv_pkt,
op,
flags,
buf_alloc,
- fwd_cnt
+ fwd_cnt,
+ msg_len,
+ msg_cnt
),
TP_STRUCT__entry(
__field(__u32, src_cid)
@@ -111,6 +133,8 @@ TRACE_EVENT(virtio_transport_recv_pkt,
__field(__u32, flags)
__field(__u32, buf_alloc)
__field(__u32, fwd_cnt)
+ __field(__u32, msg_len)
+ __field(__u32, msg_cnt)
),
TP_fast_assign(
__entry->src_cid = src_cid;
@@ -123,9 +147,11 @@ TRACE_EVENT(virtio_transport_recv_pkt,
__entry->flags = flags;
__entry->buf_alloc = buf_alloc;
__entry->fwd_cnt = fwd_cnt;
+ __entry->msg_len = msg_len;
+ __entry->msg_cnt = msg_cnt;
),
TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x "
- "buf_alloc=%u fwd_cnt=%u",
+ "buf_alloc=%u fwd_cnt=%u msg_len=%u msg_cnt=%u",
__entry->src_cid, __entry->src_port,
__entry->dst_cid, __entry->dst_port,
__entry->len,
@@ -133,7 +159,9 @@ TRACE_EVENT(virtio_transport_recv_pkt,
show_op(__entry->op),
__entry->flags,
__entry->buf_alloc,
- __entry->fwd_cnt)
+ __entry->fwd_cnt,
+ __entry->msg_len,
+ __entry->msg_cnt)
);
#endif /* _TRACE_VSOCK_VIRTIO_TRANSPORT_COMMON_H */
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 8431d0a891ed..2ab5291e2bf7 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -47,6 +47,8 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
{
struct virtio_vsock_pkt *pkt;
int err;
+ u32 msg_len = 0;
+ u32 msg_cnt = 0;
pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
if (!pkt)
@@ -74,6 +76,14 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
err = memcpy_from_msg(pkt->buf, info->msg, len);
if (err)
goto out;
+
+ if (info->op == VIRTIO_VSOCK_OP_SEQ_BEGIN ||
+ info->op == VIRTIO_VSOCK_OP_SEQ_END) {
+ struct virtio_vsock_seq_hdr *seq_hdr = pkt->buf;
+
+ msg_len = le32_to_cpu(seq_hdr->msg_len);
+ msg_cnt = le32_to_cpu(seq_hdr->msg_cnt);
+ }
}
trace_virtio_transport_alloc_pkt(src_cid, src_port,
@@ -81,7 +91,7 @@ virtio_transport_alloc_pkt(struct virtio_vsock_pkt_info *info,
len,
info->type,
info->op,
- info->flags);
+ info->flags, msg_len, msg_cnt);
return pkt;
@@ -1336,12 +1346,22 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
struct vsock_sock *vsk;
struct sock *sk;
bool space_available;
+ u32 msg_len = 0;
+ u32 msg_cnt = 0;
vsock_addr_init(&src, le64_to_cpu(pkt->hdr.src_cid),
le32_to_cpu(pkt->hdr.src_port));
vsock_addr_init(&dst, le64_to_cpu(pkt->hdr.dst_cid),
le32_to_cpu(pkt->hdr.dst_port));
+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN ||
+ le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_END) {
+ struct virtio_vsock_seq_hdr *seq_hdr = pkt->buf;
+
+ msg_len = le32_to_cpu(seq_hdr->msg_len);
+ msg_cnt = le32_to_cpu(seq_hdr->msg_cnt);
+ }
+
trace_virtio_transport_recv_pkt(src.svm_cid, src.svm_port,
dst.svm_cid, dst.svm_port,
le32_to_cpu(pkt->hdr.len),
@@ -1349,7 +1369,8 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le16_to_cpu(pkt->hdr.op),
le32_to_cpu(pkt->hdr.flags),
le32_to_cpu(pkt->hdr.buf_alloc),
- le32_to_cpu(pkt->hdr.fwd_cnt));
+ le32_to_cpu(pkt->hdr.fwd_cnt),
+ msg_len, msg_cnt);
if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
--
2.25.1
This adds SEQPACKET ops for the loopback transport.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/vsock_loopback.c | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/net/vmw_vsock/vsock_loopback.c b/net/vmw_vsock/vsock_loopback.c
index a45f7ffca8c5..c0da94119f74 100644
--- a/net/vmw_vsock/vsock_loopback.c
+++ b/net/vmw_vsock/vsock_loopback.c
@@ -89,6 +89,11 @@ static struct virtio_transport loopback_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_send_eor = virtio_transport_seqpacket_seq_send_eor,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
This adds two tests for SOCK_SEQPACKET sockets: both transfer data and
then check the MSG_EOR and MSG_TRUNC flags. Cases for connect(), bind(),
etc. are not tested, because their behavior is the same as for stream
sockets.
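The MSG_TRUNC behaviour checked by these tests is generic SOCK_SEQPACKET semantics: a record larger than the receive buffer is cut, the rest of that record is discarded, and MSG_TRUNC is set in msg_flags. Because a vsock peer is not always available, the sketch below demonstrates this with an AF_UNIX socketpair() instead; this is a swapped-in transport, not the vsock_test code, and the helper names `recv_record()` and `seqpacket_trunc_demo()` are hypothetical. Only truncation is shown.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Receives one record into buf; *flags gets the returned msg_flags. */
static ssize_t recv_record(int fd, char *buf, size_t len, int *flags)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
	ssize_t n = recvmsg(fd, &msg, 0);

	*flags = msg.msg_flags;
	return n;
}

/* Hypothetical demo over AF_UNIX: returns 0 if truncation behaves as expected. */
static int seqpacket_trunc_demo(void)
{
	int sv[2], flags = 0, ok;
	char buf[5];

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
		return -1;

	/* Two records; the first one is longer than buf. */
	ok = send(sv[0], "hello world", 11, 0) == 11 &&
	     send(sv[0], "next", 4, 0) == 4;

	/* Only 5 bytes fit: the rest of the record is dropped, MSG_TRUNC is set. */
	ok = ok && recv_record(sv[1], buf, sizeof(buf), &flags) == 5 &&
	     memcmp(buf, "hello", 5) == 0 && (flags & MSG_TRUNC);

	/* The next read starts at the next record boundary, not at " world". */
	ok = ok && recv_record(sv[1], buf, sizeof(buf), &flags) == 4 &&
	     memcmp(buf, "next", 4) == 0 && !(flags & MSG_TRUNC);

	close(sv[0]);
	close(sv[1]);
	return ok ? 0 : -1;
}
```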
Signed-off-by: Arseny Krasnov <[email protected]>
---
tools/testing/vsock/util.c | 32 ++++++--
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 +++++++++++++++++++++++++++++++
3 files changed, 156 insertions(+), 5 deletions(-)
diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
index 93cbd6f603f9..2acbb7703c6a 100644
--- a/tools/testing/vsock/util.c
+++ b/tools/testing/vsock/util.c
@@ -84,7 +84,7 @@ void vsock_wait_remote_close(int fd)
}
/* Connect to <cid, port> and return the file descriptor. */
-int vsock_stream_connect(unsigned int cid, unsigned int port)
+static int vsock_connect(unsigned int cid, unsigned int port, int type)
{
union {
struct sockaddr sa;
@@ -101,7 +101,7 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
control_expectln("LISTENING");
- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);
timeout_begin(TIMEOUT);
do {
@@ -120,11 +120,21 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
return fd;
}
+int vsock_stream_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_STREAM);
+}
+
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_SEQPACKET);
+}
+
/* Listen on <cid, port> and return the first incoming connection. The remote
* address is stored to clientaddrp. clientaddrp may be NULL.
*/
-int vsock_stream_accept(unsigned int cid, unsigned int port,
- struct sockaddr_vm *clientaddrp)
+static int vsock_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp, int type)
{
union {
struct sockaddr sa;
@@ -145,7 +155,7 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
int client_fd;
int old_errno;
- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);
if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
perror("bind");
@@ -189,6 +199,18 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
return client_fd;
}
+int vsock_stream_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_STREAM);
+}
+
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_SEQPACKET);
+}
+
/* Transmit one byte and check the return value.
*
* expected_ret:
diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
index e53dd09d26d9..a3375ad2fb7f 100644
--- a/tools/testing/vsock/util.h
+++ b/tools/testing/vsock/util.h
@@ -36,8 +36,11 @@ struct test_case {
void init_signals(void);
unsigned int parse_cid(const char *str);
int vsock_stream_connect(unsigned int cid, unsigned int port);
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port);
int vsock_stream_accept(unsigned int cid, unsigned int port,
struct sockaddr_vm *clientaddrp);
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp);
void vsock_wait_remote_close(int fd);
void send_byte(int fd, int expected_ret, int flags);
void recv_byte(int fd, int expected_ret, int flags);
diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
index 5a4fb80fa832..5fca9be5b1dd 100644
--- a/tools/testing/vsock/vsock_test.c
+++ b/tools/testing/vsock/vsock_test.c
@@ -14,6 +14,8 @@
#include <errno.h>
#include <unistd.h>
#include <linux/kernel.h>
+#include <sys/types.h>
+#include <sys/socket.h>
#include "timeout.h"
#include "control.h"
@@ -279,6 +281,120 @@ static void test_stream_msg_peek_server(const struct test_opts *opts)
close(fd);
}
+#define MESSAGES_CNT 7
+#define MESSAGE_EOR_IDX (MESSAGES_CNT / 2)
+static void test_seqpacket_msg_eor_client(const struct test_opts *opts)
+{
+ int fd;
+
+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+ if (fd < 0) {
+ perror("connect");
+ exit(EXIT_FAILURE);
+ }
+
+ /* Send several messages, one with MSG_EOR flag */
+ for (int i = 0; i < MESSAGES_CNT; i++)
+ send_byte(fd, 1, (i != MESSAGE_EOR_IDX) ? 0 : MSG_EOR);
+
+ control_writeln("SENDDONE");
+ close(fd);
+}
+
+static void test_seqpacket_msg_eor_server(const struct test_opts *opts)
+{
+ int fd;
+ char buf[16];
+ struct msghdr msg = {0};
+ struct iovec iov = {0};
+
+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+ if (fd < 0) {
+ perror("accept");
+ exit(EXIT_FAILURE);
+ }
+
+ control_expectln("SENDDONE");
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ for (int i = 0; i < MESSAGES_CNT; i++) {
+ if (recvmsg(fd, &msg, 0) != 1) {
+ perror("message bound violated");
+ exit(EXIT_FAILURE);
+ }
+
+ if (i == MESSAGE_EOR_IDX) {
+ if (!(msg.msg_flags & MSG_EOR)) {
+ fprintf(stderr, "MSG_EOR flag expected\n");
+ exit(EXIT_FAILURE);
+ }
+ } else {
+ if (msg.msg_flags & MSG_EOR) {
+ fprintf(stderr, "unexpected MSG_EOR flag\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+ }
+
+ close(fd);
+}
+
+#define MESSAGE_TRUNC_SZ 32
+static void test_seqpacket_msg_trunc_client(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ];
+
+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+ if (fd < 0) {
+ perror("connect");
+ exit(EXIT_FAILURE);
+ }
+
+ if (send(fd, buf, sizeof(buf), 0) != sizeof(buf)) {
+ perror("send failed");
+ exit(EXIT_FAILURE);
+ }
+
+ control_writeln("SENDDONE");
+ close(fd);
+}
+
+static void test_seqpacket_msg_trunc_server(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ / 2];
+ struct msghdr msg = {0};
+ struct iovec iov = {0};
+
+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+ if (fd < 0) {
+ perror("accept");
+ exit(EXIT_FAILURE);
+ }
+
+ control_expectln("SENDDONE");
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (recvmsg(fd, &msg, MSG_TRUNC) != MESSAGE_TRUNC_SZ) {
+ perror("MSG_TRUNC doesn't work");
+ exit(EXIT_FAILURE);
+ }
+
+ if (!(msg.msg_flags & MSG_TRUNC)) {
+ fprintf(stderr, "MSG_TRUNC expected\n");
+ exit(EXIT_FAILURE);
+ }
+
+ close(fd);
+}
+
static struct test_case test_cases[] = {
{
.name = "SOCK_STREAM connection reset",
@@ -309,6 +425,16 @@ static struct test_case test_cases[] = {
.run_client = test_stream_msg_peek_client,
.run_server = test_stream_msg_peek_server,
},
+ {
+ .name = "SOCK_SEQPACKET send data MSG_EOR",
+ .run_client = test_seqpacket_msg_eor_client,
+ .run_server = test_seqpacket_msg_eor_server,
+ },
+ {
+ .name = "SOCK_SEQPACKET send data MSG_TRUNC",
+ .run_client = test_seqpacket_msg_trunc_client,
+ .run_server = test_seqpacket_msg_trunc_server,
+ },
{},
};
--
2.25.1
This adds SEQPACKET ops for the virtio transport.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport.c | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 2700a63ab095..bd3a854bb366 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -469,6 +469,11 @@ static struct virtio_transport virtio_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_send_eor = virtio_transport_seqpacket_seq_send_eor,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
On Thu, Feb 18, 2021 at 08:36:03AM +0300, Arseny Krasnov wrote:
>This prepares af_vsock.c for SEQPACKET support: some functions, such
>as setsockopt(), getsockopt(), connect(), recvmsg() and sendmsg(), are
>shared between both types of sockets, so rename them in a more general
>manner.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 64 +++++++++++++++++++++-------------------
> 1 file changed, 34 insertions(+), 30 deletions(-)
IIRC I had already given my R-b to this patch. Please carry it over when
you post a new version.
Reviewed-by: Stefano Garzarella <[email protected]>
Thanks,
Stefano
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 5546710d8ac1..656370e11707 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -604,8 +604,8 @@ static void vsock_pending_work(struct work_struct *work)
>
> /**** SOCKET OPERATIONS ****/
>
>-static int __vsock_bind_stream(struct vsock_sock *vsk,
>- struct sockaddr_vm *addr)
>+static int __vsock_bind_connectible(struct vsock_sock *vsk,
>+ struct sockaddr_vm *addr)
> {
> static u32 port;
> struct sockaddr_vm new_addr;
>@@ -685,7 +685,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
> switch (sk->sk_socket->type) {
> case SOCK_STREAM:
> spin_lock_bh(&vsock_table_lock);
>- retval = __vsock_bind_stream(vsk, addr);
>+ retval = __vsock_bind_connectible(vsk, addr);
> spin_unlock_bh(&vsock_table_lock);
> break;
>
>@@ -767,6 +767,11 @@ static struct sock *__vsock_create(struct net *net,
> return sk;
> }
>
>+static bool sock_type_connectible(u16 type)
>+{
>+ return type == SOCK_STREAM;
>+}
>+
> static void __vsock_release(struct sock *sk, int level)
> {
> if (sk) {
>@@ -785,7 +790,7 @@ static void __vsock_release(struct sock *sk, int level)
>
> if (vsk->transport)
> vsk->transport->release(vsk);
>- else if (sk->sk_type == SOCK_STREAM)
>+ else if (sock_type_connectible(sk->sk_type))
> vsock_remove_sock(vsk);
>
> sock_orphan(sk);
>@@ -947,7 +952,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> lock_sock(sk);
> if (sock->state == SS_UNCONNECTED) {
> err = -ENOTCONN;
>- if (sk->sk_type == SOCK_STREAM)
>+ if (sock_type_connectible(sk->sk_type))
> goto out;
> } else {
> sock->state = SS_DISCONNECTING;
>@@ -960,7 +965,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> sk->sk_shutdown |= mode;
> sk->sk_state_change(sk);
>
>- if (sk->sk_type == SOCK_STREAM) {
>+ if (sock_type_connectible(sk->sk_type)) {
> sock_reset_flag(sk, SOCK_DONE);
> vsock_send_shutdown(sk, mode);
> }
>@@ -1015,7 +1020,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
> if (!(sk->sk_shutdown & SEND_SHUTDOWN))
> mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
>
>- } else if (sock->type == SOCK_STREAM) {
>+ } else if (sock_type_connectible(sk->sk_type)) {
> const struct vsock_transport *transport;
>
> lock_sock(sk);
>@@ -1262,8 +1267,8 @@ static void vsock_connect_timeout(struct work_struct *work)
> sock_put(sk);
> }
>
>-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
>- int addr_len, int flags)
>+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
> {
> int err;
> struct sock *sk;
>@@ -1413,7 +1418,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
>
> lock_sock(listener);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sock->type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1490,7 +1495,7 @@ static int vsock_listen(struct socket *sock, int backlog)
>
> lock_sock(sk);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sk->sk_type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1534,11 +1539,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
> vsk->buffer_size = val;
> }
>
>-static int vsock_stream_setsockopt(struct socket *sock,
>- int level,
>- int optname,
>- sockptr_t optval,
>- unsigned int optlen)
>+static int vsock_connectible_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
> {
> int err;
> struct sock *sk;
>@@ -1616,10 +1621,10 @@ static int vsock_stream_setsockopt(struct socket *sock,
> return err;
> }
>
>-static int vsock_stream_getsockopt(struct socket *sock,
>- int level, int optname,
>- char __user *optval,
>- int __user *optlen)
>+static int vsock_connectible_getsockopt(struct socket *sock,
>+ int level, int optname,
>+ char __user *optval,
>+ int __user *optlen)
> {
> int err;
> int len;
>@@ -1687,8 +1692,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
> return 0;
> }
>
>-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>- size_t len)
>+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>+ size_t len)
> {
> struct sock *sk;
> struct vsock_sock *vsk;
>@@ -1827,10 +1832,9 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> return err;
> }
>
>-
> static int
>-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>- int flags)
>+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
> {
> struct sock *sk;
> struct vsock_sock *vsk;
>@@ -2006,7 +2010,7 @@ static const struct proto_ops vsock_stream_ops = {
> .owner = THIS_MODULE,
> .release = vsock_release,
> .bind = vsock_bind,
>- .connect = vsock_stream_connect,
>+ .connect = vsock_connect,
> .socketpair = sock_no_socketpair,
> .accept = vsock_accept,
> .getname = vsock_getname,
>@@ -2014,10 +2018,10 @@ static const struct proto_ops vsock_stream_ops = {
> .ioctl = sock_no_ioctl,
> .listen = vsock_listen,
> .shutdown = vsock_shutdown,
>- .setsockopt = vsock_stream_setsockopt,
>- .getsockopt = vsock_stream_getsockopt,
>- .sendmsg = vsock_stream_sendmsg,
>- .recvmsg = vsock_stream_recvmsg,
>+ .setsockopt = vsock_connectible_setsockopt,
>+ .getsockopt = vsock_connectible_getsockopt,
>+ .sendmsg = vsock_connectible_sendmsg,
>+ .recvmsg = vsock_connectible_recvmsg,
> .mmap = sock_no_mmap,
> .sendpage = sock_no_sendpage,
> };
>--
>2.25.1
>
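A possible follow-up, sketched under assumptions: once later patches in the series add SOCK_SEQPACKET, sock_type_connectible() would presumably just accept a second socket type. The block below is a user-space stand-in (an int parameter instead of the kernel's u16, with the SOCK_* constants taken from <sys/socket.h>), not code from this patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/socket.h>

/* User-space sketch (assumption): connection-oriented vsock socket types
 * share connect(), accept(), sendmsg() and recvmsg(); SOCK_DGRAM does not.
 */
static bool sock_type_connectible(int type)
{
	return type == SOCK_STREAM || type == SOCK_SEQPACKET;
}
```

Every call site converted above (release, shutdown, poll, accept, listen) would then pick up SEQPACKET without further changes, which is the point of introducing the helper now.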
On 22.02.2021 13:50, Stefano Garzarella wrote:
> On Thu, Feb 18, 2021 at 08:36:03AM +0300, Arseny Krasnov wrote:
>> This prepares af_vsock.c for SEQPACKET support: some functions such
>> as setsockopt(), getsockopt(), connect(), recvmsg(), sendmsg() are
>> shared between both types of sockets, so rename them in general
>> manner.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> net/vmw_vsock/af_vsock.c | 64 +++++++++++++++++++++-------------------
>> 1 file changed, 34 insertions(+), 30 deletions(-)
> IIRC I had already given my R-b to this patch. Please carry it over when
> you post a new version.
>
> Reviewed-by: Stefano Garzarella <[email protected]>
>
> Thanks,
> Stefano
Ack, sorry, I didn't know that.
On Mon, Feb 22, 2021 at 01:58:11PM +0300, Arseny Krasnov wrote:
>
>On 22.02.2021 13:50, Stefano Garzarella wrote:
>> On Thu, Feb 18, 2021 at 08:36:03AM +0300, Arseny Krasnov wrote:
>>> This prepares af_vsock.c for SEQPACKET support: some functions, such
>>> as setsockopt(), getsockopt(), connect(), recvmsg() and sendmsg(), are
>>> shared between both types of sockets, so rename them in a more general
>>> manner.
>>>
>>> Signed-off-by: Arseny Krasnov <[email protected]>
>>> ---
>>> net/vmw_vsock/af_vsock.c | 64 +++++++++++++++++++++-------------------
>>> 1 file changed, 34 insertions(+), 30 deletions(-)
>> IIRC I had already given my R-b to this patch. Please carry it over when
>> you post a new version.
>>
>> Reviewed-by: Stefano Garzarella <[email protected]>
>>
>> Thanks,
>> Stefano
>Ack, sorry, didn't know that
Don't worry :-)
It is documented here: Documentation/process/submitting-patches.rst
Both Tested-by and Reviewed-by tags, once received on mailing list from tester
or reviewer, should be added by author to the applicable patches when sending
next versions. However if the patch has changed substantially in following
version, these tags might not be applicable anymore and thus should be removed.
Usually removal of someone's Tested-by or Reviewed-by tags should be mentioned
in the patch changelog (after the '---' separator).
Thanks,
Stefano
On Thu, Feb 18, 2021 at 08:36:50AM +0300, Arseny Krasnov wrote:
>This moves STREAM specific data receive logic to dedicated function:
>'__vsock_stream_recvmsg()', while checks that will be same for both
>types of socket are in shared function: 'vsock_connectible_recvmsg()'.
I'm not a native speaker, but I would rewrite this message like this:
Move STREAM-specific data receive logic to the dedicated function
'__vsock_stream_recvmsg()', while checks that are the same for both
STREAM and SEQPACKET sockets stay in the shared function
'vsock_connectible_recvmsg()'.
Anyway the patch LGTM:
Reviewed-by: Stefano Garzarella <[email protected]>
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 116 ++++++++++++++++++++++-----------------
> 1 file changed, 67 insertions(+), 49 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 6cf7bb977aa1..d277dc1cdbdf 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1894,65 +1894,22 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> return data;
> }
>
>-static int
>-vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>- int flags)
>+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
> {
>- struct sock *sk;
>- struct vsock_sock *vsk;
>+ struct vsock_transport_recv_notify_data recv_data;
> const struct vsock_transport *transport;
>- int err;
>- size_t target;
>+ struct vsock_sock *vsk;
> ssize_t copied;
>+ size_t target;
> long timeout;
>- struct vsock_transport_recv_notify_data recv_data;
>+ int err;
>
> DEFINE_WAIT(wait);
>
>- sk = sock->sk;
> vsk = vsock_sk(sk);
>- err = 0;
>-
>- lock_sock(sk);
>-
> transport = vsk->transport;
>
>- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
>- /* Recvmsg is supposed to return 0 if a peer performs an
>- * orderly shutdown. Differentiate between that case and when a
>- * peer has not connected or a local shutdown occured with the
>- * SOCK_DONE flag.
>- */
>- if (sock_flag(sk, SOCK_DONE))
>- err = 0;
>- else
>- err = -ENOTCONN;
>-
>- goto out;
>- }
>-
>- if (flags & MSG_OOB) {
>- err = -EOPNOTSUPP;
>- goto out;
>- }
>-
>- /* We don't check peer_shutdown flag here since peer may actually shut
>- * down, but there can be data in the queue that a local socket can
>- * receive.
>- */
>- if (sk->sk_shutdown & RCV_SHUTDOWN) {
>- err = 0;
>- goto out;
>- }
>-
>- /* It is valid on Linux to pass in a zero-length receive buffer. This
>- * is not an error. We may as well bail out now.
>- */
>- if (!len) {
>- err = 0;
>- goto out;
>- }
>-
> /* We must not copy less than target bytes into the user's buffer
> * before returning successfully, so we wait for the consume queue to
> * have that much data to consume before dequeueing. Note that this
>@@ -2011,6 +1968,67 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> if (copied > 0)
> err = copied;
>
>+out:
>+ return err;
>+}
>+
>+static int
>+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
>+{
>+ struct sock *sk;
>+ struct vsock_sock *vsk;
>+ const struct vsock_transport *transport;
>+ int err;
>+
>+ DEFINE_WAIT(wait);
>+
>+ sk = sock->sk;
>+ vsk = vsock_sk(sk);
>+ err = 0;
>+
>+ lock_sock(sk);
>+
>+ transport = vsk->transport;
>+
>+ if (!transport || sk->sk_state != TCP_ESTABLISHED) {
>+ /* Recvmsg is supposed to return 0 if a peer performs an
>+ * orderly shutdown. Differentiate between that case and when a
>+ * peer has not connected or a local shutdown occurred with the
>+ * SOCK_DONE flag.
>+ */
>+ if (sock_flag(sk, SOCK_DONE))
>+ err = 0;
>+ else
>+ err = -ENOTCONN;
>+
>+ goto out;
>+ }
>+
>+ if (flags & MSG_OOB) {
>+ err = -EOPNOTSUPP;
>+ goto out;
>+ }
>+
>+ /* We don't check peer_shutdown flag here since peer may actually shut
>+ * down, but there can be data in the queue that a local socket can
>+ * receive.
>+ */
>+ if (sk->sk_shutdown & RCV_SHUTDOWN) {
>+ err = 0;
>+ goto out;
>+ }
>+
>+ /* It is valid on Linux to pass in a zero-length receive buffer. This
>+ * is not an error. We may as well bail out now.
>+ */
>+ if (!len) {
>+ err = 0;
>+ goto out;
>+ }
>+
>+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+
> out:
> release_sock(sk);
> return err;
>--
>2.25.1
>
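The checks that now live in vsock_connectible_recvmsg() amount to a small decision table that runs before the type-specific loop. A minimal user-space model of it follows; struct recv_state and connectible_recv_precheck() are hypothetical names, and the booleans stand in for the kernel's transport pointer, sk_state, SOCK_DONE and RCV_SHUTDOWN tests.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical snapshot of the state inspected by the shared function. */
struct recv_state {
	bool has_transport;	/* vsk->transport != NULL */
	bool established;	/* sk->sk_state == TCP_ESTABLISHED */
	bool sock_done;		/* sock_flag(sk, SOCK_DONE) */
	bool rcv_shutdown;	/* sk->sk_shutdown & RCV_SHUTDOWN */
};

/* Returns 1 if the type-specific receive loop should run; otherwise
 * returns what recvmsg() would report: 0 or a negative errno.
 */
static int connectible_recv_precheck(const struct recv_state *st,
				     size_t len, bool msg_oob)
{
	/* Orderly peer shutdown reads as EOF, never-connected as ENOTCONN. */
	if (!st->has_transport || !st->established)
		return st->sock_done ? 0 : -ENOTCONN;
	if (msg_oob)
		return -EOPNOTSUPP;
	/* Local receive shutdown: report EOF. */
	if (st->rcv_shutdown)
		return 0;
	/* A zero-length buffer is valid on Linux; nothing to do. */
	if (len == 0)
		return 0;
	return 1;
}
```

Only when this model returns 1 does the real code call __vsock_stream_recvmsg() (and, after the next patch, the SEQPACKET variant).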
On Thu, Feb 18, 2021 at 08:37:15AM +0300, Arseny Krasnov wrote:
>This adds a receive loop for SEQPACKET. It looks like the receive loop for
>STREAM, but there are a few differences:
>1) It doesn't call notify callbacks.
>2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
> these values make no sense in the SEQPACKET case.
>3) It waits until the whole record is received or an error is found during
> receiving.
>4) It processes and sets the 'MSG_TRUNC' flag.
>
>So, to avoid extra conditions for two types of sockets inside one loop, two
>independent functions were created.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/net/af_vsock.h | 5 +++
> net/vmw_vsock/af_vsock.c | 97 +++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 101 insertions(+), 1 deletion(-)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..01563338cc03 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,11 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *vsk);
>+ int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
>+ int flags, bool *msg_ready);
I think this should be:
	int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
				 int flags, bool *msg_ready);
To avoid:
$ ./scripts/checkpatch.pl --strict -g HEAD
CHECK: Alignment should match open parenthesis
#35: FILE: include/net/af_vsock.h:141:
+ int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
+ int flags, bool *msg_ready);
>+
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index d277dc1cdbdf..b754927a556a 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1972,6 +1972,98 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
> return err;
> }
>
>+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ const struct vsock_transport *transport;
>+ const struct iovec *orig_iov;
>+ unsigned long orig_nr_segs;
>+ bool msg_ready;
>+ struct vsock_sock *vsk;
>+ size_t record_len;
>+ long timeout;
>+ int err = 0;
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ orig_nr_segs = msg->msg_iter.nr_segs;
>+ orig_iov = msg->msg_iter.iov;
>+ msg_ready = false;
>+ record_len = 0;
>+
>+ while (1) {
>+ err = vsock_wait_data(sk, &wait, timeout, NULL, 0);
>+
>+ if (err <= 0) {
>+ /* In case of any loop break(timeout, signal
>+ * interrupt or shutdown), we report user that
>+ * nothing was copied.
>+ */
>+ err = 0;
>+ break;
>+ }
>+
>+ if (record_len == 0) {
>+ record_len =
>+ transport->seqpacket_seq_get_len(vsk);
>+
>+ if (record_len == 0)
>+ continue;
>+ }
>+
>+ err = transport->seqpacket_dequeue(vsk, msg,
>+ flags, &msg_ready);
>+
Sorry, I expressed myself wrong.
Here it's fine to avoid the blank line, as in the previous version; by
single line I meant the seqpacket_dequeue() call, something like this:
err = transport->seqpacket_dequeue(vsk, msg, flags, &msg_ready);
if (err < 0) {
>+ if (err < 0) {
>+ if (err == -EAGAIN) {
>+ iov_iter_init(&msg->msg_iter, READ,
>+ orig_iov, orig_nr_segs,
>+ len);
>+ /* Clear 'MSG_EOR' here, because dequeue
>+ * callback above set it again if it was
>+ * set by sender. This 'MSG_EOR' is from
>+ * dropped record.
>+ */
>+ msg->msg_flags &= ~MSG_EOR;
>+ record_len = 0;
>+ continue;
>+ }
>+
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ if (msg_ready)
>+ break;
>+ }
>+
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+
>+ if (msg_ready) {
>+ /* User sets MSG_TRUNC, so return real length of
>+ * packet.
>+ */
>+ if (flags & MSG_TRUNC)
>+ err = record_len;
>+ else
>+ err = len - msg->msg_iter.count;
>+
>+ /* Always set MSG_TRUNC if real length of packet is
>+ * bigger than user's buffer.
>+ */
>+ if (record_len > len)
>+ msg->msg_flags |= MSG_TRUNC;
>+ }
>+
>+ return err;
>+}
>+
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
>@@ -2027,7 +2119,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> goto out;
> }
>
>- err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+ if (sk->sk_type == SOCK_STREAM)
>+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+ else
>+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
>
> out:
> release_sock(sk);
>--
>2.25.1
>
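The return-value handling at the end of __vsock_seqpacket_recvmsg() is what the MSG_TRUNC selftest exercises (a 32-byte record read into a 16-byte buffer). A sketch of just that arithmetic, assuming a complete record has been received; seqpacket_recv_result() is a hypothetical helper, and 'copied' stands in for len - msg->msg_iter.count.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/socket.h>

/* Hypothetical model of the msg_ready branch: 'len' is the user's buffer
 * size, 'copied' is how much of the record fit into it, 'record_len' is
 * the record length reported by the transport.
 */
static long seqpacket_recv_result(size_t record_len, size_t len,
				  size_t copied, int flags, int *msg_flags)
{
	long ret;

	/* Caller passed MSG_TRUNC: report the real record length. */
	if (flags & MSG_TRUNC)
		ret = (long)record_len;
	else
		ret = (long)copied;

	/* MSG_TRUNC is set in msg_flags whenever part of the record was
	 * dropped, independently of the flags passed in.
	 */
	if (record_len > len)
		*msg_flags |= MSG_TRUNC;

	return ret;
}
```

This matches what test_seqpacket_msg_trunc_server() expects: recvmsg(..., MSG_TRUNC) returns MESSAGE_TRUNC_SZ and MSG_TRUNC is present in msg.msg_flags.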
On Thu, Feb 18, 2021 at 08:37:54AM +0300, Arseny Krasnov wrote:
>This moves the loop that waits for space on send to a separate function,
>because it will be used for sending SEQ_BEGIN/SEQ_END before and
>after data transmission. Waiting for space for SEQ_BEGIN/SEQ_END is needed
>because such packets carry a SEQPACKET header that can't be
>fragmented by the credit mechanism, so, to avoid that, the sender waits until
>enough space is available.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/net/af_vsock.h | 2 +
> net/vmw_vsock/af_vsock.c | 99 +++++++++++++++++++++++++---------------
> 2 files changed, 63 insertions(+), 38 deletions(-)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index 01563338cc03..6fbe88306403 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -205,6 +205,8 @@ void vsock_remove_sock(struct vsock_sock *vsk);
> void vsock_for_each_connected_socket(void (*fn)(struct sock *sk));
> int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk);
> bool vsock_find_cid(unsigned int cid);
>+int vsock_wait_space(struct sock *sk, size_t space, int flags,
>+ struct vsock_transport_send_notify_data *send_data);
>
> /**** TAP ****/
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index b754927a556a..09b377422b1e 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1692,6 +1692,65 @@ static int vsock_connectible_getsockopt(struct socket *sock,
> return 0;
> }
>
>+int vsock_wait_space(struct sock *sk, size_t space, int flags,
>+ struct vsock_transport_send_notify_data *send_data)
>+{
>+ const struct vsock_transport *transport;
>+ struct vsock_sock *vsk;
>+ long timeout;
>+ int err;
>+
>+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+ timeout = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
>+ err = 0;
>+
>+ add_wait_queue(sk_sleep(sk), &wait);
>+
>+ while (vsock_stream_has_space(vsk) < space &&
>+ sk->sk_err == 0 &&
>+ !(sk->sk_shutdown & SEND_SHUTDOWN) &&
>+ !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
>+
>+ /* Don't wait for non-blocking sockets. */
>+ if (timeout == 0) {
>+ err = -EAGAIN;
>+ goto out_err;
>+ }
>+
>+ if (send_data) {
>+ err = transport->notify_send_pre_block(vsk, send_data);
>+ if (err < 0)
>+ goto out_err;
>+ }
>+
>+ release_sock(sk);
>+ timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
>+ lock_sock(sk);
>+ if (signal_pending(current)) {
>+ err = sock_intr_errno(timeout);
>+ goto out_err;
>+ } else if (timeout == 0) {
>+ err = -EAGAIN;
>+ goto out_err;
>+ }
>+ }
>+
>+ if (sk->sk_err) {
>+ err = -sk->sk_err;
>+ } else if ((sk->sk_shutdown & SEND_SHUTDOWN) ||
>+ (vsk->peer_shutdown & RCV_SHUTDOWN)) {
>+ err = -EPIPE;
>+ }
>+
>+out_err:
>+ remove_wait_queue(sk_sleep(sk), &wait);
>+ return err;
>+}
>+EXPORT_SYMBOL_GPL(vsock_wait_space);
>+
> static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> size_t len)
> {
>@@ -1699,10 +1758,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> struct vsock_sock *vsk;
> const struct vsock_transport *transport;
> ssize_t total_written;
>- long timeout;
> int err;
> struct vsock_transport_send_notify_data send_data;
>- DEFINE_WAIT_FUNC(wait, woken_wake_function);
>
> sk = sock->sk;
> vsk = vsock_sk(sk);
>@@ -1740,9 +1797,6 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> goto out;
> }
>
>- /* Wait for room in the produce queue to enqueue our user's data. */
>- timeout = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
>-
> err = transport->notify_send_init(vsk, &send_data);
> if (err < 0)
> goto out;
>@@ -1750,39 +1804,8 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> while (total_written < len) {
> ssize_t written;
>
>- add_wait_queue(sk_sleep(sk), &wait);
>- while (vsock_stream_has_space(vsk) == 0 &&
>- sk->sk_err == 0 &&
>- !(sk->sk_shutdown & SEND_SHUTDOWN) &&
>- !(vsk->peer_shutdown & RCV_SHUTDOWN)) {
>-
>- /* Don't wait for non-blocking sockets. */
>- if (timeout == 0) {
>- err = -EAGAIN;
>- remove_wait_queue(sk_sleep(sk), &wait);
>- goto out_err;
>- }
>-
>- err = transport->notify_send_pre_block(vsk, &send_data);
>- if (err < 0) {
>- remove_wait_queue(sk_sleep(sk), &wait);
>- goto out_err;
>- }
>-
>- release_sock(sk);
>- timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
>- lock_sock(sk);
>- if (signal_pending(current)) {
>- err = sock_intr_errno(timeout);
>- remove_wait_queue(sk_sleep(sk), &wait);
>- goto out_err;
>- } else if (timeout == 0) {
>- err = -EAGAIN;
>- remove_wait_queue(sk_sleep(sk), &wait);
>- goto out_err;
>- }
>- }
>- remove_wait_queue(sk_sleep(sk), &wait);
>+ if (vsock_wait_space(sk, 1, msg->msg_flags, &send_data))
>+ goto out_err;
>
> /* These checks occur both as part of and after the loop
> * conditional since we need to check before and after
>--
>2.25.1
>
The patch LGTM:
Reviewed-by: Stefano Garzarella <[email protected]>
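The exit conditions of the vsock_wait_space() helper above can be checked with a small user-space model of its loop. This is a sketch, not kernel code. struct sock_model, model_wait_space() and wake_with_space() are hypothetical. The wait callback stands in for release_sock()/wait_woken()/lock_sock(), and signal handling is not modelled. RCV_SHUTDOWN and SEND_SHUTDOWN use the values from include/net/sock.h.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

#define RCV_SHUTDOWN  1   /* same values as include/net/sock.h */
#define SEND_SHUTDOWN 2

/* Hypothetical snapshot of the state vsock_wait_space() looks at. */
struct sock_model {
	size_t space;       /* what vsock_stream_has_space() would return */
	int sk_err;         /* pending socket error (positive errno) */
	int sk_shutdown;    /* local shutdown mask */
	int peer_shutdown;  /* shutdown mask received from the peer */
};

/* Stand-in for release_sock() + wait_woken() + lock_sock(): may change
 * the socket state and returns the remaining timeout (0 = timed out). */
typedef long (*wait_fn)(struct sock_model *s, long timeout);

static int model_wait_space(struct sock_model *s, size_t space,
			    long timeout, wait_fn wait)
{
	while (s->space < space && s->sk_err == 0 &&
	       !(s->sk_shutdown & SEND_SHUTDOWN) &&
	       !(s->peer_shutdown & RCV_SHUTDOWN)) {
		if (timeout == 0)
			return -EAGAIN;         /* non-blocking socket */
		timeout = wait(s, timeout);
		if (timeout == 0)
			return -EAGAIN;         /* send timeout expired */
	}

	if (s->sk_err)
		return -s->sk_err;
	if ((s->sk_shutdown & SEND_SHUTDOWN) ||
	    (s->peer_shutdown & RCV_SHUTDOWN))
		return -EPIPE;
	return 0;
}

/* Example wake-up: the peer frees one byte of credit per wake-up. */
static long wake_with_space(struct sock_model *s, long timeout)
{
	assert(timeout > 0);
	s->space++;
	return timeout - 1;
}
```

The helper reports failures only through its return value. A caller therefore typically stores that value in its own error variable before jumping to the error label (err = vsock_wait_space(...); if (err) goto out_err;). Otherwise the error code does not reach the syscall's return value.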
On Thu, Feb 18, 2021 at 08:38:28AM +0300, Arseny Krasnov wrote:
>This does rest of SOCK_SEQPACKET support:
>1) Adds socket ops for SEQPACKET type.
>2) Allows to create socket with SEQPACKET type.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 36 +++++++++++++++++++++++++++++++++++-
> 1 file changed, 35 insertions(+), 1 deletion(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index f352cd9d91ce..f4b02c6d35d1 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_dgram;
> break;
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> if (vsock_use_local_transport(remote_cid))
> new_transport = transport_local;
> else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
>@@ -484,6 +485,14 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> if (!new_transport || !try_module_get(new_transport->module))
> return -ENODEV;
>
>+ if (sk->sk_type == SOCK_SEQPACKET) {
>+ if (!new_transport->seqpacket_seq_send_len ||
>+ !new_transport->seqpacket_seq_send_eor ||
>+ !new_transport->seqpacket_seq_get_len ||
>+ !new_transport->seqpacket_dequeue)
We must release the module reference acquired above:
module_put(new_transport->module);
>+ return -ESOCKTNOSUPPORT;
>+ }
>+
> ret = new_transport->init(vsk, psk);
> if (ret) {
> module_put(new_transport->module);
>@@ -684,6 +693,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
>
> switch (sk->sk_socket->type) {
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> spin_lock_bh(&vsock_table_lock);
> retval = __vsock_bind_connectible(vsk, addr);
> spin_unlock_bh(&vsock_table_lock);
>@@ -769,7 +779,7 @@ static struct sock *__vsock_create(struct net *net,
>
> static bool sock_type_connectible(u16 type)
> {
>- return type == SOCK_STREAM;
>+ return (type == SOCK_STREAM) || (type == SOCK_SEQPACKET);
> }
>
> static void __vsock_release(struct sock *sk, int level)
>@@ -2191,6 +2201,27 @@ static const struct proto_ops vsock_stream_ops = {
> .sendpage = sock_no_sendpage,
> };
>
>+static const struct proto_ops vsock_seqpacket_ops = {
>+ .family = PF_VSOCK,
>+ .owner = THIS_MODULE,
>+ .release = vsock_release,
>+ .bind = vsock_bind,
>+ .connect = vsock_connect,
>+ .socketpair = sock_no_socketpair,
>+ .accept = vsock_accept,
>+ .getname = vsock_getname,
>+ .poll = vsock_poll,
>+ .ioctl = sock_no_ioctl,
>+ .listen = vsock_listen,
>+ .shutdown = vsock_shutdown,
>+ .setsockopt = vsock_connectible_setsockopt,
>+ .getsockopt = vsock_connectible_getsockopt,
>+ .sendmsg = vsock_connectible_sendmsg,
>+ .recvmsg = vsock_connectible_recvmsg,
>+ .mmap = sock_no_mmap,
>+ .sendpage = sock_no_sendpage,
>+};
>+
> static int vsock_create(struct net *net, struct socket *sock,
> int protocol, int kern)
> {
>@@ -2211,6 +2242,9 @@ static int vsock_create(struct net *net, struct socket *sock,
> case SOCK_STREAM:
> sock->ops = &vsock_stream_ops;
> break;
>+ case SOCK_SEQPACKET:
>+ sock->ops = &vsock_seqpacket_ops;
>+ break;
> default:
> return -ESOCKTNOSUPPORT;
> }
>--
>2.25.1
>
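The reference handling that the review comment above asks for in vsock_assign_transport() can be modelled in user space. The pattern is: take the module reference, check the four SEQPACKET callbacks, and drop the reference again before returning -ESOCKTNOSUPPORT. In this sketch, struct transport_model and model_assign_seqpacket() are hypothetical. A plain counter stands in for try_module_get()/module_put(), and the callback signatures are simplified.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical, trimmed-down transport: the four SEQPACKET callbacks
 * that vsock_assign_transport() checks, plus a counter standing in for
 * the module reference taken by try_module_get(). */
struct transport_model {
	int (*seqpacket_seq_send_len)(void);
	int (*seqpacket_seq_send_eor)(void);
	int (*seqpacket_seq_get_len)(void);
	int (*seqpacket_dequeue)(void);
	int refcnt;
};

static int model_assign_seqpacket(struct transport_model *t)
{
	if (!t)
		return -ENODEV;
	t->refcnt++;                         /* try_module_get() */

	if (!t->seqpacket_seq_send_len || !t->seqpacket_seq_send_eor ||
	    !t->seqpacket_seq_get_len || !t->seqpacket_dequeue) {
		t->refcnt--;                 /* module_put() on the error path */
		return -ESOCKTNOSUPPORT;
	}
	return 0;                            /* reference kept by the socket */
}

/* Trivial callback used to populate a "SEQPACKET-capable" transport. */
static int stub_cb(void)
{
	return 0;
}
```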
On Thu, Feb 18, 2021 at 08:38:48AM +0300, Arseny Krasnov wrote:
>This replaces 'stream' to 'connect oriented' in comments as SEQPACKET is
^ connection
You forgot to update the commit message :-)
With that fixed:
Reviewed-by: Stefano Garzarella <[email protected]>
>also connect oriented.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
> 1 file changed, 17 insertions(+), 14 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index f4b02c6d35d1..f1bf6a5ad15e 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)
>
> /* Assign a transport to a socket and call the .init transport callback.
> *
>- * Note: for stream socket this must be called when vsk->remote_addr is set
>- * (e.g. during the connect() or when a connection request on a listener
>+ * Note: for connection oriented socket this must be called when vsk->remote_addr
>+ * is set (e.g. during the connect() or when a connection request on a listener
> * socket is received).
> * The vsk->remote_addr is used to decide which transport to use:
> * - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
>@@ -470,10 +470,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> return 0;
>
> /* transport->release() must be called with sock lock acquired.
>- * This path can only be taken during vsock_stream_connect(),
>- * where we have already held the sock lock.
>- * In the other cases, this function is called on a new socket
>- * which is not assigned to any transport.
>+ * This path can only be taken during vsock_connect(), where we
>+ * have already held the sock lock. In the other cases, this
>+ * function is called on a new socket which is not assigned to
>+ * any transport.
> */
> vsk->transport->release(vsk);
> vsock_deassign_transport(vsk);
>@@ -658,9 +658,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
>
> vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);
>
>- /* Remove stream sockets from the unbound list and add them to the hash
>- * table for easy lookup by its address. The unbound list is simply an
>- * extra entry at the end of the hash table, a trick used by AF_UNIX.
>+ /* Remove connection oriented sockets from the unbound list and add them
>+ * to the hash table for easy lookup by its address. The unbound list
>+ * is simply an extra entry at the end of the hash table, a trick used
>+ * by AF_UNIX.
> */
> __vsock_remove_bound(vsk);
> __vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
>@@ -951,10 +952,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
> if ((mode & ~SHUTDOWN_MASK) || !mode)
> return -EINVAL;
>
>- /* If this is a STREAM socket and it is not connected then bail out
>- * immediately. If it is a DGRAM socket then we must first kick the
>- * socket so that it wakes up from any sleeping calls, for example
>- * recv(), and then afterwards return the error.
>+ /* If this is a connection oriented socket and it is not connected then
>+ * bail out immediately. If it is a DGRAM socket then we must first
>+ * kick the socket so that it wakes up from any sleeping calls, for
>+ * example recv(), and then afterwards return the error.
> */
>
> sk = sock->sk;
>@@ -1783,7 +1784,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>
> transport = vsk->transport;
>
>- /* Callers should not provide a destination with stream sockets. */
>+ /* Callers should not provide a destination with connection oriented
>+ * sockets.
>+ */
> if (msg->msg_namelen) {
> err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
> goto out;
>--
>2.25.1
>
Hi Arseny,
On Thu, Feb 18, 2021 at 08:33:44AM +0300, Arseny Krasnov wrote:
> This patchset implements support for SOCK_SEQPACKET in the virtio
>transport.
> Since SOCK_SEQPACKET guarantees that record boundaries are preserved,
>two new packet operations were added: one to mark the start of a record
>and one to mark its end (SEQ_BEGIN and SEQ_END below). Both operations
>also carry metadata to maintain record boundaries and payload integrity.
>The metadata is a special header with two fields, the message count and
>the message length:
>
> struct virtio_vsock_seq_hdr {
> __le32 msg_cnt;
> __le32 msg_len;
> } __attribute__((packed));
>
> This header is transmitted as the payload of SEQ_BEGIN and SEQ_END
>packets (the buffer of the second virtio descriptor in the chain), in
>the same way as data is transmitted in RW packets. The payload was
>chosen to carry this header to avoid touching the first virtio buffer,
>which carries the packet header, because someone could check that the
>size of that buffer equals the size of the packet header. To send a
>record, the packet with the start marker is sent first (its header
>contains the record length and the counter); then the counter is
>incremented, all data is sent as ordinary 'RW' packets, and finally
>SEQ_END is sent (it also carries the message counter, which is the
>SEQ_BEGIN counter + 1). After sending SEQ_END, the counter is
>incremented again. On the receiver's side, the record length is known
>from the start-marker packet. To check that no packets were dropped by
>the transport, the counters of two consecutive SEQ_BEGIN and SEQ_END
>packets are checked (the SEQ_END counter must be greater than the
>SEQ_BEGIN counter by 1), and the length of the data between the two
>markers is compared to the length in the SEQ_BEGIN header.
> Since packets of one socket are reordered neither on the vsock nor on
>the vhost transport layer, these markers allow the original record to
>be restored on the receiver's side. If the user's buffer is smaller
>than the record length, all data that does not fit is dropped.
> As with stream sockets, the maximum length of a datagram is not
>limited, because the same credit logic is used. The difference from
>stream sockets is that the user is not woken up until the whole record
>is received or an error occurs. The implementation also supports the
>'MSG_EOR' and 'MSG_TRUNC' flags.
> Tests are also implemented.
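The counter and length rules in the cover letter can be sketched in user-space C. This is an illustration under stated assumptions, not the patch code. struct seq_hdr is a host-order stand-in for struct virtio_vsock_seq_hdr, and all helper names are hypothetical. Filling msg_len in SEQ_END is also an assumption, because the cover letter only says SEQ_END carries the counter. The cover letter does not say how counter wrap-around is handled; the sketch simply uses unsigned 32-bit arithmetic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Host-order view of struct virtio_vsock_seq_hdr. */
struct seq_hdr {
	uint32_t msg_cnt;
	uint32_t msg_len;
};

#define SEQ_HDR_WIRE_SIZE 8   /* two packed __le32 fields */

static void put_le32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t)v;
	p[1] = (uint8_t)(v >> 8);
	p[2] = (uint8_t)(v >> 16);
	p[3] = (uint8_t)(v >> 24);
}

static uint32_t get_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

/* Serialize the header as the SEQ_BEGIN/SEQ_END payload. */
static void seq_hdr_encode(uint8_t out[SEQ_HDR_WIRE_SIZE],
			   const struct seq_hdr *h)
{
	put_le32(out, h->msg_cnt);
	put_le32(out + 4, h->msg_len);
}

static struct seq_hdr seq_hdr_decode(const uint8_t in[SEQ_HDR_WIRE_SIZE])
{
	struct seq_hdr h = { get_le32(in), get_le32(in + 4) };

	return h;
}

/* Sender: SEQ_BEGIN carries the current counter, SEQ_END carries
 * counter + 1, and the counter advances by two per record. */
static void seq_make_markers(uint32_t *counter, uint32_t len,
			     struct seq_hdr *begin, struct seq_hdr *end)
{
	begin->msg_cnt = (*counter)++;
	begin->msg_len = len;
	end->msg_cnt = (*counter)++;
	end->msg_len = len;   /* assumption: only the counter is checked on SEQ_END */
}

/* Receiver: the record is valid only if the counters differ by exactly
 * one (modulo 2^32) and the data between the markers matches msg_len. */
static bool seq_record_valid(const struct seq_hdr *begin,
			     const struct seq_hdr *end, uint32_t data_len)
{
	return (uint32_t)(end->msg_cnt - begin->msg_cnt) == 1 &&
	       data_len == begin->msg_len;
}
```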
I reviewed the first part (af_vsock.c changes); tomorrow I'll review the
rest. That part looks great to me, I only found a few minor issues.
In the meantime, however, I'm having some doubts, especially with regard
to other transports besides virtio.
Should we hide the sending of the begin/end markers in the transport?
I mean, should the transport just provide a seqpacket_enqueue()
callback?
The transport would then send the markers inside it. This is because some
transports might not need to send markers.
But thinking about it more, they could actually implement stubs for those
calls, if they don't need to send markers.
So I think for now it's fine since it allows us to reuse a lot of code,
unless someone has some objection.
Thanks,
Stefano
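The "stubs" option discussed above can be sketched as a generic send path that always calls begin-marker, data and end-marker callbacks. A transport that preserves record boundaries natively would implement the marker callbacks as no-ops. The callback table (struct seq_ops), its signatures and every function name are hypothetical. They only illustrate the design choice and do not mirror the real struct vsock_transport.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback table; the real struct vsock_transport members
 * and signatures may differ. */
struct seq_ops {
	int (*send_len)(void *ctx, size_t len);                  /* start marker */
	int (*enqueue)(void *ctx, const char *buf, size_t len);  /* record data */
	int (*send_eor)(void *ctx, size_t len);                  /* end marker */
};

/* Transport-independent send path, as it could live in af_vsock.c. */
static int generic_seq_send(const struct seq_ops *ops, void *ctx,
			    const char *buf, size_t len)
{
	int err = ops->send_len(ctx, len);

	if (!err)
		err = ops->enqueue(ctx, buf, len);
	if (!err)
		err = ops->send_eor(ctx, len);
	return err;
}

/* Records which operations reached the "wire": B = begin, D = data, E = end. */
struct wire_log {
	char ops[16];
	size_t n;
	size_t bytes;
};

static void log_op(struct wire_log *l, char op)
{
	/* Keep the last byte free so a zero-initialised ops[] stays a C string. */
	if (l->n + 1 < sizeof(l->ops))
		l->ops[l->n++] = op;
}

static int log_begin(void *ctx, size_t len) { (void)len; log_op(ctx, 'B'); return 0; }
static int log_end(void *ctx, size_t len) { (void)len; log_op(ctx, 'E'); return 0; }

static int log_data(void *ctx, const char *buf, size_t len)
{
	struct wire_log *l = ctx;

	(void)buf;
	log_op(l, 'D');
	l->bytes += len;
	return 0;
}

/* A transport that keeps record boundaries natively can stub the markers out. */
static int no_marker(void *ctx, size_t len) { (void)ctx; (void)len; return 0; }

static const struct seq_ops marker_transport = { log_begin, log_data, log_end };
static const struct seq_ops stub_transport = { no_marker, log_data, no_marker };
```

The generic code stays the same for both transports; only the callback table differs. This is the code-reuse argument made in the message above.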
On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
> This adds the transport callback and its logic for SEQPACKET dequeue.
> The callback fetches RW packets from the socket's rx queue until the
> whole record is copied (if the user's buffer is full, the user is not
> woken up). This is done to avoid stalling the sender: if we woke up the
> user and it left the syscall, nobody would send a credit update for the
> rest of the record, and the sender would wait until the receiver enters
> the read syscall again. So if the user's buffer is full, we just send a
> credit update and drop the data. If SEQ_BEGIN is found during the copy
> (and not all data was copied), copying is restarted by resetting the
> user's iov iterator (the previous unfinished data is dropped).
>
> Signed-off-by: Arseny Krasnov <[email protected]>
> ---
> include/linux/virtio_vsock.h | 10 +++
> include/uapi/linux/virtio_vsock.h | 16 ++++
> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
> 3 files changed, 140 insertions(+)
>
> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
> index dc636b727179..003d06ae4a85 100644
> --- a/include/linux/virtio_vsock.h
> +++ b/include/linux/virtio_vsock.h
> @@ -36,6 +36,11 @@ struct virtio_vsock_sock {
> u32 rx_bytes;
> u32 buf_alloc;
> struct list_head rx_queue;
> +
> + /* For SOCK_SEQPACKET */
> + u32 user_read_seq_len;
> + u32 user_read_copied;
> + u32 curr_rx_msg_cnt;
wrap these in a struct to make it clearer that they
are related?
> };
>
> struct virtio_vsock_pkt {
> @@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
> +int
> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> + struct msghdr *msg,
> + int flags,
> + bool *msg_ready);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
> index 1d57ed3d84d2..cf9c165e5cca 100644
> --- a/include/uapi/linux/virtio_vsock.h
> +++ b/include/uapi/linux/virtio_vsock.h
> @@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
> __le32 fwd_cnt;
> } __attribute__((packed));
>
> +struct virtio_vsock_seq_hdr {
> + __le32 msg_cnt;
> + __le32 msg_len;
> +} __attribute__((packed));
> +
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
> @@ -83,6 +89,11 @@ enum virtio_vsock_op {
> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> /* Request the peer to send the credit info to us */
> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
> +
> + /* Record begin for SOCK_SEQPACKET */
> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
> + /* Record end for SOCK_SEQPACKET */
> + VIRTIO_VSOCK_OP_SEQ_END = 9,
> };
>
> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
> @@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
> +/* VIRTIO_VSOCK_OP_RW flags values */
> +enum virtio_vsock_rw {
> + VIRTIO_VSOCK_RW_EOR = 1,
> +};
> +
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
Probably a good idea to also have a feature bit gating
this functionality.
--
MST
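A feature bit gating SEQPACKET, as suggested above, could work as sketched below. SEQPACKET sockets are allowed only when the bit is present in both the device's and the driver's feature sets. The bit number MODEL_VSOCK_F_SEQPACKET and both helpers are hypothetical, because the patch under review defines no such bit. Where the check would live in the real code (socket creation or transport assignment) is a design choice this sketch does not settle.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/socket.h>

/* Hypothetical feature bit number: the patch under review defines none. */
#define MODEL_VSOCK_F_SEQPACKET 1

/* In virtio, a feature is in effect only if the device offers it and the
 * driver accepts it. */
static bool seqpacket_negotiated(uint64_t device_features,
				 uint64_t driver_features)
{
	uint64_t negotiated = device_features & driver_features;

	return (negotiated >> MODEL_VSOCK_F_SEQPACKET) & 1;
}

/* Gate the socket type on the negotiated features. */
static int model_check_sock_type(int type, bool seqpacket_ok)
{
	switch (type) {
	case SOCK_STREAM:
		return 0;
	case SOCK_SEQPACKET:
		return seqpacket_ok ? 0 : -ESOCKTNOSUPPORT;
	default:
		return -ESOCKTNOSUPPORT;
	}
}
```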
The title is a little cryptic, maybe something like:
virtio/vsock: set packet's type in virtio_transport_send_pkt_info()
On Thu, Feb 18, 2021 at 08:39:02AM +0300, Arseny Krasnov wrote:
>This moves passing type of packet from 'info' structure to send
Also here replace send with the function name.
>function. There is no sense to set type of packet which differs
>from type of socket, and since at current time only stream type
>is supported, so force to use this type.
I'm not a native speaker, but I would rephrase the commit message a bit:
There is no need to set a packet type that differs from the socket
type. Since only the stream type is currently supported, set it
directly in virtio_transport_send_pkt_info(), so callers don't need
to set it.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/virtio_transport_common.c | 7 ++-----
> 1 file changed, 2 insertions(+), 5 deletions(-)
If I haven't missed something, we can remove 'type' parameter also from
virtio_transport_send_credit_update(), right?
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index e4370b1b7494..1c9d71ca5e8e 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -179,6 +179,8 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
> struct virtio_vsock_pkt *pkt;
> u32 pkt_len = info->pkt_len;
>
>+ info->type = VIRTIO_VSOCK_TYPE_STREAM;
>+
> t_ops = virtio_transport_get_ops(vsk);
> if (unlikely(!t_ops))
> return -EFAULT;
>@@ -624,7 +626,6 @@ int virtio_transport_connect(struct vsock_sock *vsk)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_REQUEST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .vsk = vsk,
> };
>
>@@ -636,7 +637,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_SHUTDOWN,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .flags = (mode & RCV_SHUTDOWN ?
> VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
> (mode & SEND_SHUTDOWN ?
>@@ -665,7 +665,6 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RW,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .msg = msg,
> .pkt_len = len,
> .vsk = vsk,
>@@ -688,7 +687,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .reply = !!pkt,
> .vsk = vsk,
> };
>@@ -990,7 +988,6 @@ virtio_transport_send_response(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RESPONSE,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .remote_cid = le64_to_cpu(pkt->hdr.src_cid),
> .remote_port = le32_to_cpu(pkt->hdr.src_port),
> .reply = true,
>--
>2.25.1
>
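The commit message says the packet type should never differ from the socket type. This patch hard-codes STREAM, but once SEQPACKET exists the type can be derived from the socket. A minimal sketch of that mapping follows. The enum mirrors the values proposed in include/uapi/linux/virtio_vsock.h later in this series (STREAM = 1, SEQPACKET = 2). The helper name is hypothetical, and returning 0 for unsupported types is an assumption.

```c
#include <assert.h>
#include <sys/socket.h>

/* Values as proposed for include/uapi/linux/virtio_vsock.h in this series. */
enum model_virtio_vsock_type {
	MODEL_VIRTIO_VSOCK_TYPE_STREAM = 1,
	MODEL_VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};

/* Hypothetical helper: derive the packet type from the socket type, so
 * callers filling a virtio_vsock_pkt_info never have to pass it.
 * Returns 0 (not a valid packet type) for unsupported socket types. */
static int model_pkt_type_from_sk_type(int sk_type)
{
	switch (sk_type) {
	case SOCK_STREAM:
		return MODEL_VIRTIO_VSOCK_TYPE_STREAM;
	case SOCK_SEQPACKET:
		return MODEL_VIRTIO_VSOCK_TYPE_SEQPACKET;
	default:
		return 0;
	}
}
```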
On Thu, Feb 18, 2021 at 08:39:23AM +0300, Arseny Krasnov wrote:
>'virtio_transport_send_credit_update()' has some extra args:
>1) 'type' may be set in 'virtio_transport_send_pkt_info()' using type
> of socket.
>2) This function is static and 'hdr' arg was always NULL.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/virtio_transport_common.c | 15 ++++-----------
> 1 file changed, 4 insertions(+), 11 deletions(-)
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 1c9d71ca5e8e..833104b71a1c 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -271,13 +271,10 @@ void virtio_transport_put_credit(struct virtio_vsock_sock *vvs, u32 credit)
> }
> EXPORT_SYMBOL_GPL(virtio_transport_put_credit);
>
>-static int virtio_transport_send_credit_update(struct vsock_sock *vsk,
>- int type,
>- struct virtio_vsock_hdr *hdr)
>+static int virtio_transport_send_credit_update(struct vsock_sock *vsk)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_CREDIT_UPDATE,
>- .type = type,
> .vsk = vsk,
> };
I don't know if it's better to remove type with the other changes in
the previous patch, maybe it's more consistent.
I mean only the removal of 'type' parameter, the 'hdr' parameter should
be removed with this patch.
>
>@@ -385,11 +382,8 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> * messages, we set the limit to a high value. TODO: experiment
> * with different values.
> */
>- if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE) {
>- virtio_transport_send_credit_update(vsk,
>- VIRTIO_VSOCK_TYPE_STREAM,
>- NULL);
>- }
>+ if (free_space < VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
>+ virtio_transport_send_credit_update(vsk);
>
> return total;
>
>@@ -498,8 +492,7 @@ void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
>
> vvs->buf_alloc = *val;
>
>- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
>- NULL);
>+ virtio_transport_send_credit_update(vsk);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
>
>--
>2.25.1
>
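The hunk above keeps the rule that a credit update is sent only when free_space drops below VIRTIO_VSOCK_MAX_PKT_BUF_SIZE, but it does not show how free_space is computed. The sketch below assumes free_space = buf_alloc - (fwd_cnt - last_fwd_cnt) and a 64 KiB threshold. Both should be checked against the tree. The struct and function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed to match VIRTIO_VSOCK_MAX_PKT_BUF_SIZE (64 KiB). */
#define MODEL_MAX_PKT_BUF_SIZE (1024 * 64)

struct model_credit {
	uint32_t buf_alloc;     /* receive buffer size advertised to the peer */
	uint32_t fwd_cnt;       /* total bytes the reader has consumed */
	uint32_t last_fwd_cnt;  /* fwd_cnt value last reported to the peer */
};

/* Roughly the credit the peer still believes it has if everything it sent
 * was consumed: buf_alloc minus the bytes consumed since the last report.
 * Unsigned subtraction stays correct when the 32-bit counters wrap. */
static uint32_t model_free_space(const struct model_credit *c)
{
	return c->buf_alloc - (c->fwd_cnt - c->last_fwd_cnt);
}

/* Send an update only when that space gets small, to limit the number of
 * credit update messages. */
static bool model_need_credit_update(const struct model_credit *c)
{
	return model_free_space(c) < MODEL_MAX_PKT_BUF_SIZE;
}
```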
On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
>This adds the transport callback and its logic for SEQPACKET dequeue.
>The callback fetches RW packets from the socket's rx queue until the
>whole record is copied (if the user's buffer is full, the user is not
>woken up). This is done to avoid stalling the sender: if we woke up the
>user and it left the syscall, nobody would send a credit update for the
>rest of the record, and the sender would wait until the receiver enters
>the read syscall again. So if the user's buffer is full, we just send a
>credit update and drop the data. If SEQ_BEGIN is found during the copy
>(and not all data was copied), copying is restarted by resetting the
>user's iov iterator (the previous unfinished data is dropped).
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/linux/virtio_vsock.h | 10 +++
> include/uapi/linux/virtio_vsock.h | 16 ++++
> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
> 3 files changed, 140 insertions(+)
This patch LGTM, maybe we only need to change 'msg_cnt' as we discussed
on virtio-comment, but let's see if there are any other comments.
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..003d06ae4a85 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -36,6 +36,11 @@ struct virtio_vsock_sock {
> u32 rx_bytes;
> u32 buf_alloc;
> struct list_head rx_queue;
>+
>+ /* For SOCK_SEQPACKET */
>+ u32 user_read_seq_len;
>+ u32 user_read_copied;
>+ u32 curr_rx_msg_cnt;
> };
>
> struct virtio_vsock_pkt {
>@@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+int
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags,
>+ bool *msg_ready);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>index 1d57ed3d84d2..cf9c165e5cca 100644
>--- a/include/uapi/linux/virtio_vsock.h
>+++ b/include/uapi/linux/virtio_vsock.h
>@@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
> __le32 fwd_cnt;
> } __attribute__((packed));
>
>+struct virtio_vsock_seq_hdr {
>+ __le32 msg_cnt;
>+ __le32 msg_len;
>+} __attribute__((packed));
>+
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
>+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
>@@ -83,6 +89,11 @@ enum virtio_vsock_op {
> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> /* Request the peer to send the credit info to us */
> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>+
>+ /* Record begin for SOCK_SEQPACKET */
>+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
>+ /* Record end for SOCK_SEQPACKET */
>+ VIRTIO_VSOCK_OP_SEQ_END = 9,
> };
>
> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>@@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
>+/* VIRTIO_VSOCK_OP_RW flags values */
>+enum virtio_vsock_rw {
>+ VIRTIO_VSOCK_RW_EOR = 1,
>+};
>+
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 833104b71a1c..d8ec2dfa2315 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -393,6 +393,108 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static inline void virtio_transport_remove_pkt(struct virtio_vsock_pkt *pkt)
>+{
>+ list_del(&pkt->list);
>+ virtio_transport_free_pkt(pkt);
>+}
>+
>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ bool *msg_ready)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ int err = 0;
>+ size_t user_buf_len = msg->msg_iter.count;
>+
>+ *msg_ready = false;
>+ spin_lock_bh(&vvs->rx_lock);
>+
>+ while (!*msg_ready && !list_empty(&vvs->rx_queue) && !err) {
>+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+ switch (le16_to_cpu(pkt->hdr.op)) {
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
>+ /* Unexpected 'SEQ_BEGIN' during record copy:
>+ * Leave receive loop, 'EAGAIN' will restart it from
>+ * outer receive loop, packet is still in queue and
>+ * counters are cleared. So in next loop enter,
>+ * 'SEQ_BEGIN' will be dequeued first. User's iov
>+ * iterator will be reset in outer loop. Also
>+ * send credit update, because some bytes could be
>+ * copied. User will never see unfinished record.
>+ */
>+ err = -EAGAIN;
>+ break;
>+ }
>+ case VIRTIO_VSOCK_OP_SEQ_END: {
>+ struct virtio_vsock_seq_hdr *seq_hdr;
>+
>+ seq_hdr = (struct virtio_vsock_seq_hdr *)pkt->buf;
>+ /* First check that whole record is received. */
>+
>+ if (vvs->user_read_copied != vvs->user_read_seq_len ||
>+ (le32_to_cpu(seq_hdr->msg_cnt) - vvs->curr_rx_msg_cnt) != 1) {
>+ /* Tail of current record and head of next missed,
>+ * so this EOR is from next record. Restart receive.
>+ * Current record will be dropped, next headless will
>+ * be dropped on next attempt to get record length.
>+ */
>+ err = -EAGAIN;
>+ } else {
>+ /* Success. */
>+ *msg_ready = true;
>+ }
>+
>+ break;
>+ }
>+ case VIRTIO_VSOCK_OP_RW: {
>+ size_t bytes_to_copy;
>+ size_t pkt_len;
>+
>+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+ bytes_to_copy = min(user_buf_len, pkt_len);
>+
>+ /* sk_lock is held by caller so no one else can dequeue.
>+ * Unlock rx_lock since memcpy_to_msg() may sleep.
>+ */
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>+ spin_lock_bh(&vvs->rx_lock);
>+ err = -EINVAL;
>+ break;
>+ }
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+ user_buf_len -= bytes_to_copy;
>+ vvs->user_read_copied += pkt_len;
>+
>+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
>+ msg->msg_flags |= MSG_EOR;
>+ break;
>+ }
>+ default:
>+ ;
>+ }
>+
>+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
>+ * but drop any other type of packet.
>+ */
>+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
>+ virtio_transport_dec_rx_pkt(vvs, pkt);
>+ virtio_transport_remove_pkt(pkt);
>+ }
>+ }
>+
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ virtio_transport_send_credit_update(vsk);
>+
>+ return err;
>+}
>+
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
>@@ -405,6 +507,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
>+int
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ int flags, bool *msg_ready)
>+{
>+ if (flags & MSG_PEEK)
>+ return -EOPNOTSUPP;
>+
>+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, msg_ready);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>+
> int
> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
>--
>2.25.1
>
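The control flow of virtio_transport_seqpacket_do_dequeue() above can be exercised with the following user-space model. This is a simplified sketch, not the patch code. The rx queue is an array with a head index instead of a list. Locking, the credit update and the RW EOR flag are left out, and all type and function names are hypothetical. The op values are the ones from include/uapi/linux/virtio_vsock.h, including SEQ_BEGIN/SEQ_END as added by this patch.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Op values as in include/uapi/linux/virtio_vsock.h (SEQ_* from this series). */
enum model_op { MODEL_OP_RW = 5, MODEL_OP_SEQ_BEGIN = 8, MODEL_OP_SEQ_END = 9 };

struct model_pkt {
	enum model_op op;
	const char *buf;   /* RW payload */
	uint32_t len;      /* RW payload length */
	uint32_t msg_cnt;  /* counter carried by SEQ_END */
};

/* Mirrors the three per-socket fields added to struct virtio_vsock_sock. */
struct model_rx {
	uint32_t user_read_seq_len;  /* record length from SEQ_BEGIN */
	uint32_t user_read_copied;   /* record bytes consumed, including dropped ones */
	uint32_t curr_rx_msg_cnt;    /* counter from SEQ_BEGIN */
};

/* Consumes q[*head..n). Returns 0 and sets *msg_ready once SEQ_END closes
 * a complete record; returns -EAGAIN so the caller can restart when the
 * record is broken. SEQ_BEGIN is never consumed here. */
static int model_seqpacket_do_dequeue(struct model_rx *rx,
				      const struct model_pkt *q, size_t n,
				      size_t *head, char *ubuf, size_t ubuf_len,
				      size_t *copied, bool *msg_ready)
{
	int err = 0;

	*msg_ready = false;
	*copied = 0;

	while (!*msg_ready && *head < n && !err) {
		const struct model_pkt *p = &q[*head];

		switch (p->op) {
		case MODEL_OP_SEQ_BEGIN:
			/* Start of the next record: keep it queued, restart. */
			err = -EAGAIN;
			break;
		case MODEL_OP_SEQ_END:
			if (rx->user_read_copied != rx->user_read_seq_len ||
			    (uint32_t)(p->msg_cnt - rx->curr_rx_msg_cnt) != 1)
				err = -EAGAIN;  /* bytes or markers were lost */
			else
				*msg_ready = true;
			break;
		case MODEL_OP_RW: {
			size_t room = ubuf_len - *copied;
			size_t to_copy = p->len < room ? p->len : room;

			memcpy(ubuf + *copied, p->buf, to_copy);
			*copied += to_copy;
			/* Count the whole packet: the excess is dropped. */
			rx->user_read_copied += p->len;
			break;
		}
		}

		if (p->op != MODEL_OP_SEQ_BEGIN)
			(*head)++;  /* every other packet is consumed */
	}
	return err;
}
```

As in the patch, bytes beyond the user's buffer still count toward user_read_copied. A truncated record therefore passes the length check against SEQ_BEGIN, and the excess is silently dropped.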
On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>Hi Arseny,
>
>On Thu, Feb 18, 2021 at 08:33:44AM +0300, Arseny Krasnov wrote:
>> This patchset implements support for SOCK_SEQPACKET in the virtio
>>transport.
>> Since SOCK_SEQPACKET guarantees that record boundaries are preserved,
>>two new packet operations were added: one to mark the start of a record
>>and one to mark its end (SEQ_BEGIN and SEQ_END below). Both operations
>>also carry metadata to maintain record boundaries and payload integrity.
>>The metadata is a special header with two fields, the message count and
>>the message length:
>>
>> struct virtio_vsock_seq_hdr {
>> __le32 msg_cnt;
>> __le32 msg_len;
>> } __attribute__((packed));
>>
>> This header is transmitted as the payload of SEQ_BEGIN and SEQ_END
>>packets (the buffer of the second virtio descriptor in the chain), in
>>the same way as data is transmitted in RW packets. The payload was
>>chosen to carry this header to avoid touching the first virtio buffer,
>>which carries the packet header, because someone could check that the
>>size of that buffer equals the size of the packet header. To send a
>>record, the packet with the start marker is sent first (its header
>>contains the record length and the counter); then the counter is
>>incremented, all data is sent as ordinary 'RW' packets, and finally
>>SEQ_END is sent (it also carries the message counter, which is the
>>SEQ_BEGIN counter + 1). After sending SEQ_END, the counter is
>>incremented again. On the receiver's side, the record length is known
>>from the start-marker packet. To check that no packets were dropped by
>>the transport, the counters of two consecutive SEQ_BEGIN and SEQ_END
>>packets are checked (the SEQ_END counter must be greater than the
>>SEQ_BEGIN counter by 1), and the length of the data between the two
>>markers is compared to the length in the SEQ_BEGIN header.
>> Since packets of one socket are reordered neither on the vsock nor on
>>the vhost transport layer, these markers allow the original record to
>>be restored on the receiver's side. If the user's buffer is smaller
>>than the record length, all data that does not fit is dropped.
>> As with stream sockets, the maximum length of a datagram is not
>>limited, because the same credit logic is used. The difference from
>>stream sockets is that the user is not woken up until the whole record
>>is received or an error occurs. The implementation also supports the
>>'MSG_EOR' and 'MSG_TRUNC' flags.
>> Tests are also implemented.
>
>I reviewed the first part (af_vsock.c changes); tomorrow I'll review
>the rest. That part looks great to me, I only found a few minor issues.
I reviewed the rest of it as well and left a few minor comments, but I
think we're well on track.
I'll take a better look at the specification patch tomorrow.
Thanks,
Stefano
>
>In the meantime, however, I'm having some doubts, especially with regard
>to other transports besides virtio.
>
>Should we hide the sending of the begin/end markers in the transport?
>
>I mean, should the transport just provide a seqpacket_enqueue()
>callback?
>The transport would then send the markers inside it. This is because
>some transports might not need to send markers.
>
>But thinking about it more, they could actually implement stubs for
>those calls, if they don't need to send markers.
>
>So I think for now it's fine since it allows us to reuse a lot of
>code, unless someone has some objection.
>
>Thanks,
>Stefano
>
On Wed, Feb 24, 2021 at 07:29:25AM +0300, Arseny Krasnov wrote:
>
>On 23.02.2021 17:50, Stefano Garzarella wrote:
>> On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>>> Hi Arseny,
>>>
>>> On Thu, Feb 18, 2021 at 08:33:44AM +0300, Arseny Krasnov wrote:
>>>> This patchset implements support for SOCK_SEQPACKET in the virtio
>>>> transport.
>>>> Since SOCK_SEQPACKET guarantees that record boundaries are preserved,
>>>> two new packet operations were added: one to mark the start of a record
>>>> and one to mark its end (SEQ_BEGIN and SEQ_END below). Both operations
>>>> also carry metadata to maintain record boundaries and payload integrity.
>>>> The metadata is a special header with two fields, the message count and
>>>> the message length:
>>>>
>>>> struct virtio_vsock_seq_hdr {
>>>> __le32 msg_cnt;
>>>> __le32 msg_len;
>>>> } __attribute__((packed));
>>>>
>>>> This header is transmitted as the payload of SEQ_BEGIN and SEQ_END
>>>> packets (the buffer of the second virtio descriptor in the chain), in
>>>> the same way as data is transmitted in RW packets. The payload was
>>>> chosen to carry this header to avoid touching the first virtio buffer,
>>>> which carries the packet header, because someone could check that the
>>>> size of that buffer equals the size of the packet header. To send a
>>>> record, the packet with the start marker is sent first (its header
>>>> contains the record length and the counter); then the counter is
>>>> incremented, all data is sent as ordinary 'RW' packets, and finally
>>>> SEQ_END is sent (it also carries the message counter, which is the
>>>> SEQ_BEGIN counter + 1). After sending SEQ_END, the counter is
>>>> incremented again. On the receiver's side, the record length is known
>>>> from the start-marker packet. To check that no packets were dropped by
>>>> the transport, the counters of two consecutive SEQ_BEGIN and SEQ_END
>>>> packets are checked (the SEQ_END counter must be greater than the
>>>> SEQ_BEGIN counter by 1), and the length of the data between the two
>>>> markers is compared to the length in the SEQ_BEGIN header.
>>>> Since packets of one socket are reordered neither on the vsock nor on
>>>> the vhost transport layer, these markers allow the original record to
>>>> be restored on the receiver's side. If the user's buffer is smaller
>>>> than the record length, all data that does not fit is dropped.
>>>> As with stream sockets, the maximum length of a datagram is not
>>>> limited, because the same credit logic is used. The difference from
>>>> stream sockets is that the user is not woken up until the whole record
>>>> is received or an error occurs. The implementation also supports the
>>>> 'MSG_EOR' and 'MSG_TRUNC' flags.
>>>> Tests are also implemented.
>>> I reviewed the first part (af_vsock.c changes), tomorrow I'll review
>>> the rest. That part looks great to me, only found a few minor issues.
>> I reviewed the rest of it as well, left a few minor comments, but I
>> think we're well on track.
>>
>> I'll take a better look at the specification patch tomorrow.
>Great, thank you
>>
>> Thanks,
>> Stefano
>>
>>> In the meantime, however, I'm getting a doubt, especially with regard
>>> to other transports besides virtio.
>>>
>>> Should we hide the begin/end marker sending in the transport?
>>>
>>> I mean, should the transport just provide a seqpacket_enqueue()
>>> callback?
>>> Inside it then the transport will send the markers. This is because
>>> some transports might not need to send markers.
>>>
>>> But thinking about it more, they could actually implement stubs for
>>> those calls, if they don't need to send markers.
>>>
>>> So I think for now it's fine since it allows us to reuse a lot of
>>> code, unless someone has some objection.
>
>I thought about that; I'll try to implement it in the next version. Let's see...
If you want to discuss it first, write down the idea you want to
implement, I wouldn't want to make you do unnecessary work. :-)
Cheers,
Stefano
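To make the record framing from the quoted cover letter concrete, here is a minimal userspace C sketch (not the patch's code) of how a sender could lay out one record: a SEQ_BEGIN whose 8-byte payload encodes msg_cnt and msg_len as little-endian 32-bit values, the data split into RW packets of at most max_pkt bytes, and a SEQ_END carrying the counter + 1. The names sim_pkt, frame_record, seq_hdr_encode and put_le32/get_le32 are hypothetical, packets are only described rather than sent, and repeating the record length in SEQ_END's msg_len is an assumption (the cover letter only says SEQ_END carries the counter).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Op values as proposed in the patch; OP_RW is the existing VIRTIO_VSOCK_OP_RW. */
enum seq_op { OP_RW = 5, OP_SEQ_BEGIN = 8, OP_SEQ_END = 9 };

/* Wire size of the packed { __le32 msg_cnt; __le32 msg_len; } header. */
#define SEQ_HDR_SIZE 8

/* Hypothetical description of one outgoing packet. */
struct sim_pkt {
	enum seq_op op;
	uint32_t len;              /* payload bytes carried by this packet */
	uint8_t hdr[SEQ_HDR_SIZE]; /* seq header, used by SEQ_BEGIN/SEQ_END only */
};

static void put_le32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t)v;
	p[1] = (uint8_t)(v >> 8);
	p[2] = (uint8_t)(v >> 16);
	p[3] = (uint8_t)(v >> 24);
}

static uint32_t get_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

/* Byte-wise encoding keeps the header little-endian on any host. */
static void seq_hdr_encode(uint8_t *out, uint32_t msg_cnt, uint32_t msg_len)
{
	put_le32(out, msg_cnt);
	put_le32(out + 4, msg_len);
}

/*
 * Describe one record of rec_len bytes as SEQ_BEGIN, RW..., SEQ_END.
 * *tx_cnt is the per-socket counter: it is incremented after SEQ_BEGIN
 * and again after SEQ_END, so SEQ_END carries SEQ_BEGIN's value + 1.
 * Returns the packet count, or 0 (counter untouched) if out is too small.
 */
static size_t frame_record(uint32_t *tx_cnt, uint32_t rec_len, uint32_t max_pkt,
			   struct sim_pkt *out, size_t max_out)
{
	size_t need, n = 0;
	uint32_t sent = 0;

	assert(max_pkt > 0);
	need = 2 + rec_len / max_pkt + (rec_len % max_pkt != 0);
	if (need > max_out)
		return 0;

	out[n].op = OP_SEQ_BEGIN;
	out[n].len = SEQ_HDR_SIZE;
	seq_hdr_encode(out[n].hdr, *tx_cnt, rec_len);
	n++;
	(*tx_cnt)++;

	while (sent < rec_len) {
		uint32_t left = rec_len - sent;

		out[n].op = OP_RW;
		out[n].len = left < max_pkt ? left : max_pkt;
		sent += out[n].len;
		n++;
	}

	out[n].op = OP_SEQ_END;
	out[n].len = SEQ_HDR_SIZE;
	/* Repeating rec_len as msg_len here is an assumption. */
	seq_hdr_encode(out[n].hdr, *tx_cnt, rec_len);
	n++;
	(*tx_cnt)++;
	return n;
}
```

For a 10-byte record with 4-byte packets this yields SEQ_BEGIN (cnt 0, len 10), RW 4, RW 4, RW 2, SEQ_END (cnt 1), leaving the counter at 2 for the next record.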
On 24.02.2021 11:23, Stefano Garzarella wrote:
> On Wed, Feb 24, 2021 at 07:29:25AM +0300, Arseny Krasnov wrote:
>> On 23.02.2021 17:50, Stefano Garzarella wrote:
>>> On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>>>> Hi Arseny,
>>>>
>>>> I reviewed the first part (af_vsock.c changes), tomorrow I'll review
>>>> the rest. That part looks great to me, only found a few minor issues.
>>> I reviewed the rest of it as well, left a few minor comments, but I
>>> think we're well on track.
>>>
>>> I'll take a better look at the specification patch tomorrow.
>> Great, thank you
>>> Thanks,
>>> Stefano
>>>
>>>> In the meantime, however, I'm getting a doubt, especially with regard
>>>> to other transports besides virtio.
>>>>
>>>> Should we hide the begin/end marker sending in the transport?
>>>>
>>>> I mean, should the transport just provide a seqpacket_enqueue()
>>>> callback?
>>>> Inside it then the transport will send the markers. This is because
>>>> some transports might not need to send markers.
>>>>
>>>> But thinking about it more, they could actually implement stubs for
>>>> those calls, if they don't need to send markers.
>>>>
>>>> So I think for now it's fine since it allows us to reuse a lot of
>>>> code, unless someone has some objection.
>> I thought about that; I'll try to implement it in the next version. Let's see...
> If you want to discuss it first, write down the idea you want to
> implement, I wouldn't want to make you do unnecessary work. :-)
The idea is simple: the iov iterator of 'struct msghdr', which is passed
to the enqueue callback, has two fields: 'iov_offset', the byte offset
inside the io vector from which the next data must be picked, and 'count',
the number of unprocessed bytes remaining in the io vector. So in the
seqpacket enqueue callback, if 'iov_offset' is 0 I'll send SEQ_BEGIN, and
if 'count' is 0 I'll send SEQ_END.
>
> Cheers,
> Stefano
>
>
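The marker decision proposed above can be modelled in a few lines of C, deriving SEQ_BEGIN and SEQ_END purely from the iterator state. This is a sketch under assumptions: struct fake_iter, seqpacket_enqueue_step and the SENT_* bits are hypothetical, and the model uses a single flat buffer so that iov_offset counts bytes of the whole message. One point that may be worth double-checking: for a multi-segment iovec, the kernel's iov_iter.iov_offset is, as far as I can tell, relative to the current segment rather than to the whole message, so it could read 0 again at a segment boundary; tracking the record start separately would avoid that.

```c
#include <stddef.h>

/*
 * Hypothetical stand-in for the two struct iov_iter fields discussed
 * above, modelled over a single flat buffer so that iov_offset is the
 * number of bytes of the record already consumed.
 */
struct fake_iter {
	size_t iov_offset; /* bytes already taken from the message */
	size_t count;      /* bytes still to be sent */
};

/* Bits reported by one enqueue step (hypothetical names). */
enum { SENT_SEQ_BEGIN = 1, SENT_SEQ_END = 2 };

/*
 * One call into a hypothetical seqpacket enqueue callback: consume at
 * most max_pkt bytes and decide about the markers from the iterator.
 * *sent receives the number of payload bytes; the return value says
 * which markers would have been sent around them.
 */
static int seqpacket_enqueue_step(struct fake_iter *it, size_t max_pkt,
				  size_t *sent)
{
	int markers = 0;
	size_t n = it->count < max_pkt ? it->count : max_pkt;

	if (it->iov_offset == 0)
		markers |= SENT_SEQ_BEGIN; /* nothing consumed yet: record start */

	/* A real transport would copy n bytes into RW packet(s) here. */
	it->iov_offset += n;
	it->count -= n;
	*sent = n;

	if (it->count == 0)
		markers |= SENT_SEQ_END;   /* iterator drained: record end */
	return markers;
}
```

Fed a 10-byte message in 4-byte steps, the three calls report SEQ_BEGIN, nothing, then SEQ_END; a record that fits in one step gets both markers from a single call, which keeps the vsock core unaware of markers altogether.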
On Wed, Feb 24, 2021 at 01:41:56AM -0500, Michael S. Tsirkin wrote:
>On Wed, Feb 24, 2021 at 08:07:48AM +0300, Arseny Krasnov wrote:
>>
>> On 23.02.2021 17:17, Michael S. Tsirkin wrote:
>> > On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
>> >> This adds a transport callback and its logic for SEQPACKET dequeue.
>> >> The callback fetches RW packets from the socket's rx queue until the
>> >> whole record is copied (if the user's buffer is full, the user is not
>> >> woken up). This is done to avoid stalling the sender: if we wake up the
>> >> user and it leaves the syscall, nobody will send a credit update for the
>> >> rest of the record, and the sender will wait until the next read syscall
>> >> on the receiver's side. So if the user buffer is full, we just send a
>> >> credit update and drop the data. If SEQ_BEGIN is found during the copy
>> >> (and not all data was copied), copying is restarted by resetting the
>> >> user's iov iterator (previous unfinished data is dropped).
>> >>
>> >> Signed-off-by: Arseny Krasnov <[email protected]>
>> >> ---
>> >> include/linux/virtio_vsock.h | 10 +++
>> >> include/uapi/linux/virtio_vsock.h | 16 ++++
>> >> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
>> >> 3 files changed, 140 insertions(+)
>> >>
>> >> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> >> index dc636b727179..003d06ae4a85 100644
>> >> --- a/include/linux/virtio_vsock.h
>> >> +++ b/include/linux/virtio_vsock.h
>> >> @@ -36,6 +36,11 @@ struct virtio_vsock_sock {
>> >> u32 rx_bytes;
>> >> u32 buf_alloc;
>> >> struct list_head rx_queue;
>> >> +
>> >> + /* For SOCK_SEQPACKET */
>> >> + u32 user_read_seq_len;
>> >> + u32 user_read_copied;
>> >> + u32 curr_rx_msg_cnt;
>> >
>> > wrap these in a struct to make it clearer they
>> > are related?
>> Ack
>> >
>> >> };
>> >>
>> >> struct virtio_vsock_pkt {
>> >> @@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>> >> struct msghdr *msg,
>> >> size_t len, int flags);
>> >>
>> >> +int
>> >> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> >> + struct msghdr *msg,
>> >> + int flags,
>> >> + bool *msg_ready);
>> >> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>> >> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>> >>
>> >> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>> >> index 1d57ed3d84d2..cf9c165e5cca 100644
>> >> --- a/include/uapi/linux/virtio_vsock.h
>> >> +++ b/include/uapi/linux/virtio_vsock.h
>> >> @@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
>> >> __le32 fwd_cnt;
>> >> } __attribute__((packed));
>> >>
>> >> +struct virtio_vsock_seq_hdr {
>> >> + __le32 msg_cnt;
>> >> + __le32 msg_len;
>> >> +} __attribute__((packed));
>> >> +
>> >> enum virtio_vsock_type {
>> >> VIRTIO_VSOCK_TYPE_STREAM = 1,
>> >> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
>> >> };
>> >>
>> >> enum virtio_vsock_op {
>> >> @@ -83,6 +89,11 @@ enum virtio_vsock_op {
>> >> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
>> >> /* Request the peer to send the credit info to us */
>> >> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>> >> +
>> >> + /* Record begin for SOCK_SEQPACKET */
>> >> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
>> >> + /* Record end for SOCK_SEQPACKET */
>> >> + VIRTIO_VSOCK_OP_SEQ_END = 9,
>> >> };
>> >>
>> >> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>> >> @@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
>> >> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
>> >> };
>> >>
>> >> +/* VIRTIO_VSOCK_OP_RW flags values */
>> >> +enum virtio_vsock_rw {
>> >> + VIRTIO_VSOCK_RW_EOR = 1,
>> >> +};
>> >> +
>> >> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>> > Probably a good idea to also have a feature bit gating
>> > this functionality.
>>
>> IIUC this also requires a qemu patch, because in the current
>> implementation of the vsock device in qemu there is no 'set_features'
>> callback for such a device. This callback would handle the guest's write
>> to the feature register by calling the vhost kernel backend, where this
>> bit would be processed by the host.
>
>Well patching userspace to make use of a kernel feature
>is par for the course, isn't it?
>
>>
>> IMHO I'm not sure that SEQPACKET support needs a feature
>> bit: it is just two new ops for the virtio vsock protocol, and from the
>> point of view of the virtio device it is the same as STREAM. Maybe it is
>> needed for cases when a client tries to connect to a server which doesn't
>> support SEQPACKET: without the bit the result will be "Connection reset by
>> peer", while with such a bit the client will know that the server doesn't
>> support it and 'socket(SOCK_SEQPACKET)' will return an error?
>
>Yes, a better error handling would be one reason to do it like this.
Agreed, this way we could implement a 'seqpacket_allow' callback
(similar to 'stream_allow'), and we can return 'true' if the feature is
negotiated.
So instead of checking all the seqpacket callbacks, we can use only this
callback to understand whether the transport supports it.
We can implement it also for the other transports (vmci, hyperv) and always
return false for now.
Thanks,
Stefano
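A minimal sketch of the 'seqpacket_allow' idea: the core asks one callback whether a transport can do SEQPACKET, a virtio-like transport answers from its negotiated feature bits, and transports such as vmci or hyperv simply return false. Everything here is hypothetical: the feature bit number, struct fake_transport, the helper names, the errno returned (-EOPNOTSUPP), and the point at which the core would run the check.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical bit number; the thread only proposes that such a bit exist. */
#define SEQPACKET_FEATURE_BIT 1

struct fake_transport {
	const char *name;
	uint64_t negotiated_features;
	bool (*seqpacket_allow)(const struct fake_transport *t);
};

/* virtio-like transport: allowed only if the feature was negotiated. */
static bool virtio_seqpacket_allow(const struct fake_transport *t)
{
	return (t->negotiated_features >> SEQPACKET_FEATURE_BIT) & 1;
}

/* vmci/hyperv-like transports: no SEQPACKET support for now. */
static bool no_seqpacket_allow(const struct fake_transport *t)
{
	(void)t;
	return false;
}

/*
 * Check the core could perform before using a transport for a
 * SOCK_SEQPACKET socket: one callback instead of testing that every
 * seqpacket operation is implemented.
 */
static int seqpacket_check(const struct fake_transport *t)
{
	if (!t || !t->seqpacket_allow || !t->seqpacket_allow(t))
		return -EOPNOTSUPP;
	return 0;
}
```

A transport that does not set the callback at all is treated the same as one returning false, so older transports would not need to be touched.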
On Wed, Feb 24, 2021 at 11:28:50AM +0300, Arseny Krasnov wrote:
>
>On 24.02.2021 11:23, Stefano Garzarella wrote:
>> On Wed, Feb 24, 2021 at 07:29:25AM +0300, Arseny Krasnov wrote:
>>> On 23.02.2021 17:50, Stefano Garzarella wrote:
>>>> On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>>>>> Hi Arseny,
>>>>>
>>>>> I reviewed the first part (af_vsock.c changes), tomorrow I'll review
>>>>> the rest. That part looks great to me, only found a few minor issues.
>>>> I reviewed the rest of it as well, left a few minor comments, but I
>>>> think we're well on track.
>>>>
>>>> I'll take a better look at the specification patch tomorrow.
>>> Great, thank you
>>>> Thanks,
>>>> Stefano
>>>>
>>>>> In the meantime, however, I'm getting a doubt, especially with regard
>>>>> to other transports besides virtio.
>>>>>
>>>>> Should we hide the begin/end marker sending in the transport?
>>>>>
>>>>> I mean, should the transport just provide a seqpacket_enqueue()
>>>>> callback?
>>>>> Inside it then the transport will send the markers. This is because
>>>>> some transports might not need to send markers.
>>>>>
>>>>> But thinking about it more, they could actually implement stubs for
>>>>> those calls, if they don't need to send markers.
>>>>>
>>>>> So I think for now it's fine since it allows us to reuse a lot of
>>>>> code, unless someone has some objection.
>>> I thought about that; I'll try to implement it in the next version. Let's see...
>> If you want to discuss it first, write down the idea you want to
>> implement, I wouldn't want to make you do unnecessary work. :-)
>
>The idea is simple: the iov iterator of 'struct msghdr', which is passed
>to the enqueue callback, has two fields: 'iov_offset', the byte offset
>inside the io vector from which the next data must be picked, and 'count',
>the number of unprocessed bytes remaining in the io vector. So in the
>seqpacket enqueue callback, if 'iov_offset' is 0 I'll send SEQ_BEGIN, and
>if 'count' is 0 I'll send SEQ_END.
>
Got it, makes sense, and it's definitely more transparent for the vsock
core!
Go ahead, maybe adding a comment in the vsock core explaining this, so
other developers can understand better if they want to support SEQPACKET
in other transports.
Thanks,
Stefano
On 24.02.2021 11:35, Stefano Garzarella wrote:
> On Wed, Feb 24, 2021 at 11:28:50AM +0300, Arseny Krasnov wrote:
>> On 24.02.2021 11:23, Stefano Garzarella wrote:
>>> On Wed, Feb 24, 2021 at 07:29:25AM +0300, Arseny Krasnov wrote:
>>>> On 23.02.2021 17:50, Stefano Garzarella wrote:
>>>>> On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>>>>>> Hi Arseny,
>>>>>>
>>>>>> I reviewed the first part (af_vsock.c changes), tomorrow I'll review
>>>>>> the rest. That part looks great to me, only found a few minor issues.
>>>>> I reviewed the rest of it as well, left a few minor comments, but I
>>>>> think we're well on track.
>>>>>
>>>>> I'll take a better look at the specification patch tomorrow.
>>>> Great, thank you
>>>>> Thanks,
>>>>> Stefano
>>>>>
>>>>>> In the meantime, however, I'm getting a doubt, especially with regard
>>>>>> to other transports besides virtio.
>>>>>>
>>>>>> Should we hide the begin/end marker sending in the transport?
>>>>>>
>>>>>> I mean, should the transport just provide a seqpacket_enqueue()
>>>>>> callback?
>>>>>> Inside it then the transport will send the markers. This is because
>>>>>> some transports might not need to send markers.
>>>>>>
>>>>>> But thinking about it more, they could actually implement stubs for
>>>>>> those calls, if they don't need to send markers.
>>>>>>
>>>>>> So I think for now it's fine since it allows us to reuse a lot of
>>>>>> code, unless someone has some objection.
>>>> I thought about that; I'll try to implement it in the next version. Let's see...
>>> If you want to discuss it first, write down the idea you want to
>>> implement, I wouldn't want to make you do unnecessary work. :-)
>> The idea is simple: the iov iterator of 'struct msghdr', which is passed
>> to the enqueue callback, has two fields: 'iov_offset', the byte offset
>> inside the io vector from which the next data must be picked, and 'count',
>> the number of unprocessed bytes remaining in the io vector. So in the
>> seqpacket enqueue callback, if 'iov_offset' is 0 I'll send SEQ_BEGIN, and
>> if 'count' is 0 I'll send SEQ_END.
>>
> Got it, makes sense, and it's definitely more transparent for the vsock
> core!
> Go ahead, maybe adding a comment in the vsock core explaining this, so
> other developers can understand better if they want to support SEQPACKET
> in other transports.
Ack
>
> Thanks,
> Stefano
>
>
On 23.02.2021 17:50, Stefano Garzarella wrote:
> On Mon, Feb 22, 2021 at 03:23:11PM +0100, Stefano Garzarella wrote:
>> Hi Arseny,
>>
>> I reviewed the first part (af_vsock.c changes), tomorrow I'll review
>> the rest. That part looks great to me, only found a few minor issues.
> I reviewed the rest of it as well, left a few minor comments, but I
> think we're well on track.
>
> I'll take a better look at the specification patch tomorrow.
Great, thank you
>
> Thanks,
> Stefano
>
>> In the meantime, however, I'm getting a doubt, especially with regard
>> to other transports besides virtio.
>>
>> Should we hide the begin/end marker sending in the transport?
>>
>> I mean, should the transport just provide a seqpacket_enqueue()
>> callback?
>> Inside it then the transport will send the markers. This is because
>> some transports might not need to send markers.
>>
>> But thinking about it more, they could actually implement stubs for
>> those calls, if they don't need to send markers.
>>
>> So I think for now it's fine since it allows us to reuse a lot of
>> code, unless someone has some objection.
I thought about that; I'll try to implement it in the next version. Let's see...
>>
>> Thanks,
>> Stefano
>>
>
On 23.02.2021 17:17, Michael S. Tsirkin wrote:
> On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
>> This adds a transport callback and its logic for SEQPACKET dequeue.
>> The callback fetches RW packets from the socket's rx queue until the
>> whole record is copied (if the user's buffer is full, the user is not
>> woken up). This is done to avoid stalling the sender: if we wake up the
>> user and it leaves the syscall, nobody will send a credit update for the
>> rest of the record, and the sender will wait until the next read syscall
>> on the receiver's side. So if the user buffer is full, we just send a
>> credit update and drop the data. If SEQ_BEGIN is found during the copy
>> (and not all data was copied), copying is restarted by resetting the
>> user's iov iterator (previous unfinished data is dropped).
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/linux/virtio_vsock.h | 10 +++
>> include/uapi/linux/virtio_vsock.h | 16 ++++
>> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
>> 3 files changed, 140 insertions(+)
>>
>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> index dc636b727179..003d06ae4a85 100644
>> --- a/include/linux/virtio_vsock.h
>> +++ b/include/linux/virtio_vsock.h
>> @@ -36,6 +36,11 @@ struct virtio_vsock_sock {
>> u32 rx_bytes;
>> u32 buf_alloc;
>> struct list_head rx_queue;
>> +
>> + /* For SOCK_SEQPACKET */
>> + u32 user_read_seq_len;
>> + u32 user_read_copied;
>> + u32 curr_rx_msg_cnt;
>
> wrap these in a struct to make it clearer they
> are related?
Ack
>
>> };
>>
>> struct virtio_vsock_pkt {
>> @@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>> struct msghdr *msg,
>> size_t len, int flags);
>>
>> +int
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> + struct msghdr *msg,
>> + int flags,
>> + bool *msg_ready);
>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>
>> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>> index 1d57ed3d84d2..cf9c165e5cca 100644
>> --- a/include/uapi/linux/virtio_vsock.h
>> +++ b/include/uapi/linux/virtio_vsock.h
>> @@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
>> __le32 fwd_cnt;
>> } __attribute__((packed));
>>
>> +struct virtio_vsock_seq_hdr {
>> + __le32 msg_cnt;
>> + __le32 msg_len;
>> +} __attribute__((packed));
>> +
>> enum virtio_vsock_type {
>> VIRTIO_VSOCK_TYPE_STREAM = 1,
>> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
>> };
>>
>> enum virtio_vsock_op {
>> @@ -83,6 +89,11 @@ enum virtio_vsock_op {
>> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
>> /* Request the peer to send the credit info to us */
>> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>> +
>> + /* Record begin for SOCK_SEQPACKET */
>> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
>> + /* Record end for SOCK_SEQPACKET */
>> + VIRTIO_VSOCK_OP_SEQ_END = 9,
>> };
>>
>> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>> @@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
>> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
>> };
>>
>> +/* VIRTIO_VSOCK_OP_RW flags values */
>> +enum virtio_vsock_rw {
>> + VIRTIO_VSOCK_RW_EOR = 1,
>> +};
>> +
>> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
> Probably a good idea to also have a feature bit gating
> this functionality.
IIUC this also requires a qemu patch, because in the current
implementation of the vsock device in qemu there is no 'set_features'
callback for such a device. This callback would handle the guest's write
to the feature register by calling the vhost kernel backend, where this
bit would be processed by the host.
IMHO I'm not sure that SEQPACKET support needs a feature
bit: it is just two new ops for the virtio vsock protocol, and from the
point of view of the virtio device it is the same as STREAM. Maybe it is
needed for cases when a client tries to connect to a server which doesn't
support SEQPACKET: without the bit the result will be "Connection reset by
peer", while with such a bit the client will know that the server doesn't
support it and 'socket(SOCK_SEQPACKET)' will return an error?
>
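The dequeue behaviour described in the quoted patch (copy until the whole record is in, drop what does not fit, restart on a new SEQ_BEGIN), together with the receiver-side counter and length checks from the cover letter, can be sketched in userspace C as below. This is not the patch's code: struct rx_pkt, seq_dequeue and the in_record flag are hypothetical, packets are assumed to be already parsed and handed over in one call, credit updates and waiting are left out, and the three per-socket fields from the patch are grouped into one struct along the lines of the review suggestion, with their exact meaning assumed.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

enum seq_op { OP_RW = 5, OP_SEQ_BEGIN = 8, OP_SEQ_END = 9 };

/* Hypothetical already-parsed rx packet. */
struct rx_pkt {
	enum seq_op op;
	uint32_t msg_cnt;    /* seq header fields, SEQ_BEGIN/SEQ_END only */
	uint32_t msg_len;
	const uint8_t *data; /* payload, RW only */
	uint32_t len;
};

/* The three per-socket fields from the patch, wrapped in one struct;
 * how they are used here is an assumption. */
struct seq_rx_state {
	uint32_t user_read_seq_len; /* msg_len announced by SEQ_BEGIN */
	uint32_t user_read_copied;  /* record bytes received so far */
	uint32_t curr_rx_msg_cnt;   /* msg_cnt announced by SEQ_BEGIN */
};

/*
 * Reassemble one record from pkts[0..npkts) into buf. Bytes beyond
 * buf_len are dropped but still counted, so the return value is the full
 * record length (the caller can report MSG_TRUNC when it exceeds buf_len).
 * *consumed is set to the number of packets examined.
 * Returns -EPROTO if the counter or length check fails and -EAGAIN if no
 * SEQ_END has arrived yet.
 */
static long seq_dequeue(struct seq_rx_state *st, const struct rx_pkt *pkts,
			size_t npkts, uint8_t *buf, size_t buf_len,
			size_t *consumed)
{
	bool in_record = false;
	size_t i;

	for (i = 0; i < npkts; i++) {
		const struct rx_pkt *p = &pkts[i];

		switch (p->op) {
		case OP_SEQ_BEGIN:
			/* A new record restarts copying; unfinished data is dropped. */
			st->user_read_seq_len = p->msg_len;
			st->curr_rx_msg_cnt = p->msg_cnt;
			st->user_read_copied = 0;
			in_record = true;
			break;
		case OP_RW:
			if (!in_record)
				break; /* data outside a record: ignored here */
			if (st->user_read_copied < buf_len) {
				size_t room = buf_len - st->user_read_copied;
				size_t n = p->len < room ? p->len : room;

				memcpy(buf + st->user_read_copied, p->data, n);
			}
			st->user_read_copied += p->len;
			break;
		case OP_SEQ_END:
			if (!in_record)
				break;
			*consumed = i + 1;
			if (p->msg_cnt != st->curr_rx_msg_cnt + 1 ||
			    st->user_read_copied != st->user_read_seq_len)
				return -EPROTO; /* a packet was lost */
			return (long)st->user_read_copied;
		}
	}
	*consumed = i;
	return -EAGAIN;
}
```

With an 8-byte buffer, an 11-byte record "hello world" leaves "hello wo" in the buffer and returns 11, which the caller could turn into MSG_TRUNC; if SEQ_END's counter is not SEQ_BEGIN's + 1, the same call returns -EPROTO instead.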
On Wed, Feb 24, 2021 at 08:07:48AM +0300, Arseny Krasnov wrote:
>
> On 23.02.2021 17:17, Michael S. Tsirkin wrote:
> > On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
> >> This adds a transport callback and its logic for SEQPACKET dequeue.
> >> The callback fetches RW packets from the socket's rx queue until the
> >> whole record is copied (if the user's buffer is full, the user is not
> >> woken up). This is done to avoid stalling the sender: if we wake up the
> >> user and it leaves the syscall, nobody will send a credit update for the
> >> rest of the record, and the sender will wait until the next read syscall
> >> on the receiver's side. So if the user buffer is full, we just send a
> >> credit update and drop the data. If SEQ_BEGIN is found during the copy
> >> (and not all data was copied), copying is restarted by resetting the
> >> user's iov iterator (previous unfinished data is dropped).
> >>
> >> Signed-off-by: Arseny Krasnov <[email protected]>
> >> ---
> >> include/linux/virtio_vsock.h | 10 +++
> >> include/uapi/linux/virtio_vsock.h | 16 ++++
> >> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
> >> 3 files changed, 140 insertions(+)
> >>
> >> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
> >> index dc636b727179..003d06ae4a85 100644
> >> --- a/include/linux/virtio_vsock.h
> >> +++ b/include/linux/virtio_vsock.h
> >> @@ -36,6 +36,11 @@ struct virtio_vsock_sock {
> >> u32 rx_bytes;
> >> u32 buf_alloc;
> >> struct list_head rx_queue;
> >> +
> >> + /* For SOCK_SEQPACKET */
> >> + u32 user_read_seq_len;
> >> + u32 user_read_copied;
> >> + u32 curr_rx_msg_cnt;
> >
> > wrap these in a struct to make it clearer they
> > are related?
> Ack
> >
> >> };
> >>
> >> struct virtio_vsock_pkt {
> >> @@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> >> struct msghdr *msg,
> >> size_t len, int flags);
> >>
> >> +int
> >> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> >> + struct msghdr *msg,
> >> + int flags,
> >> + bool *msg_ready);
> >> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> >> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
> >>
> >> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
> >> index 1d57ed3d84d2..cf9c165e5cca 100644
> >> --- a/include/uapi/linux/virtio_vsock.h
> >> +++ b/include/uapi/linux/virtio_vsock.h
> >> @@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
> >> __le32 fwd_cnt;
> >> } __attribute__((packed));
> >>
> >> +struct virtio_vsock_seq_hdr {
> >> + __le32 msg_cnt;
> >> + __le32 msg_len;
> >> +} __attribute__((packed));
> >> +
> >> enum virtio_vsock_type {
> >> VIRTIO_VSOCK_TYPE_STREAM = 1,
> >> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> >> };
> >>
> >> enum virtio_vsock_op {
> >> @@ -83,6 +89,11 @@ enum virtio_vsock_op {
> >> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> >> /* Request the peer to send the credit info to us */
> >> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
> >> +
> >> + /* Record begin for SOCK_SEQPACKET */
> >> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
> >> + /* Record end for SOCK_SEQPACKET */
> >> + VIRTIO_VSOCK_OP_SEQ_END = 9,
> >> };
> >>
> >> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
> >> @@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
> >> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> >> };
> >>
> >> +/* VIRTIO_VSOCK_OP_RW flags values */
> >> +enum virtio_vsock_rw {
> >> + VIRTIO_VSOCK_RW_EOR = 1,
> >> +};
> >> +
> >> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
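The uapi header above declares `struct virtio_vsock_seq_hdr` as two packed `__le32` fields that travel as the payload of SEQ_BEGIN and SEQ_END packets. Below is a minimal user-space sketch of encoding and decoding that 8-byte payload independently of host endianness. The helper names (`seq_hdr_encode`, `seq_hdr_decode`, `struct seq_hdr_host`) are hypothetical and not part of the patch; in-kernel code would use `cpu_to_le32()`/`le32_to_cpu()` instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SEQ_HDR_WIRE_SIZE 8	/* two packed __le32 fields */

/* Host-order copy of the header (hypothetical helper type). */
struct seq_hdr_host {
	uint32_t msg_cnt;
	uint32_t msg_len;
};

static void put_le32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t)v;
	p[1] = (uint8_t)(v >> 8);
	p[2] = (uint8_t)(v >> 16);
	p[3] = (uint8_t)(v >> 24);
}

static uint32_t get_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
	       ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

/* Write the header into a SEQ_BEGIN/SEQ_END payload buffer.
 * Returns the number of bytes written, or 0 if the buffer is too small. */
size_t seq_hdr_encode(const struct seq_hdr_host *h, uint8_t *buf,
		      size_t buf_len)
{
	assert(h != NULL && buf != NULL);
	if (buf_len < SEQ_HDR_WIRE_SIZE)
		return 0;
	put_le32(buf, h->msg_cnt);
	put_le32(buf + 4, h->msg_len);
	return SEQ_HDR_WIRE_SIZE;
}

/* Parse a received payload. Returns 0 on success, -1 if it is too short. */
int seq_hdr_decode(const uint8_t *buf, size_t buf_len,
		   struct seq_hdr_host *h)
{
	assert(buf != NULL && h != NULL);
	if (buf_len < SEQ_HDR_WIRE_SIZE)
		return -1;
	h->msg_cnt = get_le32(buf);
	h->msg_len = get_le32(buf + 4);
	return 0;
}
```

Writing the bytes explicitly keeps the wire format identical on little- and big-endian hosts, which is what the `__le32` annotation promises.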
> > Probably a good idea to also have a feature bit gating
> > this functionality.
>
> IIUC this also requires a QEMU patch, because the current
> implementation of the vsock device in QEMU has no 'set_features'
> callback. That callback would handle the guest's write to the
> feature register by calling the vhost kernel backend, where the
> host would process this bit.
Well, patching userspace to make use of a kernel feature
is par for the course, isn't it?
>
> IMHO, I'm not sure that SEQPACKET support needs a feature bit: it
> is just two new ops in the virtio vsock protocol, and from the point
> of view of the virtio device it is the same as STREAM. Maybe it is
> needed for the case when a client tries to connect to a server that
> doesn't support SEQPACKET: without the bit, the result will be
> "Connection reset by peer", and with such a bit the client will know
> that the server doesn't support it and 'socket(SOCK_SEQPACKET)' will
> return an error?
Yes, better error handling would be one reason to do it like this.
--
MST
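A feature bit would let each side learn, before any connection is attempted, whether the other end understands SEQPACKET. The following is a sketch under stated assumptions: the bit `SKETCH_F_SEQPACKET`, its number, and the helper names are hypothetical, and returning `-ESOCKTNOSUPPORT` is an assumed choice of error rather than something this thread settles.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/socket.h>

/* Hypothetical feature bit; the number is chosen only for illustration. */
#define SKETCH_F_SEQPACKET (UINT64_C(1) << 1)

/* Only bits offered by the device and accepted by the driver are usable. */
uint64_t sketch_negotiate(uint64_t device_features, uint64_t driver_features)
{
	return device_features & driver_features;
}

/*
 * Decide at socket()/connect() time whether a socket type can use the
 * transport, so the caller gets an error up front instead of a
 * "Connection reset by peer" from a peer that cannot handle it.
 */
int sketch_check_socket_type(uint64_t negotiated, int type)
{
	switch (type) {
	case SOCK_STREAM:
		return 0;	/* always supported */
	case SOCK_SEQPACKET:
		return (negotiated & SKETCH_F_SEQPACKET) ? 0 : -ESOCKTNOSUPPORT;
	default:
		return -ESOCKTNOSUPPORT;
	}
}
```

The cost, as noted in the thread, is that the bit has to be plumbed through the device model (QEMU's `set_features` path) before the kernel can see it.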
On 18 Feb 2021, at 06:37, Arseny Krasnov <[email protected]> wrote:
>
> This adds a receive loop for SEQPACKET. It looks like the receive loop for
> STREAM, but there are a few differences:
> 1) It doesn't call notify callbacks.
> 2) It ignores the 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because they
> make no sense for SEQPACKET.
> 3) It waits until the whole record is received or an error is found during
> receiving.
> 4) It processes and sets the 'MSG_TRUNC' flag.
>
> So, to avoid extra conditions for two socket types inside one loop, two
> independent functions were created.
>
> Signed-off-by: Arseny Krasnov <[email protected]>
> ---
> include/net/af_vsock.h | 5 +++
> net/vmw_vsock/af_vsock.c | 97 +++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 101 insertions(+), 1 deletion(-)
>
> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
> index b1c717286993..01563338cc03 100644
> --- a/include/net/af_vsock.h
> +++ b/include/net/af_vsock.h
> @@ -135,6 +135,11 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
> + /* SEQ_PACKET. */
> + size_t (*seqpacket_seq_get_len)(struct vsock_sock *vsk);
> + int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
> + int flags, bool *msg_ready);
> +
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index d277dc1cdbdf..b754927a556a 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -1972,6 +1972,98 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
> return err;
> }
>
> +static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
> + size_t len, int flags)
> +{
> + const struct vsock_transport *transport;
> + const struct iovec *orig_iov;
> + unsigned long orig_nr_segs;
> + bool msg_ready;
> + struct vsock_sock *vsk;
> + size_t record_len;
> + long timeout;
> + int err = 0;
> + DEFINE_WAIT(wait);
> +
> + vsk = vsock_sk(sk);
> + transport = vsk->transport;
> +
> + timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
> + orig_nr_segs = msg->msg_iter.nr_segs;
> + orig_iov = msg->msg_iter.iov;
> + msg_ready = false;
> + record_len = 0;
> +
> + while (1) {
> + err = vsock_wait_data(sk, &wait, timeout, NULL, 0);
> +
> + if (err <= 0) {
> + /* In case of any loop break (timeout, signal
> + * interrupt or shutdown), we report to the user
> + * that nothing was copied.
> + */
> + err = 0;
> + break;
> + }
> +
> + if (record_len == 0) {
> + record_len =
> + transport->seqpacket_seq_get_len(vsk);
> +
> + if (record_len == 0)
> + continue;
> + }
> +
> + err = transport->seqpacket_dequeue(vsk, msg,
> + flags, &msg_ready);
> +
> + if (err < 0) {
> + if (err == -EAGAIN) {
> + iov_iter_init(&msg->msg_iter, READ,
> + orig_iov, orig_nr_segs,
> + len);
> + /* Clear 'MSG_EOR' here, because the dequeue
> + * callback above sets it again if it was
> + * set by the sender. This 'MSG_EOR' is from
> + * the dropped record.
> + */
> + msg->msg_flags &= ~MSG_EOR;
> + record_len = 0;
> + continue;
> + }
So a question for my understanding of the flow here. SOCK_SEQPACKET is reliable, so
what does it mean to drop the record? Is the transport supposed to roll back to the
beginning of the current record? If the incoming data in the transport doesn’t follow
the protocol, and packets need to be dropped, shouldn’t the socket be reset or similar?
Maybe there is potential for simplifying the flow if that is the case.
> +
> + err = -ENOMEM;
> + break;
> + }
> +
> + if (msg_ready)
> + break;
> + }
> +
> + if (sk->sk_err)
> + err = -sk->sk_err;
> + else if (sk->sk_shutdown & RCV_SHUTDOWN)
> + err = 0;
> +
> + if (msg_ready) {
> + /* If the user set MSG_TRUNC, return the real
> + * length of the packet.
> + */
> + if (flags & MSG_TRUNC)
> + err = record_len;
> + else
> + err = len - msg->msg_iter.count;
> +
> + /* Always set MSG_TRUNC if the real length of the
> + * packet is bigger than the user's buffer.
> + */
> + if (record_len > len)
> + msg->msg_flags |= MSG_TRUNC;
> + }
> +
> + return err;
> +}
> +
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
> @@ -2027,7 +2119,10 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> goto out;
> }
>
> - err = __vsock_stream_recvmsg(sk, msg, len, flags);
> + if (sk->sk_type == SOCK_STREAM)
> + err = __vsock_stream_recvmsg(sk, msg, len, flags);
> + else
> + err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
>
> out:
> release_sock(sk);
> --
> 2.25.1
>
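The tail of `__vsock_seqpacket_recvmsg()` above decides both the return value and whether `MSG_TRUNC` is reported. Isolating that decision as a pure function makes the rule easier to check. `sketch_seqpacket_ret()` is a hypothetical name, and `copied` stands for `len - msg->msg_iter.count` in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/socket.h>	/* MSG_TRUNC */

/*
 * Return value of a SEQPACKET recvmsg() once a whole record is dequeued.
 * record_len: length announced by SEQ_BEGIN.
 * len:        size of the user's buffer.
 * copied:     bytes actually copied (len - msg->msg_iter.count in the patch).
 * in_flags:   flags passed by the caller; *out_flags gets MSG_TRUNC if the
 *             record did not fit.
 */
long sketch_seqpacket_ret(size_t record_len, size_t len, size_t copied,
			  int in_flags, int *out_flags)
{
	assert(out_flags != NULL && copied <= len);

	/* Always report truncation when the record exceeds the buffer. */
	if (record_len > len)
		*out_flags |= MSG_TRUNC;

	/* With MSG_TRUNC requested, report the real record length. */
	if (in_flags & MSG_TRUNC)
		return (long)record_len;

	return (long)copied;
}
```

This mirrors what recv(2) documents for sequenced-packet sockets on Linux: passing `MSG_TRUNC` returns the real record length even when it was longer than the supplied buffer.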
On 25.02.2021 19:27, Jorgen Hansen wrote:
> On 18 Feb 2021, at 06:37, Arseny Krasnov <[email protected]> wrote:
>> This adds receive loop for SEQPACKET. It looks like receive loop for
>> STREAM, but there is a little bit difference:
>> 1) It doesn't call notify callbacks.
>> 2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
>> there is no sense for these values in SEQPACKET case.
>> 3) It waits until whole record is received or error is found during
>> receiving.
>> 4) It processes and sets 'MSG_TRUNC' flag.
>>
>> So to avoid extra conditions for two types of socket inside one loop, two
>> independent functions were created.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/net/af_vsock.h | 5 +++
>> net/vmw_vsock/af_vsock.c | 97 +++++++++++++++++++++++++++++++++++++++-
>> 2 files changed, 101 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>> index b1c717286993..01563338cc03 100644
>> --- a/include/net/af_vsock.h
>> +++ b/include/net/af_vsock.h
>> @@ -135,6 +135,11 @@ struct vsock_transport {
>> bool (*stream_is_active)(struct vsock_sock *);
>> bool (*stream_allow)(u32 cid, u32 port);
>>
>> + /* SEQ_PACKET. */
>> + size_t (*seqpacket_seq_get_len)(struct vsock_sock *vsk);
>> + int (*seqpacket_dequeue)(struct vsock_sock *vsk, struct msghdr *msg,
>> + int flags, bool *msg_ready);
>> +
>> /* Notification. */
>> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
>> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index d277dc1cdbdf..b754927a556a 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -1972,6 +1972,98 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>> return err;
>> }
>>
>> +static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>> + size_t len, int flags)
>> +{
>> + const struct vsock_transport *transport;
>> + const struct iovec *orig_iov;
>> + unsigned long orig_nr_segs;
>> + bool msg_ready;
>> + struct vsock_sock *vsk;
>> + size_t record_len;
>> + long timeout;
>> + int err = 0;
>> + DEFINE_WAIT(wait);
>> +
>> + vsk = vsock_sk(sk);
>> + transport = vsk->transport;
>> +
>> + timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>> + orig_nr_segs = msg->msg_iter.nr_segs;
>> + orig_iov = msg->msg_iter.iov;
>> + msg_ready = false;
>> + record_len = 0;
>> +
>> + while (1) {
>> + err = vsock_wait_data(sk, &wait, timeout, NULL, 0);
>> +
>> + if (err <= 0) {
>> + /* In case of any loop break (timeout, signal
>> + * interrupt or shutdown), we report to the user
>> + * that nothing was copied.
>> + */
>> + err = 0;
>> + break;
>> + }
>> +
>> + if (record_len == 0) {
>> + record_len =
>> + transport->seqpacket_seq_get_len(vsk);
>> +
>> + if (record_len == 0)
>> + continue;
>> + }
>> +
>> + err = transport->seqpacket_dequeue(vsk, msg,
>> + flags, &msg_ready);
>> +
>> + if (err < 0) {
>> + if (err == -EAGAIN) {
>> + iov_iter_init(&msg->msg_iter, READ,
>> + orig_iov, orig_nr_segs,
>> + len);
>> + /* Clear 'MSG_EOR' here, because the dequeue
>> + * callback above sets it again if it was
>> + * set by the sender. This 'MSG_EOR' is from
>> + * the dropped record.
>> + */
>> + msg->msg_flags &= ~MSG_EOR;
>> + record_len = 0;
>> + continue;
>> + }
> So a question for my understanding of the flow here. SOCK_SEQPACKET is reliable, so
> what does it mean to drop the record? Is the transport supposed to roll back to the
> beginning of the current record? If the incoming data in the transport doesn’t follow
> the protocol, and packets need to be dropped, shouldn’t the socket be reset or similar?
> Maybe there is potential for simplifying the flow if that is the case.
The vhost transport could drop some packets (for example, when kmalloc fails).
In that case the user would see only part of a record (when an RW packet was
dropped), or it would be impossible to distinguish two records (when the END of
the first and the BEGIN of the second were lost). So in this case the user
continues to sleep and such orphaned packets are dropped.
Yes, it will simplify the logic a lot if I just send a connection reset when an
invalid sequence of packets is detected.
>
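If an invalid packet sequence resets the connection instead of silently dropping orphaned packets, the receiver checks from the cover letter become a small state machine: SEQ_END's `msg_cnt` must equal SEQ_BEGIN's plus one, and the RW bytes in between must add up to SEQ_BEGIN's `msg_len`. The sketch below assumes that reset-on-error approach; all names are hypothetical, and a return of -1 marks where the caller would send OP_RST.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum sketch_ev { EV_SEQ_BEGIN, EV_RW, EV_SEQ_END };

/* Hypothetical per-socket receive state; zero-initialise before use. */
struct sketch_rx {
	bool in_record;
	uint32_t begin_cnt;	/* msg_cnt from the last SEQ_BEGIN */
	uint32_t expect_len;	/* msg_len from the last SEQ_BEGIN */
	uint32_t got_len;	/* RW payload bytes seen since then */
};

/*
 * Feed one packet. For SEQ_BEGIN, 'bytes' is the header's msg_len; for RW
 * it is the payload length; for SEQ_END it is ignored. 'msg_cnt' is ignored
 * for RW. Returns 1 when a complete, consistent record has been seen,
 * 0 when more packets are needed, -1 on a protocol violation.
 */
int sketch_rx_event(struct sketch_rx *rx, enum sketch_ev ev,
		    uint32_t msg_cnt, uint32_t bytes)
{
	assert(rx != NULL);

	switch (ev) {
	case EV_SEQ_BEGIN:
		if (rx->in_record)
			return -1;	/* SEQ_END of the previous record lost */
		rx->in_record = true;
		rx->begin_cnt = msg_cnt;
		rx->expect_len = bytes;
		rx->got_len = 0;
		return 0;
	case EV_RW:
		/* Data outside a record, or more data than announced. */
		if (!rx->in_record || bytes > rx->expect_len - rx->got_len)
			return -1;
		rx->got_len += bytes;
		return 0;
	case EV_SEQ_END:
		if (!rx->in_record || msg_cnt != rx->begin_cnt + 1 ||
		    rx->got_len != rx->expect_len)
			return -1;
		rx->in_record = false;
		return 1;
	}
	return -1;
}
```

Every loss case described above (a dropped RW packet, or a lost END/BEGIN pair) surfaces as -1 instead of a silently merged or shortened record.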
On 24.02.2021 09:41, Michael S. Tsirkin wrote:
> On Wed, Feb 24, 2021 at 08:07:48AM +0300, Arseny Krasnov wrote:
>> On 23.02.2021 17:17, Michael S. Tsirkin wrote:
>>> On Thu, Feb 18, 2021 at 08:39:37AM +0300, Arseny Krasnov wrote:
>>>> This adds a transport callback and its logic for SEQPACKET dequeue.
>>>> The callback fetches RW packets from the socket's rx queue until the whole
>>>> record is copied (if the user's buffer is full, the user is not woken up).
>>>> This is done to avoid stalling the sender: if we woke up the user and it
>>>> left the syscall, nobody would send a credit update for the rest of the
>>>> record, and the sender would wait for the next read syscall on the
>>>> receiver's side. So if the user's buffer is full, we just send a credit
>>>> update and drop the data. If SEQ_BEGIN is found during the copy (and not
>>>> all data was copied), copying is restarted by resetting the user's iov
>>>> iterator (the previous unfinished data is dropped).
>>>>
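The dequeue behaviour described in this commit message (copy until the record ends, discard but still credit bytes that do not fit, restart on a new SEQ_BEGIN) can be modelled over an array of packets. This is a user-space sketch, not the patch's `virtio_transport_seqpacket_dequeue()`: the packet and result types are hypothetical, and credit updates are represented only by the `consumed` counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

enum sk_op { SK_SEQ_BEGIN, SK_RW, SK_SEQ_END };

struct sk_pkt {
	enum sk_op op;
	const char *data;	/* RW payload, NULL otherwise */
	size_t len;		/* RW payload length, or msg_len for SEQ_BEGIN */
};

struct sk_result {
	size_t copied;		/* bytes placed in the user buffer */
	size_t record_len;	/* length announced by SEQ_BEGIN */
	size_t consumed;	/* RW bytes freed from the rx queue (credited) */
	size_t next;		/* index of the first unprocessed packet */
	bool ready;		/* SEQ_END seen */
};

struct sk_result sk_dequeue(const struct sk_pkt *q, size_t n,
			    char *buf, size_t cap)
{
	struct sk_result r = {0};

	assert(q != NULL || n == 0);
	for (r.next = 0; r.next < n && !r.ready; r.next++) {
		const struct sk_pkt *p = &q[r.next];

		switch (p->op) {
		case SK_SEQ_BEGIN:
			/* New record: drop any partial copy and restart,
			 * like resetting the user's iov iterator. */
			r.copied = 0;
			r.record_len = p->len;
			break;
		case SK_RW: {
			size_t room = cap - r.copied;
			size_t n_copy = p->len < room ? p->len : room;

			if (n_copy > 0)
				memcpy(buf + r.copied, p->data, n_copy);
			r.copied += n_copy;
			/* Bytes beyond 'cap' are discarded, but still freed
			 * so credit can be returned to the sender. */
			r.consumed += p->len;
			break;
		}
		case SK_SEQ_END:
			r.ready = true;
			break;
		}
	}
	return r;
}
```

Because overflow bytes are still counted in `consumed`, the sender's credit is replenished for the whole record even though the user only receives `cap` bytes, which is the stall the commit message is avoiding.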
>>>> Signed-off-by: Arseny Krasnov <[email protected]>
>>>> ---
>>>> include/linux/virtio_vsock.h | 10 +++
>>>> include/uapi/linux/virtio_vsock.h | 16 ++++
>>>> net/vmw_vsock/virtio_transport_common.c | 114 ++++++++++++++++++++++++
>>>> 3 files changed, 140 insertions(+)
>>>>
>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>> index dc636b727179..003d06ae4a85 100644
>>>> --- a/include/linux/virtio_vsock.h
>>>> +++ b/include/linux/virtio_vsock.h
>>>> @@ -36,6 +36,11 @@ struct virtio_vsock_sock {
>>>> u32 rx_bytes;
>>>> u32 buf_alloc;
>>>> struct list_head rx_queue;
>>>> +
>>>> + /* For SOCK_SEQPACKET */
>>>> + u32 user_read_seq_len;
>>>> + u32 user_read_copied;
>>>> + u32 curr_rx_msg_cnt;
>>> Wrap these in a struct to make it clearer that they
>>> are related?
>> Ack
>>>> };
>>>>
>>>> struct virtio_vsock_pkt {
>>>> @@ -80,6 +85,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>> struct msghdr *msg,
>>>> size_t len, int flags);
>>>>
>>>> +int
>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>> + struct msghdr *msg,
>>>> + int flags,
>>>> + bool *msg_ready);
>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>
>>>> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>>>> index 1d57ed3d84d2..cf9c165e5cca 100644
>>>> --- a/include/uapi/linux/virtio_vsock.h
>>>> +++ b/include/uapi/linux/virtio_vsock.h
>>>> @@ -63,8 +63,14 @@ struct virtio_vsock_hdr {
>>>> __le32 fwd_cnt;
>>>> } __attribute__((packed));
>>>>
>>>> +struct virtio_vsock_seq_hdr {
>>>> + __le32 msg_cnt;
>>>> + __le32 msg_len;
>>>> +} __attribute__((packed));
>>>> +
>>>> enum virtio_vsock_type {
>>>> VIRTIO_VSOCK_TYPE_STREAM = 1,
>>>> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
>>>> };
>>>>
>>>> enum virtio_vsock_op {
>>>> @@ -83,6 +89,11 @@ enum virtio_vsock_op {
>>>> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
>>>> /* Request the peer to send the credit info to us */
>>>> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>>>> +
>>>> + /* Record begin for SOCK_SEQPACKET */
>>>> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
>>>> + /* Record end for SOCK_SEQPACKET */
>>>> + VIRTIO_VSOCK_OP_SEQ_END = 9,
>>>> };
>>>>
>>>> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>>>> @@ -91,4 +102,9 @@ enum virtio_vsock_shutdown {
>>>> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
>>>> };
>>>>
>>>> +/* VIRTIO_VSOCK_OP_RW flags values */
>>>> +enum virtio_vsock_rw {
>>>> + VIRTIO_VSOCK_RW_EOR = 1,
>>>> +};
>>>> +
>>>> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>>> Probably a good idea to also have a feature bit gating
>>> this functionality.
>> IIUC this also requires a QEMU patch, because the current
>> implementation of the vsock device in QEMU has no 'set_features'
>> callback. That callback would handle the guest's write to the
>> feature register by calling the vhost kernel backend, where the
>> host would process this bit.
> Well, patching userspace to make use of a kernel feature
> is par for the course, isn't it?
>
>> IMHO, I'm not sure that SEQPACKET support needs a feature bit: it
>> is just two new ops in the virtio vsock protocol, and from the point
>> of view of the virtio device it is the same as STREAM. Maybe it is
>> needed for the case when a client tries to connect to a server that
>> doesn't support SEQPACKET: without the bit, the result will be
>> "Connection reset by peer", and with such a bit the client will know
>> that the server doesn't support it and 'socket(SOCK_SEQPACKET)' will
>> return an error?
> Yes, better error handling would be one reason to do it like this.
Maybe it would be better to add a special flag to OP_RST. When someone
tries to connect to a server which doesn't support that socket type (seqpacket
or dgram), a connection reset is sent. This reset carries a special flag which
indicates that the socket type is not supported. Thus the client can distinguish
the case where the port listener is missing from the case where the socket type
is not supported. It would be easy to implement, and no QEMU patch is needed.
>
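The OP_RST flag proposal could look roughly like the sketch below. The flag name and value are hypothetical (the `flags` field of `struct virtio_vsock_hdr` exists, but this patchset defines no RST flags), and mapping the flag to `-ESOCKTNOSUPPORT` while a plain reset stays `-ECONNRESET` is an assumption.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flag carried in the 'flags' field of an OP_RST packet. */
enum sketch_rst_flags {
	SKETCH_RST_TYPE_UNSUPPORTED = 1,
};

/* Server side: flags to put in the reset sent for a refused request. */
uint32_t sketch_rst_flags_for(bool type_supported)
{
	return type_supported ? 0 : SKETCH_RST_TYPE_UNSUPPORTED;
}

/* Client side: errno that connect() would report for a received reset. */
int sketch_rst_to_errno(uint32_t rst_flags)
{
	if (rst_flags & SKETCH_RST_TYPE_UNSUPPORTED)
		return -ESOCKTNOSUPPORT;	/* socket type not supported */
	return -ECONNRESET;			/* e.g. no listener on the port */
}
```

One trade-off worth noting: a peer that predates such a flag never sets it, so against an old server the client still sees a plain reset, whereas a negotiated feature bit would allow the check before connecting.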
On Thu, 18 Feb 2021 08:42:15 +0300
Arseny Krasnov <[email protected]> wrote:
Not sure if this was pulled in yet, but I do have a small issue with this
patch.
> @@ -69,14 +82,19 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
> __entry->type = type;
> __entry->op = op;
> __entry->flags = flags;
> + __entry->msg_len = msg_len;
> + __entry->msg_cnt = msg_cnt;
> ),
> - TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x",
> + TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x "
> + "msg_len=%u msg_cnt=%u",
It's considered poor formatting to split strings like the above. This is
one of the exceptions for the 80 character limit. Do not break strings just
to keep it within 80 characters.
-- Steve
> __entry->src_cid, __entry->src_port,
> __entry->dst_cid, __entry->dst_port,
> __entry->len,
> show_type(__entry->type),
> show_op(__entry->op),
> - __entry->flags)
> + __entry->flags,
> + __entry->msg_len,
> + __entry->msg_cnt)
> );
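`TP_printk()` only builds inside the kernel's tracing macros, so the following user-space sketch uses `snprintf()` with the same format string, kept on one line as requested above; the kernel coding style exempts user-visible strings from the 80-column rule precisely so they stay greppable. The function name and example values are illustrative only.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Format one alloc_pkt trace line; the format string is never split. */
int sketch_format_alloc_pkt(char *out, size_t out_len,
			    unsigned src_cid, unsigned src_port,
			    unsigned dst_cid, unsigned dst_port,
			    unsigned len, const char *type, const char *op,
			    unsigned flags, unsigned msg_len, unsigned msg_cnt)
{
	assert(out != NULL && type != NULL && op != NULL);
	return snprintf(out, out_len,
			"%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x msg_len=%u msg_cnt=%u",
			src_cid, src_port, dst_cid, dst_port, len, type, op,
			flags, msg_len, msg_cnt);
}
```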
On 03.03.2021 01:25, Steven Rostedt wrote:
> On Thu, 18 Feb 2021 08:42:15 +0300
> Arseny Krasnov <[email protected]> wrote:
>
> Not sure if this was pulled in yet, but I do have a small issue with this
> patch.
No, it is in RFC state.
>
>> @@ -69,14 +82,19 @@ TRACE_EVENT(virtio_transport_alloc_pkt,
>> __entry->type = type;
>> __entry->op = op;
>> __entry->flags = flags;
>> + __entry->msg_len = msg_len;
>> + __entry->msg_cnt = msg_cnt;
>> ),
>> - TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x",
>> + TP_printk("%u:%u -> %u:%u len=%u type=%s op=%s flags=%#x "
>> + "msg_len=%u msg_cnt=%u",
> It's considered poor formatting to split strings like the above. This is
> one of the exceptions for the 80 character limit. Do not break strings just
> to keep it within 80 characters.
>
> -- Steve
OK, will fix in the next version. Thank you.
>
>
>> __entry->src_cid, __entry->src_port,
>> __entry->dst_cid, __entry->dst_port,
>> __entry->len,
>> show_type(__entry->type),
>> show_op(__entry->op),
>> - __entry->flags)
>> + __entry->flags,
>> + __entry->msg_len,
>> + __entry->msg_cnt)
>> );