This patchset implements support of SOCK_SEQPACKET for the virtio
transport.
As SOCK_SEQPACKET guarantees that record boundaries are preserved, a
new packet operation was added: it marks the start of a record (with
the record length in the header), and such a packet doesn't carry any
data. To send a record, a packet with the start marker is sent first,
then all data is sent as usual 'RW' packets. On the receiver's side,
the length of the record is known from the packet with the start
marker. As packets of one socket are not reordered on either the vsock
or the vhost transport layer, this marker allows the original record
to be restored on the receiver's side. If the user's buffer is smaller
than the record length, then all data that does not fit is dropped.
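The framing described above can be sketched in plain userspace C. This is
only an illustration under stated assumptions, not the code of this
patchset: all 'demo_*' names, the packet layout and the tiny payload size
are hypothetical, and credit accounting is left out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PAYLOAD_MAX 8	/* tiny on purpose, to force several 'RW' packets */

/* Hypothetical operations: a begin marker and ordinary data packets. */
enum demo_op { DEMO_OP_SEQ_BEGIN, DEMO_OP_RW };

struct demo_pkt {
	enum demo_op op;
	uint32_t record_len;	/* meaningful for DEMO_OP_SEQ_BEGIN only */
	uint32_t len;		/* payload bytes; 0 for the begin marker */
	uint8_t data[DEMO_PAYLOAD_MAX];
};

/* Emit one begin marker followed by as many 'RW' packets as needed.
 * Returns the number of packets written, or 0 if 'max' is too small.
 */
static size_t demo_send_record(const uint8_t *rec, uint32_t rec_len,
			       struct demo_pkt *out, size_t max)
{
	uint32_t off = 0;
	size_t n = 0;

	if (max == 0)
		return 0;

	memset(&out[n], 0, sizeof(out[n]));
	out[n].op = DEMO_OP_SEQ_BEGIN;
	out[n].record_len = rec_len;
	n++;

	while (off < rec_len) {
		uint32_t chunk = rec_len - off;

		if (chunk > DEMO_PAYLOAD_MAX)
			chunk = DEMO_PAYLOAD_MAX;
		if (n == max)
			return 0;

		memset(&out[n], 0, sizeof(out[n]));
		out[n].op = DEMO_OP_RW;
		out[n].len = chunk;
		memcpy(out[n].data, rec + off, chunk);
		off += chunk;
		n++;
	}
	return n;
}

/* Rebuild one record from in-order packets. At most 'buf_len' bytes are
 * copied; the remainder of the record is consumed and dropped.
 * Returns the number of bytes copied to 'buf'.
 */
static uint32_t demo_recv_record(const struct demo_pkt *pkts, size_t n,
				 uint8_t *buf, uint32_t buf_len)
{
	uint32_t record_len, seen = 0, copied = 0;
	size_t i;

	assert(n > 0 && pkts[0].op == DEMO_OP_SEQ_BEGIN);
	record_len = pkts[0].record_len;

	for (i = 1; i < n && seen < record_len; i++) {
		uint32_t room = buf_len - copied;
		uint32_t take = pkts[i].len < room ? pkts[i].len : room;

		memcpy(buf + copied, pkts[i].data, take);
		copied += take;
		seen += pkts[i].len;
	}
	return copied;
}
```

With a 17 byte record and an 8 byte payload limit, the sender in this sketch
produces one marker plus three 'RW' packets; a receiver with a 5 byte buffer
gets the first 5 bytes and the rest of the record is discarded.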
The maximum length of a datagram is not limited, as with stream
sockets, because the same credit logic is used. The difference from
stream sockets is that the user is not woken up until the whole record
is received or an error occurs. The implementation also supports the
'MSG_EOR' and 'MSG_TRUNC' flags. Tests are also implemented.
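For readers unfamiliar with SOCK_SEQPACKET semantics, the userspace sketch
below shows record boundaries and 'MSG_TRUNC' on receive. It uses an AF_UNIX
socketpair as a stand-in, because an AF_VSOCK connection needs a guest/host
pair; the assumption is that AF_VSOCK sockets from this patchset are meant to
behave the same way, which this example does not verify. The 'demo_*' names
are hypothetical.

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct demo_result {
	ssize_t first;		/* return value of the short read */
	ssize_t second;		/* return value of the next read */
	char second_byte;	/* first byte seen by the next read */
};

/* Send two records, read the first one into a buffer that is too small,
 * then read again. Returns 0 on success, -1 if a socket call failed.
 */
static int demo_record_boundaries(int recv_flags, struct demo_result *res)
{
	static const char rec1[] = "0123456789";	/* 10 bytes are sent */
	static const char rec2[] = "abc";		/* 3 bytes are sent */
	char small[4], big[32];
	int sv[2];
	int ret = -1;

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
		return -1;

	if (send(sv[0], rec1, 10, 0) != 10 || send(sv[0], rec2, 3, 0) != 3)
		goto out;

	/* Only 4 bytes fit; the tail of the first record is discarded. */
	res->first = recv(sv[1], small, sizeof(small), recv_flags);

	/* The next read starts at the second record, not at the tail. */
	res->second = recv(sv[1], big, sizeof(big), 0);
	res->second_byte = res->second > 0 ? big[0] : '\0';
	ret = 0;
out:
	close(sv[0]);
	close(sv[1]);
	return ret;
}
```

On Linux, calling this with 'recv_flags' set to 0 reports 4 bytes for the
first read, while passing 'MSG_TRUNC' reports the real record length (10);
in both cases the second read returns the 3 byte record starting with 'a'.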
Arseny Krasnov (13):
af_vsock: prepare for SOCK_SEQPACKET support
af_vsock: prepare 'vsock_connectible_recvmsg()'
af_vsock: implement SEQPACKET rx loop
af_vsock: implement send logic for SOCK_SEQPACKET
af_vsock: rest of SEQPACKET support
af_vsock: update comments for stream sockets
virtio/vsock: dequeue callback for SOCK_SEQPACKET
virtio/vsock: fetch length for SEQPACKET record
virtio/vsock: add SEQPACKET receive logic
virtio/vsock: rest of SOCK_SEQPACKET support
virtio/vsock: setup SEQPACKET ops for transport
vhost/vsock: setup SEQPACKET ops for transport
vsock_test: add SOCK_SEQPACKET tests
drivers/vhost/vsock.c | 7 +-
include/linux/virtio_vsock.h | 12 +
include/net/af_vsock.h | 6 +
include/uapi/linux/virtio_vsock.h | 9 +
net/vmw_vsock/af_vsock.c | 543 ++++++++++++++++------
net/vmw_vsock/virtio_transport.c | 4 +
net/vmw_vsock/virtio_transport_common.c | 295 ++++++++++--
tools/testing/vsock/util.c | 32 +-
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 +++++
10 files changed, 862 insertions(+), 175 deletions(-)
TODO:
- Support for record integrity control. As the transport could drop
some packets, something like a "record-id" and a record end marker
need to be implemented. The idea is that the SEQ_BEGIN packet carries
both record length and record id, while the end marker (let it be
SEQ_END) carries only the record id. To be sure that no packet was
lost, the receiver checks the length of data between SEQ_BEGIN and
SEQ_END (it must be the same as the value in SEQ_BEGIN) and the record
ids of SEQ_BEGIN and SEQ_END (this means that both markers were not
dropped). I think that the easiest way to implement the record id for
SEQ_BEGIN is to reuse another field of the packet header (SEQ_BEGIN
already uses 'flags' as record length). For SEQ_END, the record id
could be stored in 'flags'.
Another way to implement it is to move the metadata of both SEQ_END
and SEQ_BEGIN to the payload. But this approach has a problem: if we
move something to the payload, that payload is accounted for by the
credit logic, which fragments payload, while a payload with record
length and id must not be fragmented. One way to overcome this is to
ignore credit update for SEQ_BEGIN/SEQ_END packets. Another solution
is to update the 'stream_has_space()' function: the current
implementation returns non-zero when at least 1 byte is allowed to be
used, but the updated version will have an extra argument, which is
the needed length. For an 'RW' packet this argument is 1, for
SEQ_BEGIN it is sizeof(record len + record id) and for SEQ_END it is
sizeof(record id).
- What to do when the server doesn't support SOCK_SEQPACKET. In the
current implementation RST is replied in the same way as when the
listening port is not found. I think that the current RST is enough,
because the case when the server doesn't support SEQ_PACKET is the
same as when the listener is missing (e.g. no listener in both cases).
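The integrity check proposed in the first TODO item could look roughly like
the sketch below. Nothing here exists in the patchset yet: the structures,
field names and 'demo_record_is_intact()' are hypothetical and only restate
the two conditions from the text (matching record ids, and data length equal
to the length announced in SEQ_BEGIN).

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical metadata carried by the two markers. */
struct demo_seq_begin {
	uint32_t record_len;
	uint32_t record_id;
};

struct demo_seq_end {
	uint32_t record_id;
};

/* 'bytes_between' is the amount of 'RW' payload the receiver saw between
 * the SEQ_BEGIN and SEQ_END markers.
 */
static bool demo_record_is_intact(const struct demo_seq_begin *begin,
				  const struct demo_seq_end *end,
				  uint32_t bytes_between)
{
	/* Different ids: at least one marker of some record was dropped. */
	if (begin->record_id != end->record_id)
		return false;

	/* Same ids, but a data packet in between was lost. */
	return bytes_between == begin->record_len;
}
```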
v2 -> v3:
- patches reorganized: split for prepare and implementation patches
- local variables are declared in "Reverse Christmas tree" manner
- virtio_transport_common.c: valid leXX_to_cpu() for vsock header
fields access
- af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
between stream and seqpacket sockets.
- af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
- af_vsock.c: 'vsock_wait_data()' refactored.
v1 -> v2:
- patches reordered: af_vsock.c related changes now before virtio vsock
- patches reorganized: more small patches, where +/- are not mixed
- tests for SOCK_SEQPACKET added
- all commit messages updated
- af_vsock.c: 'vsock_pre_recv_check()' inlined to
'vsock_connectible_recvmsg()'
- af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
was not found
- virtio_transport_common.c: transport callback for seqpacket dequeue
- virtio_transport_common.c: simplified
'virtio_transport_recv_connected()'
- virtio_transport_common.c: send reset on socket and packet type
mismatch.
Signed-off-by: Arseny Krasnov <[email protected]>
--
2.25.1
This adds some logic to current stream enqueue function for SEQPACKET
support:
1) Send record begin marker with length of record.
2) Return value from enqueue function is whole record length or error
for SOCK_SEQPACKET.
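The return value rule from item 2 can be restated as a small standalone
function. This is a sketch that mirrors the condition added at the 'out_err'
label in the diff below; 'demo_sendmsg_result()' and its parameters are
hypothetical names, not kernel API.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>

/* 'err' is the error collected by the send loop, 'total_written' the
 * number of bytes queued so far, 'len' the size of the user's buffer.
 */
static ssize_t demo_sendmsg_result(bool seqpacket, size_t len,
				   size_t total_written, int err)
{
	/* A stream socket may report a partial write; a seqpacket socket
	 * reports success only when the whole record was sent.
	 */
	if (total_written > 0 && (!seqpacket || total_written == len))
		return (ssize_t)total_written;

	return err;
}
```

For example, with 40 of 100 bytes written and a pending -EAGAIN, a stream
socket returns 40 in this sketch while a seqpacket socket returns -EAGAIN.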
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/net/af_vsock.h | 1 +
net/vmw_vsock/af_vsock.c | 16 ++++++++++++++--
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index 46073842d489..ec6bf4600ef8 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -136,6 +136,7 @@ struct vsock_transport {
bool (*stream_allow)(u32 cid, u32 port);
/* SEQ_PACKET. */
+ bool (*seqpacket_seq_send_len)(struct vsock_sock *, size_t len);
size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
size_t len, int flags);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 3b266880b7c8..4600c1ec3bb3 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1767,6 +1767,12 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
if (err < 0)
goto out;
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ err = transport->seqpacket_seq_send_len(vsk, len);
+ if (err < 0)
+ goto out;
+ }
+
while (total_written < len) {
ssize_t written;
@@ -1845,8 +1851,14 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
}
out_err:
- if (total_written > 0)
- err = total_written;
+ if (total_written > 0) {
+ /* Return number of written bytes only if:
+ * 1) SOCK_STREAM socket.
+ * 2) SOCK_SEQPACKET socket when whole buffer is sent.
+ */
+ if (sk->sk_type == SOCK_STREAM || total_written == len)
+ err = total_written;
+ }
out:
release_sock(sk);
return err;
--
2.25.1
This does the rest of SOCK_SEQPACKET support:
1) Adds socket ops for SEQPACKET type.
2) Allows creating a socket with SEQPACKET type.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 71 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 71 insertions(+)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 4600c1ec3bb3..bbc3c31085aa 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_dgram;
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
if (vsock_use_local_transport(remote_cid))
new_transport = transport_local;
else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
@@ -459,6 +460,13 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_g2h;
else
new_transport = transport_h2g;
+
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ if (!new_transport->seqpacket_seq_send_len ||
+ !new_transport->seqpacket_seq_get_len ||
+ !new_transport->seqpacket_dequeue)
+ return -ENODEV;
+ }
break;
default:
return -ESOCKTNOSUPPORT;
@@ -684,6 +692,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
spin_lock_bh(&vsock_table_lock);
retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
@@ -1406,6 +1415,12 @@ static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
return vsock_connect(sock, addr, addr_len, flags);
}
+static int vsock_seqpacket_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
bool kern)
{
@@ -1633,6 +1648,16 @@ static int vsock_stream_setsockopt(struct socket *sock,
optlen);
}
+static int vsock_seqpacket_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_connectible_setsockopt(sock, level, optname, optval,
+ optlen);
+}
+
static int vsock_connectible_getsockopt(struct socket *sock,
int level, int optname,
char __user *optval,
@@ -1713,6 +1738,15 @@ static int vsock_stream_getsockopt(struct socket *sock,
optlen);
}
+static int vsock_seqpacket_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
+{
+ return vsock_connectible_getsockopt(sock, level, optname, optval,
+ optlen);
+}
+
static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
size_t len)
{
@@ -1870,6 +1904,12 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
return vsock_connectible_sendmsg(sock, msg, len);
}
+static int vsock_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
+
static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
long timeout,
struct vsock_transport_recv_notify_data *recv_data,
@@ -2184,6 +2224,13 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
return vsock_connectible_recvmsg(sock, msg, len, flags);
}
+static int
+vsock_seqpacket_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK,
.owner = THIS_MODULE,
@@ -2205,6 +2252,27 @@ static const struct proto_ops vsock_stream_ops = {
.sendpage = sock_no_sendpage,
};
+static const struct proto_ops vsock_seqpacket_ops = {
+ .family = PF_VSOCK,
+ .owner = THIS_MODULE,
+ .release = vsock_release,
+ .bind = vsock_bind,
+ .connect = vsock_seqpacket_connect,
+ .socketpair = sock_no_socketpair,
+ .accept = vsock_accept,
+ .getname = vsock_getname,
+ .poll = vsock_poll,
+ .ioctl = sock_no_ioctl,
+ .listen = vsock_listen,
+ .shutdown = vsock_shutdown,
+ .setsockopt = vsock_seqpacket_setsockopt,
+ .getsockopt = vsock_seqpacket_getsockopt,
+ .sendmsg = vsock_seqpacket_sendmsg,
+ .recvmsg = vsock_seqpacket_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
static int vsock_create(struct net *net, struct socket *sock,
int protocol, int kern)
{
@@ -2225,6 +2293,9 @@ static int vsock_create(struct net *net, struct socket *sock,
case SOCK_STREAM:
sock->ops = &vsock_stream_ops;
break;
+ case SOCK_SEQPACKET:
+ sock->ops = &vsock_seqpacket_ops;
+ break;
default:
return -ESOCKTNOSUPPORT;
}
--
2.25.1
This replaces 'stream' with 'connect oriented' in comments, as SEQPACKET
is also connect oriented.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
1 file changed, 17 insertions(+), 14 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index bbc3c31085aa..0ff1190aad7b 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)
/* Assign a transport to a socket and call the .init transport callback.
*
- * Note: for stream socket this must be called when vsk->remote_addr is set
- * (e.g. during the connect() or when a connection request on a listener
+ * Note: for connect oriented socket this must be called when vsk->remote_addr
+ * is set (e.g. during the connect() or when a connection request on a listener
* socket is received).
* The vsk->remote_addr is used to decide which transport to use:
* - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
@@ -477,10 +477,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
return 0;
/* transport->release() must be called with sock lock acquired.
- * This path can only be taken during vsock_stream_connect(),
- * where we have already held the sock lock.
- * In the other cases, this function is called on a new socket
- * which is not assigned to any transport.
+ * This path can only be taken during vsock_connect(), where we
+ * have already held the sock lock. In the other cases, this
+ * function is called on a new socket which is not assigned to
+ * any transport.
*/
vsk->transport->release(vsk);
vsock_deassign_transport(vsk);
@@ -657,9 +657,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);
- /* Remove stream sockets from the unbound list and add them to the hash
- * table for easy lookup by its address. The unbound list is simply an
- * extra entry at the end of the hash table, a trick used by AF_UNIX.
+ /* Remove connect oriented sockets from the unbound list and add them
+ * to the hash table for easy lookup by its address. The unbound list
+ * is simply an extra entry at the end of the hash table, a trick used
+ * by AF_UNIX.
*/
__vsock_remove_bound(vsk);
__vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
@@ -950,10 +951,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
if ((mode & ~SHUTDOWN_MASK) || !mode)
return -EINVAL;
- /* If this is a STREAM socket and it is not connected then bail out
- * immediately. If it is a DGRAM socket then we must first kick the
- * socket so that it wakes up from any sleeping calls, for example
- * recv(), and then afterwards return the error.
+ /* If this is a connect oriented socket and it is not connected then
+ * bail out immediately. If it is a DGRAM socket then we must first
+ * kick the socket so that it wakes up from any sleeping calls, for
+ * example recv(), and then afterwards return the error.
*/
sk = sock->sk;
@@ -1770,7 +1771,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
lock_sock(sk);
- /* Callers should not provide a destination with stream sockets. */
+ /* Callers should not provide a destination with connect oriented
+ * sockets.
+ */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out;
--
2.25.1
This adds a transport callback which tries to fetch the record begin
marker from the socket's rx queue. It is called from af_vsock.c before
reading the data packets of a record.
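The behaviour of this callback can be modelled in userspace as below. It is a
simplified sketch, not the kernel code: the rx queue is a plain array, there
is no locking or credit update, and an empty queue yields 0 instead of the
previously stored length. All 'demo_*' names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

enum demo_op { DEMO_OP_RW, DEMO_OP_SEQ_BEGIN };

struct demo_pkt {
	enum demo_op op;
	uint32_t len;		/* payload bytes of an 'RW' packet */
	uint32_t record_len;	/* announced length, SEQ_BEGIN only */
};

struct demo_rx_queue {
	const struct demo_pkt *pkts;
	size_t head;		/* index of the next packet to consume */
	size_t count;
};

/* Drop orphaned 'RW' packets in front of the queue, then consume the
 * begin marker and return the record length it announces. The number of
 * dropped payload bytes is reported so a caller could send a credit
 * update for them.
 */
static uint32_t demo_seq_get_len(struct demo_rx_queue *q,
				 uint32_t *bytes_dropped)
{
	*bytes_dropped = 0;

	while (q->head < q->count &&
	       q->pkts[q->head].op != DEMO_OP_SEQ_BEGIN) {
		*bytes_dropped += q->pkts[q->head].len;
		q->head++;
	}

	if (q->head == q->count)
		return 0;	/* no begin marker queued yet */

	return q->pkts[q->head++].record_len;
}
```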
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 1 +
net/vmw_vsock/virtio_transport_common.c | 33 +++++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 7f0ef5204e33..af8705ea8b95 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -84,6 +84,7 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index e66ec39445ff..dcce35d7b462 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -420,6 +420,39 @@ static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vv
return bytes_dropped;
}
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_dropped;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ /* Fetch all orphaned 'RW', packets, and
+ * send credit update.
+ */
+ bytes_dropped = virtio_transport_drop_until_seq_begin(vvs);
+
+ if (list_empty(&vvs->rx_queue))
+ goto out;
+
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ vvs->user_read_copied = 0;
+ vvs->user_read_seq_len = le32_to_cpu(pkt->hdr.flags);
+ virtio_transport_del_n_free_pkt(pkt);
+out:
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (bytes_dropped)
+ virtio_transport_send_credit_update(vsk,
+ VIRTIO_VSOCK_TYPE_SEQPACKET,
+ NULL);
+
+ return vvs->user_read_seq_len;
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
+
static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t user_buf_len)
--
2.25.1
This prepares af_vsock.c for SEQPACKET support:
1) As both stream and seqpacket sockets are connection oriented, add
check for SOCK_SEQPACKET to conditions where SOCK_STREAM is checked.
2) Some functions such as setsockopt(), getsockopt(), connect(),
recvmsg(), sendmsg() are shared between both types of sockets, so
rename them in a generic manner and create entry points for each type
of socket to call these functions (for stream in this patch, for
seqpacket in further patches).
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 91 +++++++++++++++++++++++++++++-----------
1 file changed, 67 insertions(+), 24 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index b12d3a322242..c9ce57db9554 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -604,8 +604,8 @@ static void vsock_pending_work(struct work_struct *work)
/**** SOCKET OPERATIONS ****/
-static int __vsock_bind_stream(struct vsock_sock *vsk,
- struct sockaddr_vm *addr)
+static int __vsock_bind_connectible(struct vsock_sock *vsk,
+ struct sockaddr_vm *addr)
{
static u32 port;
struct sockaddr_vm new_addr;
@@ -685,7 +685,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
spin_lock_bh(&vsock_table_lock);
- retval = __vsock_bind_stream(vsk, addr);
+ retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
break;
@@ -767,6 +767,11 @@ static struct sock *__vsock_create(struct net *net,
return sk;
}
+static bool sock_type_connectible(u16 type)
+{
+ return (type == SOCK_STREAM || type == SOCK_SEQPACKET);
+}
+
static void __vsock_release(struct sock *sk, int level)
{
if (sk) {
@@ -785,7 +790,7 @@ static void __vsock_release(struct sock *sk, int level)
if (vsk->transport)
vsk->transport->release(vsk);
- else if (sk->sk_type == SOCK_STREAM)
+ else if (sock_type_connectible(sk->sk_type))
vsock_remove_sock(vsk);
sock_orphan(sk);
@@ -945,7 +950,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk = sock->sk;
if (sock->state == SS_UNCONNECTED) {
err = -ENOTCONN;
- if (sk->sk_type == SOCK_STREAM)
+ if (sock_type_connectible(sk->sk_type))
return err;
} else {
sock->state = SS_DISCONNECTING;
@@ -960,7 +965,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk->sk_state_change(sk);
release_sock(sk);
- if (sk->sk_type == SOCK_STREAM) {
+ if (sock_type_connectible(sk->sk_type)) {
sock_reset_flag(sk, SOCK_DONE);
vsock_send_shutdown(sk, mode);
}
@@ -1013,7 +1018,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (!(sk->sk_shutdown & SEND_SHUTDOWN))
mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
- } else if (sock->type == SOCK_STREAM) {
+ } else if (sock_type_connectible(sk->sk_type)) {
const struct vsock_transport *transport = vsk->transport;
lock_sock(sk);
@@ -1259,8 +1264,8 @@ static void vsock_connect_timeout(struct work_struct *work)
sock_put(sk);
}
-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
- int addr_len, int flags)
+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
{
int err;
struct sock *sk;
@@ -1395,6 +1400,12 @@ static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
return err;
}
+static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
bool kern)
{
@@ -1410,7 +1421,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
lock_sock(listener);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sock->type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1487,7 +1498,7 @@ static int vsock_listen(struct socket *sock, int backlog)
lock_sock(sk);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sk->sk_type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1531,11 +1542,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
vsk->buffer_size = val;
}
-static int vsock_stream_setsockopt(struct socket *sock,
- int level,
- int optname,
- sockptr_t optval,
- unsigned int optlen)
+static int vsock_connectible_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
{
int err;
struct sock *sk;
@@ -1612,10 +1623,20 @@ static int vsock_stream_setsockopt(struct socket *sock,
return err;
}
-static int vsock_stream_getsockopt(struct socket *sock,
- int level, int optname,
- char __user *optval,
- int __user *optlen)
+static int vsock_stream_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_connectible_setsockopt(sock, level, optname, optval,
+ optlen);
+}
+
+static int vsock_connectible_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
{
int err;
int len;
@@ -1683,8 +1704,17 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}
-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
- size_t len)
+static int vsock_stream_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
+{
+ return vsock_connectible_getsockopt(sock, level, optname, optval,
+ optlen);
+}
+
+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1822,10 +1852,16 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
return err;
}
+static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
+
static int
-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1995,6 +2031,13 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
return err;
}
+static int
+vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK,
.owner = THIS_MODULE,
--
2.25.1
This prepares 'vsock_connectible_recvmsg()' to call the SEQPACKET
receive loop:
1) Shared checks are left in this function, then the socket type
specific receive loop is called.
2) The stream receive loop is moved to a separate function.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 242 ++++++++++++++++++++++-----------------
1 file changed, 138 insertions(+), 104 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index c9ce57db9554..524df8fc84cd 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1858,65 +1858,69 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
return vsock_connectible_sendmsg(sock, msg, len);
}
-
-static int
-vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
+ long timeout,
+ struct vsock_transport_recv_notify_data *recv_data,
+ size_t target)
{
- struct sock *sk;
+ int err = 0;
struct vsock_sock *vsk;
const struct vsock_transport *transport;
- int err;
- size_t target;
- ssize_t copied;
- long timeout;
- struct vsock_transport_recv_notify_data recv_data;
-
- DEFINE_WAIT(wait);
- sk = sock->sk;
vsk = vsock_sk(sk);
transport = vsk->transport;
- err = 0;
-
- lock_sock(sk);
-
- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
- /* Recvmsg is supposed to return 0 if a peer performs an
- * orderly shutdown. Differentiate between that case and when a
- * peer has not connected or a local shutdown occured with the
- * SOCK_DONE flag.
- */
- if (sock_flag(sk, SOCK_DONE))
- err = 0;
- else
- err = -ENOTCONN;
+ if (sk->sk_err != 0 ||
+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
+ (vsk->peer_shutdown & SEND_SHUTDOWN)) {
+ err = -1;
goto out;
}
-
- if (flags & MSG_OOB) {
- err = -EOPNOTSUPP;
+ /* Don't wait for non-blocking sockets. */
+ if (timeout == 0) {
+ err = -EAGAIN;
goto out;
}
- /* We don't check peer_shutdown flag here since peer may actually shut
- * down, but there can be data in the queue that a local socket can
- * receive.
- */
- if (sk->sk_shutdown & RCV_SHUTDOWN) {
- err = 0;
- goto out;
+ if (recv_data) {
+ err = transport->notify_recv_pre_block(vsk, target, recv_data);
+ if (err < 0)
+ goto out;
}
- /* It is valid on Linux to pass in a zero-length receive buffer. This
- * is not an error. We may as well bail out now.
- */
- if (!len) {
- err = 0;
+ release_sock(sk);
+ timeout = schedule_timeout(timeout);
+ lock_sock(sk);
+
+ if (signal_pending(current)) {
+ err = sock_intr_errno(timeout);
+ goto out;
+ } else if (timeout == 0) {
+ err = -EAGAIN;
goto out;
}
+out:
+ finish_wait(sk_sleep(sk), wait);
+ return err;
+}
+
+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ struct vsock_transport_recv_notify_data recv_data;
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ ssize_t copied;
+ size_t target;
+ long timeout;
+ int err;
+
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
/* We must not copy less than target bytes into the user's buffer
* before returning successfully, so we wait for the consume queue to
* have that much data to consume before dequeueing. Note that this
@@ -1937,85 +1941,53 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
while (1) {
+ ssize_t read;
s64 ready;
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
ready = vsock_stream_has_data(vsk);
if (ready == 0) {
- if (sk->sk_err != 0 ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- (vsk->peer_shutdown & SEND_SHUTDOWN)) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- /* Don't wait for non-blocking sockets. */
- if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
-
- err = transport->notify_recv_pre_block(
- vsk, target, &recv_data);
- if (err < 0) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- release_sock(sk);
- timeout = schedule_timeout(timeout);
- lock_sock(sk);
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeout);
- finish_wait(sk_sleep(sk), &wait);
- break;
- } else if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
+ if (vsock_wait_data(sk, &wait, timeout, &recv_data, target))
break;
- }
- } else {
- ssize_t read;
+ continue;
+ }
- finish_wait(sk_sleep(sk), &wait);
+ finish_wait(sk_sleep(sk), &wait);
- if (ready < 0) {
- /* Invalid queue pair content. XXX This should
- * be changed to a connection reset in a later
- * change.
- */
+ if (ready < 0) {
+ /* Invalid queue pair content. XXX This should
+ * be changed to a connection reset in a later
+ * change.
+ */
- err = -ENOMEM;
- goto out;
- }
+ err = -ENOMEM;
+ goto out;
+ }
- err = transport->notify_recv_pre_dequeue(
- vsk, target, &recv_data);
- if (err < 0)
- break;
+ err = transport->notify_recv_pre_dequeue(vsk,
+ target, &recv_data);
+ if (err < 0)
+ break;
+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
- read = transport->stream_dequeue(
- vsk, msg,
- len - copied, flags);
- if (read < 0) {
- err = -ENOMEM;
- break;
- }
+ if (read < 0) {
+ err = -ENOMEM;
+ break;
+ }
- copied += read;
+ copied += read;
- err = transport->notify_recv_post_dequeue(
- vsk, target, read,
+ err = transport->notify_recv_post_dequeue(vsk,
+ target, read,
!(flags & MSG_PEEK), &recv_data);
- if (err < 0)
- goto out;
+ if (err < 0)
+ goto out;
- if (read >= target || flags & MSG_PEEK)
- break;
+ if (read >= target || flags & MSG_PEEK)
+ break;
- target -= read;
- }
+ target -= read;
}
if (sk->sk_err)
@@ -2031,6 +2003,68 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
return err;
}
+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ return -1;
+}
+
+static int
+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+ int flags)
+{
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ struct sock *sk;
+ int err = 0;
+
+ sk = sock->sk;
+
+ lock_sock(sk);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ if (!transport || sk->sk_state != TCP_ESTABLISHED) {
+ /* Recvmsg is supposed to return 0 if a peer performs an
+ * orderly shutdown. Differentiate between that case and when a
+ * peer has not connected or a local shutdown occurred with the
+ * SOCK_DONE flag.
+ */
+ if (!sock_flag(sk, SOCK_DONE))
+ err = -ENOTCONN;
+
+ goto out;
+ }
+
+ if (flags & MSG_OOB) {
+ err = -EOPNOTSUPP;
+ goto out;
+ }
+
+ /* We don't check peer_shutdown flag here since peer may actually shut
+ * down, but there can be data in the queue that a local socket can
+ * receive.
+ */
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ goto out;
+
+ /* It is valid on Linux to pass in a zero-length receive buffer. This
+ * is not an error. We may as well bail out now.
+ */
+ if (!len)
+ goto out;
+
+ if (sk->sk_type == SOCK_STREAM)
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ else
+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
+
+out:
+ release_sock(sk);
+ return err;
+}
+
static int
vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
int flags)
--
2.25.1
This adds SEQPACKET ops for the vhost transport and also stops ignoring
packets of non-stream type.
Signed-off-by: Arseny Krasnov <[email protected]>
---
drivers/vhost/vsock.c | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index 5e78fb719602..4d60a99aed14 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -354,8 +354,7 @@ vhost_vsock_alloc_pkt(struct vhost_virtqueue *vq,
return NULL;
}
- if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM)
- pkt->len = le32_to_cpu(pkt->hdr.len);
+ pkt->len = le32_to_cpu(pkt->hdr.len);
/* No payload */
if (!pkt->len)
@@ -424,6 +423,10 @@ static struct virtio_transport vhost_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
This adds SEQPACKET ops for the virtio transport.
Signed-off-by: Arseny Krasnov <[email protected]>
---
net/vmw_vsock/virtio_transport.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 2700a63ab095..5a7ab1befee8 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -469,6 +469,10 @@ static struct virtio_transport virtio_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+ .seqpacket_dequeue = virtio_transport_seqpacket_dequeue,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
This adds the rest of the logic for SEQPACKET:
1) Shared functions for packet sending now set the packet type
 according to the socket type.
2) SEQPACKET-specific functions such as SEQ_BEGIN send and data dequeue.
3) TAP support for SEQPACKET is not trivial if the whole record has to
 be sent to the TAP interface. This could be done by allocating a new
 packet once the whole record is received and copying the record's data
 into that TAP packet. For now, SEQPACKET packets are not delivered to
 TAP.
Signed-off-by: Arseny Krasnov <[email protected]>
---
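Note (not part of the commit): points 1) and 2) above can be sketched in
userspace as follows. This is a minimal illustration, assuming simplified
stand-ins for the kernel structures. 'struct sketch_pkt_info',
'sketch_type_for_socket()', 'sketch_make_seq_begin()' and 'sketch_make_rw()'
are hypothetical names. Only the constant values and the reuse of the header
'flags' field for the record length are taken from this series.

```c
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>

/* Values mirror include/uapi/linux/virtio_vsock.h as changed by this series. */
enum { SKETCH_TYPE_STREAM = 1, SKETCH_TYPE_SEQPACKET = 2 };
enum { SKETCH_OP_RW = 5, SKETCH_OP_SEQ_BEGIN = 8 };
enum { SKETCH_RW_EOR = 1 };

/* Hypothetical, reduced version of the packet info used when sending. */
struct sketch_pkt_info {
	uint16_t type;
	uint16_t op;
	uint32_t flags;
};

/* The packet type follows the socket type instead of being hard-coded. */
uint16_t sketch_type_for_socket(int sk_type)
{
	return sk_type == SOCK_STREAM ? SKETCH_TYPE_STREAM
				      : SKETCH_TYPE_SEQPACKET;
}

/* SEQ_BEGIN carries no payload; the record length travels in 'flags'. */
struct sketch_pkt_info sketch_make_seq_begin(size_t record_len)
{
	struct sketch_pkt_info info = {
		.type = SKETCH_TYPE_SEQPACKET,
		.op = SKETCH_OP_SEQ_BEGIN,
		.flags = (uint32_t)record_len,
	};

	return info;
}

/* RW packets get the EOR flag only for SEQPACKET sockets with MSG_EOR set. */
struct sketch_pkt_info sketch_make_rw(int sk_type, int msg_flags)
{
	struct sketch_pkt_info info = { .op = SKETCH_OP_RW, .flags = 0 };

	info.type = sketch_type_for_socket(sk_type);
	if (info.type == SKETCH_TYPE_SEQPACKET && (msg_flags & MSG_EOR))
		info.flags |= SKETCH_RW_EOR;

	return info;
}
```

A stream socket never gets the EOR flag in this sketch, even if the caller
passes MSG_EOR, which matches the condition in
virtio_transport_stream_enqueue() below.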
include/linux/virtio_vsock.h | 7 ++++
net/vmw_vsock/virtio_transport_common.c | 55 +++++++++++++++++++++----
2 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index af8705ea8b95..ad9783df97c9 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -84,7 +84,14 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t len,
+ int flags);
+
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 90f9feef9d8f..fab14679ca7b 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
break;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
break;
default:
@@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
{
+ /* TODO: implement tap support for SOCK_SEQPACKET. */
+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
+ return;
+
if (pkt->tap_delivered)
return;
@@ -405,6 +410,19 @@ static u16 virtio_transport_get_type(struct sock *sk)
return VIRTIO_VSOCK_TYPE_SEQPACKET;
}
+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
+{
+ struct virtio_vsock_pkt_info info = {
+ .type = VIRTIO_VSOCK_TYPE_SEQPACKET,
+ .op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
+ .vsk = vsk,
+ .flags = len
+ };
+
+ return virtio_transport_send_pkt_info(vsk, &info);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
+
static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
{
list_del(&pkt->list);
@@ -576,6 +594,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t len, int flags)
+{
+ if (flags & MSG_PEEK)
+ return -EOPNOTSUPP;
+
+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
int
virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -659,14 +689,15 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
{
struct virtio_vsock_sock *vvs = vsk->trans;
+ int type;
if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
*val = VIRTIO_VSOCK_MAX_BUF_SIZE;
vvs->buf_alloc = *val;
- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
- NULL);
+ type = virtio_transport_get_type(sk_vsock(vsk));
+ virtio_transport_send_credit_update(vsk, type, NULL);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
@@ -793,10 +824,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_REQUEST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_connect);
@@ -805,7 +837,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_SHUTDOWN,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.flags = (mode & RCV_SHUTDOWN ?
VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
(mode & SEND_SHUTDOWN ?
@@ -813,6 +844,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
@@ -834,12 +867,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RW,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.msg = msg,
.pkt_len = len,
.vsk = vsk,
+ .flags = 0,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
+ if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
+ msg->msg_flags & MSG_EOR)
+ info.flags |= VIRTIO_VSOCK_RW_EOR;
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
@@ -857,7 +896,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.reply = !!pkt,
.vsk = vsk,
};
@@ -866,6 +904,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
return 0;
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
@@ -1177,13 +1217,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RESPONSE,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.remote_cid = le64_to_cpu(pkt->hdr.src_cid),
.remote_port = le32_to_cpu(pkt->hdr.src_port),
.reply = true,
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
--
2.25.1
This adds receive loop for SEQPACKET. It looks like receive loop for
SEQPACKET, but there are a few differences:
1) It doesn't call notify callbacks.
2) It ignores the 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
 they make no sense in the SEQPACKET case.
3) It waits until the whole record is received or an error occurs
 while receiving.
4) It processes and sets the 'MSG_TRUNC' flag.
So, to avoid extra conditions for the two socket types inside one loop,
two independent functions were created.
Signed-off-by: Arseny Krasnov <[email protected]>
---
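Note (not part of the commit): the 'MSG_TRUNC' handling in point 4) can be
summarized by the small userspace sketch below. 'sketch_recv_result()' is a
hypothetical helper, not a function from this series. It assumes a whole
record of 'record_len' bytes was already dequeued into a user buffer of
'buf_len' bytes and only computes what the receive loop would report.

```c
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Return value and msg_flags after one complete record was received.
 * 'flags' are the flags passed by the caller of recvmsg(), '*msg_flags'
 * models msg->msg_flags reported back to the caller.
 */
ssize_t sketch_recv_result(size_t record_len, size_t buf_len,
			   int flags, int *msg_flags)
{
	/* Bytes that fit in the user buffer; the rest of the record is dropped. */
	size_t copied = record_len < buf_len ? record_len : buf_len;

	/* Truncation is always reported when the record did not fit. */
	if (record_len > buf_len)
		*msg_flags |= MSG_TRUNC;

	/* If the caller passed MSG_TRUNC, report the real record length. */
	if (flags & MSG_TRUNC)
		return (ssize_t)record_len;

	return (ssize_t)copied;
}
```

For a 32-byte record and a 16-byte buffer this gives 32 with MSG_TRUNC
passed and 16 without it, and MSG_TRUNC is set in msg_flags in both cases.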
include/net/af_vsock.h | 5 ++
net/vmw_vsock/af_vsock.c | 102 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 106 insertions(+), 1 deletion(-)
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index b1c717286993..46073842d489 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -135,6 +135,11 @@ struct vsock_transport {
bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port);
+ /* SEQ_PACKET. */
+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
+ size_t len, int flags);
+
/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 524df8fc84cd..3b266880b7c8 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -2006,7 +2006,107 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
size_t len, int flags)
{
- return -1;
+ const struct vsock_transport *transport;
+ const struct iovec *orig_iov;
+ unsigned long orig_nr_segs;
+ ssize_t dequeued_total = 0;
+ struct vsock_sock *vsk;
+ size_t record_len;
+ long timeout;
+ int err = 0;
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ msg->msg_flags &= ~MSG_EOR;
+ orig_nr_segs = msg->msg_iter.nr_segs;
+ orig_iov = msg->msg_iter.iov;
+
+ while (1) {
+ ssize_t dequeued;
+ s64 ready;
+
+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
+ ready = vsock_stream_has_data(vsk);
+
+ if (ready == 0) {
+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
+ /* If the wait ends for any reason (timeout,
+ * signal or shutdown), report to the user that
+ * nothing was copied.
+ */
+ dequeued_total = 0;
+ break;
+ }
+ continue;
+ }
+
+ finish_wait(sk_sleep(sk), &wait);
+
+ if (ready < 0) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ if (dequeued_total == 0) {
+ record_len =
+ transport->seqpacket_seq_get_len(vsk);
+
+ if (record_len == 0)
+ continue;
+ }
+
+ /* 'msg_iter.count' is number of unused bytes in iov.
+ * On every copy to iov iterator it is decremented at
+ * size of data.
+ */
+ dequeued = transport->seqpacket_dequeue(vsk, msg,
+ msg->msg_iter.count, flags);
+
+ if (dequeued < 0) {
+ dequeued_total = 0;
+
+ if (dequeued == -EAGAIN) {
+ iov_iter_init(&msg->msg_iter, READ,
+ orig_iov, orig_nr_segs,
+ len);
+ msg->msg_flags &= ~MSG_EOR;
+ continue;
+ }
+
+ err = -ENOMEM;
+ break;
+ }
+
+ dequeued_total += dequeued;
+
+ if (dequeued_total >= record_len)
+ break;
+ }
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ err = 0;
+
+ if (dequeued_total > 0) {
+ /* User set MSG_TRUNC, so return the real length of the
+ * record.
+ */
+ if (flags & MSG_TRUNC)
+ err = record_len;
+ else
+ err = len - msg->msg_iter.count;
+
+ /* Always set MSG_TRUNC if the real length of the record is
+ * bigger than the user buffer.
+ */
+ if (record_len > len)
+ msg->msg_flags |= MSG_TRUNC;
+ }
+out:
+ return err;
}
static int
--
2.25.1
This adds the transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the socket's rx queue until the whole
record is copied (if the user's buffer is full, the user is not woken up).
This is done to avoid stalling the sender: if we woke the user up and it
left the syscall, nobody would send a credit update for the rest of the
record, and the sender would wait until the receiver entered the read
syscall again. So if the user's buffer is full, we just send a credit
update and drop the data. If SEQ_BEGIN is found during the copy (and not
all data has been copied), copying is restarted by resetting the user's
iov iterator (the previous unfinished data is dropped).
Signed-off-by: Arseny Krasnov <[email protected]>
---
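Note (not part of the commit): the dequeue behaviour described above can be
modelled in userspace as follows. This is a simplified sketch under stated
assumptions: the rx queue is a plain array, there is no locking and no
credit accounting, and a SEQ_BEGIN seen in the middle of a record always
restarts the copy. 'struct sim_pkt' and 'sim_read_record()' are hypothetical
names used only for illustration.

```c
#include <stddef.h>
#include <string.h>

enum sim_op { SIM_SEQ_BEGIN, SIM_RW };

struct sim_pkt {
	enum sim_op op;
	size_t len;		/* record length for SEQ_BEGIN, payload length for RW */
	const char *data;	/* payload, NULL for SEQ_BEGIN */
};

/* Read one complete record from q[0..n). Returns the announced record
 * length, or -1 if the queue ends before a record completes. '*copied' is
 * the number of bytes stored in 'buf'; bytes that do not fit are dropped.
 */
long sim_read_record(const struct sim_pkt *q, size_t n,
		     char *buf, size_t buf_len, size_t *copied)
{
	size_t record_len = 0, seen = 0;
	int in_record = 0;

	*copied = 0;
	for (size_t i = 0; i < n; i++) {
		size_t room, take;

		if (q[i].op == SIM_SEQ_BEGIN) {
			/* New marker: forget any unfinished record and
			 * restart from the beginning of the user buffer.
			 */
			record_len = q[i].len;
			seen = 0;
			*copied = 0;
			in_record = 1;
			continue;
		}

		if (!in_record)
			continue;	/* RW without a marker: drop it */

		/* Copy what fits, silently drop the rest of the packet. */
		room = buf_len - *copied;
		take = q[i].len < room ? q[i].len : room;
		memcpy(buf + *copied, q[i].data, take);
		*copied += take;
		seen += q[i].len;

		/* The reader is released only when the record is complete. */
		if (seen >= record_len)
			return (long)record_len;
	}

	return -1;
}
```

With a queue holding an unfinished 6-byte record ("abc" only) followed by a
complete 4-byte record ("xy", "zw") and a 3-byte buffer, the sketch returns
4 and leaves "xyz" in the buffer: the unfinished record is never visible to
the reader, and the last byte of the second record is dropped.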
include/linux/virtio_vsock.h | 4 +
include/uapi/linux/virtio_vsock.h | 9 ++
net/vmw_vsock/virtio_transport_common.c | 128 ++++++++++++++++++++++++
3 files changed, 141 insertions(+)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..7f0ef5204e33 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -36,6 +36,10 @@ struct virtio_vsock_sock {
u32 rx_bytes;
u32 buf_alloc;
struct list_head rx_queue;
+
+ /* For SOCK_SEQPACKET */
+ u32 user_read_seq_len;
+ u32 user_read_copied;
};
struct virtio_vsock_pkt {
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 1d57ed3d84d2..058908bc19fc 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -65,6 +65,7 @@ struct virtio_vsock_hdr {
enum virtio_vsock_type {
VIRTIO_VSOCK_TYPE_STREAM = 1,
+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};
enum virtio_vsock_op {
@@ -83,6 +84,9 @@ enum virtio_vsock_op {
VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
/* Request the peer to send the credit info to us */
VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
+
+ /* Record begin for SOCK_SEQPACKET */
+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
};
/* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
};
+/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
+enum virtio_vsock_rw_seqpacket {
+ VIRTIO_VSOCK_RW_EOR = 1,
+};
+
#endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 5956939eebb7..e66ec39445ff 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -397,6 +397,132 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}
+static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
+{
+ list_del(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+}
+
+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
+{
+ struct virtio_vsock_pkt *pkt, *n;
+ size_t bytes_dropped = 0;
+
+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
+ break;
+
+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+
+ return bytes_dropped;
+}
+
+static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t user_buf_len)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_handled = 0;
+ int err = 0;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ if (user_buf_len == 0) {
+ /* User's buffer is full: process the rest of the
+ * record and drop it. If 'SEQ_BEGIN' is found
+ * while iterating, the user will be woken up,
+ * because the record is already copied and we
+ * don't care that some of its tail RW packets are
+ * missing. Return the number of bytes (rest of the
+ * record), but skip the credit update for the missing bytes.
+ */
+ bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
+ vvs->user_read_copied += bytes_handled;
+
+ if (!list_empty(&vvs->rx_queue) &&
+ vvs->user_read_copied < vvs->user_read_seq_len) {
+ /* 'SEQ_BEGIN' found, but record isn't complete.
+ * Set number of copied bytes to fit record size
+ * and force counters to finish receiving.
+ */
+ bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
+ vvs->user_read_copied = vvs->user_read_seq_len;
+ }
+ }
+
+ /* Now start copying. */
+ while (vvs->user_read_copied < vvs->user_read_seq_len &&
+ vvs->rx_bytes &&
+ user_buf_len &&
+ !err) {
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ /* Unexpected 'SEQ_BEGIN' during record copy:
+ * Leave receive loop, 'EAGAIN' will restart it from
+ * outer receive loop, packet is still in queue and
+ * counters are cleared. So in next loop enter,
+ * 'SEQ_BEGIN' will be dequeued first. User's iov
+ * iterator will be reset in outer loop. Also
+ * send credit update, because some bytes could be
+ * copied. User will never see unfinished record.
+ */
+ err = -EAGAIN;
+ break;
+ }
+ case VIRTIO_VSOCK_OP_RW: {
+ size_t bytes_to_copy;
+ size_t pkt_len;
+
+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+ bytes_to_copy = min(user_buf_len, pkt_len);
+
+ /* sk_lock is held by caller so no one else can dequeue.
+ * Unlock rx_lock since memcpy_to_msg() may sleep.
+ */
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+ spin_lock_bh(&vvs->rx_lock);
+ err = -EINVAL;
+ break;
+ }
+
+ spin_lock_bh(&vvs->rx_lock);
+ user_buf_len -= bytes_to_copy;
+ bytes_handled += pkt->len;
+ vvs->user_read_copied += bytes_to_copy;
+
+ if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
+ msg->msg_flags |= MSG_EOR;
+ break;
+ }
+ default:
+ ;
+ }
+
+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
+ * but drop any other type of packet.
+ */
+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
+ virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+ }
+
+ spin_unlock_bh(&vvs->rx_lock);
+
+ virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
+ NULL);
+
+ return err ?: bytes_handled;
+}
+
ssize_t
virtio_transport_stream_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
@@ -481,6 +607,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
spin_lock_init(&vvs->rx_lock);
spin_lock_init(&vvs->tx_lock);
INIT_LIST_HEAD(&vvs->rx_queue);
+ vvs->user_read_copied = 0;
+ vvs->user_read_seq_len = 0;
return 0;
}
--
2.25.1
This modifies the current receive logic to support SEQPACKET:
1) Inserts 'SEQ_BEGIN' packets into the socket's rx queue.
2) Inserts 'RW' packets into the socket's rx queue, but without merging
 them into the buffer of the last packet in the queue.
3) Checks packet and socket types on receive (on mismatch, the
 connection is reset).
Signed-off-by: Arseny Krasnov <[email protected]>
---
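Note (not part of the commit): points 2) and 3) above boil down to two
decisions, sketched below in userspace. 'rx_check_type()' and
'rx_may_merge()' are hypothetical helpers. The copy threshold is passed in
as a parameter here rather than assuming the value of GOOD_COPY_LEN.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>

enum { RX_TYPE_STREAM = 1, RX_TYPE_SEQPACKET = 2 };
enum rx_verdict { RX_ACCEPT, RX_RESET };

/* Only the two connection-oriented packet types are known. */
bool rx_valid_type(uint16_t type)
{
	return type == RX_TYPE_STREAM || type == RX_TYPE_SEQPACKET;
}

/* A packet is accepted only if its type is known and matches the socket. */
enum rx_verdict rx_check_type(int sk_type, uint16_t pkt_type)
{
	uint16_t want = sk_type == SOCK_STREAM ? RX_TYPE_STREAM
					       : RX_TYPE_SEQPACKET;

	if (!rx_valid_type(pkt_type))
		return RX_RESET;

	return pkt_type == want ? RX_ACCEPT : RX_RESET;
}

/* Small stream packets may be appended to the last queued buffer.
 * SEQPACKET packets are never merged in this series.
 */
bool rx_may_merge(uint16_t pkt_type, size_t pkt_len,
		  size_t tail_free, size_t copy_threshold)
{
	if (pkt_type != RX_TYPE_STREAM)
		return false;

	return pkt_len <= copy_threshold && pkt_len <= tail_free;
}
```

The sketch leaves out the empty-queue case, where there is no tail buffer
to merge into; a caller could model that by passing 'tail_free' as 0 for
non-empty packets.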
net/vmw_vsock/virtio_transport_common.c | 79 ++++++++++++++++++-------
1 file changed, 58 insertions(+), 21 deletions(-)
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index dcce35d7b462..90f9feef9d8f 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -397,6 +397,14 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}
+static u16 virtio_transport_get_type(struct sock *sk)
+{
+ if (sk->sk_type == SOCK_STREAM)
+ return VIRTIO_VSOCK_TYPE_STREAM;
+ else
+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
+}
+
static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
{
list_del(&pkt->list);
@@ -1050,39 +1058,49 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt)
{
struct virtio_vsock_sock *vvs = vsk->trans;
- bool can_enqueue, free_pkt = false;
+ bool free_pkt = false;
pkt->len = le32_to_cpu(pkt->hdr.len);
pkt->off = 0;
spin_lock_bh(&vvs->rx_lock);
- can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
- if (!can_enqueue) {
+ if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
free_pkt = true;
goto out;
}
- /* Try to copy small packets into the buffer of last packet queued,
- * to avoid wasting memory queueing the entire buffer with a small
- * payload.
- */
- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
- struct virtio_vsock_pkt *last_pkt;
+ switch (le16_to_cpu(pkt->hdr.type)) {
+ case VIRTIO_VSOCK_TYPE_STREAM: {
+ /* Try to copy small packets into the buffer of last packet queued,
+ * to avoid wasting memory queueing the entire buffer with a small
+ * payload.
+ */
+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
+ struct virtio_vsock_pkt *last_pkt;
- last_pkt = list_last_entry(&vvs->rx_queue,
- struct virtio_vsock_pkt, list);
+ last_pkt = list_last_entry(&vvs->rx_queue,
+ struct virtio_vsock_pkt, list);
- /* If there is space in the last packet queued, we copy the
- * new packet in its buffer.
- */
- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
- pkt->len);
- last_pkt->len += pkt->len;
- free_pkt = true;
- goto out;
+ /* If there is space in the last packet queued, we copy the
+ * new packet in its buffer.
+ */
+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
+ pkt->len);
+ last_pkt->len += pkt->len;
+ free_pkt = true;
+ goto out;
+ }
}
+
+ break;
+ }
+ case VIRTIO_VSOCK_TYPE_SEQPACKET: {
+ break;
+ }
+ default:
+ goto out;
}
list_add_tail(&pkt->list, &vvs->rx_queue);
@@ -1101,6 +1119,14 @@ virtio_transport_recv_connected(struct sock *sk,
int err = 0;
switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ struct virtio_vsock_sock *vvs = vsk->trans;
+
+ spin_lock_bh(&vvs->rx_lock);
+ list_add_tail(&pkt->list, &vvs->rx_queue);
+ spin_unlock_bh(&vvs->rx_lock);
+ return err;
+ }
case VIRTIO_VSOCK_OP_RW:
virtio_transport_recv_enqueue(vsk, pkt);
sk->sk_data_ready(sk);
@@ -1247,6 +1273,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
return 0;
}
+static bool virtio_transport_valid_type(u16 type)
+{
+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
+}
+
/* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
* lock.
*/
@@ -1272,7 +1304,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le32_to_cpu(pkt->hdr.buf_alloc),
le32_to_cpu(pkt->hdr.fwd_cnt));
- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
goto free_pkt;
}
@@ -1289,6 +1321,11 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
}
}
+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type)) {
+ (void)virtio_transport_reset_no_sock(t, pkt);
+ goto free_pkt;
+ }
+
vsk = vsock_sk(sk);
space_available = virtio_transport_space_update(sk, pkt);
--
2.25.1
25.01.2021 14:12, Arseny Krasnov wrote:
> This adds receive loop for SEQPACKET. It looks like receive loop for
> SEQPACKET,
   ^^^
You meant "STREAM"?
This adds two tests for SOCK_SEQPACKET sockets: both transfer data and
then test the MSG_EOR and MSG_TRUNC flags. Cases for connect(), bind(),
etc. are not tested, because they are the same as for stream sockets.
Signed-off-by: Arseny Krasnov <[email protected]>
---
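Note (not part of the commit): the MSG_TRUNC check in the second test can
be tried without a guest/host pair by using an AF_UNIX socketpair as a
stand-in for AF_VSOCK. This is only an illustration of the expected
userspace semantics, assuming a Linux kernel where AF_UNIX SOCK_SEQPACKET
sockets return the real record length for MSG_TRUNC (documented for
Linux 3.4 and later). 'seqpacket_trunc_demo()' is a hypothetical helper.

```c
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Send one 32-byte record and receive it into a 16-byte buffer.
 * Returns 0 on success; '*ret' gets the recvmsg() return value and
 * '*truncated' is 1 if MSG_TRUNC was reported in msg_flags.
 */
int seqpacket_trunc_demo(ssize_t *ret, int *truncated)
{
	char tx[32], rx[16];
	struct iovec iov = { .iov_base = rx, .iov_len = sizeof(rx) };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
	int sv[2];
	int rc = -1;

	memset(tx, 'A', sizeof(tx));

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
		return -1;

	if (send(sv[0], tx, sizeof(tx), 0) == (ssize_t)sizeof(tx)) {
		/* MSG_TRUNC asks for the real record length as return value. */
		*ret = recvmsg(sv[1], &msg, MSG_TRUNC);
		if (*ret >= 0) {
			*truncated = !!(msg.msg_flags & MSG_TRUNC);
			rc = 0;
		}
	}

	close(sv[0]);
	close(sv[1]);
	return rc;
}
```

The same expectations are what test_seqpacket_msg_trunc_server() below
checks for AF_VSOCK: a return value equal to the sent record size and
MSG_TRUNC set in msg_flags.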
tools/testing/vsock/util.c | 32 ++++++--
tools/testing/vsock/util.h | 3 +
tools/testing/vsock/vsock_test.c | 126 +++++++++++++++++++++++++++++++
3 files changed, 156 insertions(+), 5 deletions(-)
diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
index 93cbd6f603f9..2acbb7703c6a 100644
--- a/tools/testing/vsock/util.c
+++ b/tools/testing/vsock/util.c
@@ -84,7 +84,7 @@ void vsock_wait_remote_close(int fd)
}
/* Connect to <cid, port> and return the file descriptor. */
-int vsock_stream_connect(unsigned int cid, unsigned int port)
+static int vsock_connect(unsigned int cid, unsigned int port, int type)
{
union {
struct sockaddr sa;
@@ -101,7 +101,7 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
control_expectln("LISTENING");
- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);
timeout_begin(TIMEOUT);
do {
@@ -120,11 +120,21 @@ int vsock_stream_connect(unsigned int cid, unsigned int port)
return fd;
}
+int vsock_stream_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_STREAM);
+}
+
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port)
+{
+ return vsock_connect(cid, port, SOCK_SEQPACKET);
+}
+
/* Listen on <cid, port> and return the first incoming connection. The remote
* address is stored to clientaddrp. clientaddrp may be NULL.
*/
-int vsock_stream_accept(unsigned int cid, unsigned int port,
- struct sockaddr_vm *clientaddrp)
+static int vsock_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp, int type)
{
union {
struct sockaddr sa;
@@ -145,7 +155,7 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
int client_fd;
int old_errno;
- fd = socket(AF_VSOCK, SOCK_STREAM, 0);
+ fd = socket(AF_VSOCK, type, 0);
if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
perror("bind");
@@ -189,6 +199,18 @@ int vsock_stream_accept(unsigned int cid, unsigned int port,
return client_fd;
}
+int vsock_stream_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_STREAM);
+}
+
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp)
+{
+ return vsock_accept(cid, port, clientaddrp, SOCK_SEQPACKET);
+}
+
/* Transmit one byte and check the return value.
*
* expected_ret:
diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
index e53dd09d26d9..a3375ad2fb7f 100644
--- a/tools/testing/vsock/util.h
+++ b/tools/testing/vsock/util.h
@@ -36,8 +36,11 @@ struct test_case {
void init_signals(void);
unsigned int parse_cid(const char *str);
int vsock_stream_connect(unsigned int cid, unsigned int port);
+int vsock_seqpacket_connect(unsigned int cid, unsigned int port);
int vsock_stream_accept(unsigned int cid, unsigned int port,
struct sockaddr_vm *clientaddrp);
+int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
+ struct sockaddr_vm *clientaddrp);
void vsock_wait_remote_close(int fd);
void send_byte(int fd, int expected_ret, int flags);
void recv_byte(int fd, int expected_ret, int flags);
diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
index 5a4fb80fa832..db6cc49fa5e4 100644
--- a/tools/testing/vsock/vsock_test.c
+++ b/tools/testing/vsock/vsock_test.c
@@ -14,6 +14,8 @@
#include <errno.h>
#include <unistd.h>
#include <linux/kernel.h>
+#include <sys/types.h>
+#include <sys/socket.h>
#include "timeout.h"
#include "control.h"
@@ -279,6 +281,120 @@ static void test_stream_msg_peek_server(const struct test_opts *opts)
close(fd);
}
+#define MESSAGES_CNT 7
+#define MESSAGE_EOR_IDX (MESSAGES_CNT / 2)
+static void test_seqpacket_msg_send_client(const struct test_opts *opts)
+{
+ int fd;
+
+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+ if (fd < 0) {
+ perror("connect");
+ exit(EXIT_FAILURE);
+ }
+
+ /* Send several messages, one with MSG_EOR flag */
+ for (int i = 0; i < MESSAGES_CNT; i++)
+ send_byte(fd, 1, (i != MESSAGE_EOR_IDX) ? 0 : MSG_EOR);
+
+ control_writeln("SENDDONE");
+ close(fd);
+}
+
+static void test_seqpacket_msg_send_server(const struct test_opts *opts)
+{
+ int fd;
+ char buf[16];
+ struct msghdr msg = {0};
+ struct iovec iov = {0};
+
+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+ if (fd < 0) {
+ perror("accept");
+ exit(EXIT_FAILURE);
+ }
+
+ control_expectln("SENDDONE");
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ for (int i = 0; i < MESSAGES_CNT; i++) {
+ if (recvmsg(fd, &msg, 0) != 1) {
+ perror("message bound violated");
+ exit(EXIT_FAILURE);
+ }
+
+ if (i == MESSAGE_EOR_IDX) {
+ if (!(msg.msg_flags & MSG_EOR)) {
+ fprintf(stderr, "MSG_EOR flag expected\n");
+ exit(EXIT_FAILURE);
+ }
+ } else {
+ if (msg.msg_flags & MSG_EOR) {
+ fprintf(stderr, "unexpected MSG_EOR flag\n");
+ exit(EXIT_FAILURE);
+ }
+ }
+ }
+
+ close(fd);
+}
+
+#define MESSAGE_TRUNC_SZ 32
+static void test_seqpacket_msg_trunc_client(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ];
+
+ fd = vsock_seqpacket_connect(opts->peer_cid, 1234);
+ if (fd < 0) {
+ perror("connect");
+ exit(EXIT_FAILURE);
+ }
+
+ if (send(fd, buf, sizeof(buf), 0) != sizeof(buf)) {
+ perror("send failed");
+ exit(EXIT_FAILURE);
+ }
+
+ control_writeln("SENDDONE");
+ close(fd);
+}
+
+static void test_seqpacket_msg_trunc_server(const struct test_opts *opts)
+{
+ int fd;
+ char buf[MESSAGE_TRUNC_SZ / 2];
+ struct msghdr msg = {0};
+ struct iovec iov = {0};
+
+ fd = vsock_seqpacket_accept(VMADDR_CID_ANY, 1234, NULL);
+ if (fd < 0) {
+ perror("accept");
+ exit(EXIT_FAILURE);
+ }
+
+ control_expectln("SENDDONE");
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (recvmsg(fd, &msg, MSG_TRUNC) != MESSAGE_TRUNC_SZ) {
+ perror("MSG_TRUNC doesn't work");
+ exit(EXIT_FAILURE);
+ }
+
+ if (!(msg.msg_flags & MSG_TRUNC)) {
+ fprintf(stderr, "MSG_TRUNC expected\n");
+ exit(EXIT_FAILURE);
+ }
+
+ close(fd);
+}
+
static struct test_case test_cases[] = {
{
.name = "SOCK_STREAM connection reset",
@@ -309,6 +425,16 @@ static struct test_case test_cases[] = {
.run_client = test_stream_msg_peek_client,
.run_server = test_stream_msg_peek_server,
},
+ {
+ .name = "SOCK_SEQPACKET send data MSG_EOR",
+ .run_client = test_seqpacket_msg_send_client,
+ .run_server = test_seqpacket_msg_send_server,
+ },
+ {
+ .name = "SOCK_SEQPACKET send data MSG_TRUNC",
+ .run_client = test_seqpacket_msg_trunc_client,
+ .run_server = test_seqpacket_msg_trunc_server,
+ },
{},
};
--
2.25.1
Hi Arseny,
thanks for this new series!
I'm a bit busy but I hope to review it tomorrow or on Thursday.
Stefano
On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
> This patchset impelements support of SOCK_SEQPACKET for virtio
>transport.
> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>do it, new packet operation was added: it marks start of record (with
>record length in header), such packet doesn't carry any data. To send
>record, packet with start marker is sent first, then all data is sent
>as usual 'RW' packets. On receiver's side, length of record is known
>from packet with start record marker. Now as packets of one socket
>are not reordered neither on vsock nor on vhost transport layers, such
>marker allows to restore original record on receiver's side. If user's
>buffer is smaller that record length, when all out of size data is
>dropped.
> Maximum length of datagram is not limited as in stream socket,
>because same credit logic is used. Difference with stream socket is
>that user is not woken up until whole record is received or error
>occurred. Implementation also supports 'MSG_EOR' and 'MSG_TRUNC' flags.
> Tests also implemented.
>
> Arseny Krasnov (13):
> af_vsock: prepare for SOCK_SEQPACKET support
> af_vsock: prepare 'vsock_connectible_recvmsg()'
> af_vsock: implement SEQPACKET rx loop
> af_vsock: implement send logic for SOCK_SEQPACKET
> af_vsock: rest of SEQPACKET support
> af_vsock: update comments for stream sockets
> virtio/vsock: dequeue callback for SOCK_SEQPACKET
> virtio/vsock: fetch length for SEQPACKET record
> virtio/vsock: add SEQPACKET receive logic
> virtio/vsock: rest of SOCK_SEQPACKET support
> virtio/vsock: setup SEQPACKET ops for transport
> vhost/vsock: setup SEQPACKET ops for transport
> vsock_test: add SOCK_SEQPACKET tests
>
> drivers/vhost/vsock.c | 7 +-
> include/linux/virtio_vsock.h | 12 +
> include/net/af_vsock.h | 6 +
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/af_vsock.c | 543 ++++++++++++++++------
> net/vmw_vsock/virtio_transport.c | 4 +
> net/vmw_vsock/virtio_transport_common.c | 295 ++++++++++--
> tools/testing/vsock/util.c | 32 +-
> tools/testing/vsock/util.h | 3 +
> tools/testing/vsock/vsock_test.c | 126 +++++
> 10 files changed, 862 insertions(+), 175 deletions(-)
>
> TODO:
> - Support for record integrity control. As transport could drop some
> packets, something like "record-id" and record end marker need to
> be implemented. Idea is that SEQ_BEGIN packet carries both record
> length and record id, end marker(let it be SEQ_END) carries only
> record id. To be sure that no one packet was lost, receiver checks
> length of data between SEQ_BEGIN and SEQ_END(it must be same with
> value in SEQ_BEGIN) and record ids of SEQ_BEGIN and SEQ_END(this
> means that both markers were not dropped. I think that easiest way
> to implement record id for SEQ_BEGIN is to reuse another field of
> packet header(SEQ_BEGIN already uses 'flags' as record length).For
> SEQ_END record id could be stored in 'flags'.
> Another way to implement it, is to move metadata of both SEQ_END
> and SEQ_BEGIN to payload. But this approach has problem, because
> if we move something to payload, such payload is accounted by
> credit logic, which fragments payload, while payload with record
> length and id couldn't be fragmented. One way to overcome it is to
> ignore credit update for SEQ_BEGIN/SEQ_END packet.Another solution
> is to update 'stream_has_space()' function: current implementation
> return non-zero when at least 1 byte is allowed to use,but updated
> version will have extra argument, which is needed length. For 'RW'
> packet this argument is 1, for SEQ_BEGIN it is sizeof(record len +
> record id) and for SEQ_END it is sizeof(record id).
>
> - What to do when the server doesn't support SOCK_SEQPACKET. In the
> current implementation RST is sent in reply, the same as when the
> listening port is not found. I think the current RST is enough,
> because a server that doesn't support SEQ_PACKET is equivalent to a
> missing listener (i.e. no listener in both cases).
>
> v2 -> v3:
> - patches reorganized: split for prepare and implementation patches
> - local variables are declared in "Reverse Christmas tree" manner
> - virtio_transport_common.c: valid leXX_to_cpu() for vsock header
> fields access
> - af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
> between stream and seqpacket sockets.
> - af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
> - af_vsock.c: 'vsock_wait_data()' refactored.
>
> v1 -> v2:
> - patches reordered: af_vsock.c related changes now before virtio vsock
> - patches reorganized: more small patches, where +/- are not mixed
> - tests for SOCK_SEQPACKET added
> - all commit messages updated
> - af_vsock.c: 'vsock_pre_recv_check()' inlined to
> 'vsock_connectible_recvmsg()'
> - af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
> was not found
> - virtio_transport_common.c: transport callback for seqpacket dequeue
> - virtio_transport_common.c: simplified
> 'virtio_transport_recv_connected()'
> - virtio_transport_common.c: send reset on socket and packet type
> mismatch.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>
>--
>2.25.1
>
I think the patch title should be more explicit, so something like
vsock: generalize function to manage connectible sockets
On Mon, Jan 25, 2021 at 02:11:28PM +0300, Arseny Krasnov wrote:
>This prepares af_vsock.c for SEQPACKET support:
>1) As both stream and seqpacket sockets are connection oriented, add
> check for SOCK_SEQPACKET to conditions where SOCK_STREAM is checked.
>2) Some functions such as setsockopt(), getsockopt(), connect(),
> recvmsg(), sendmsg() are shared between both types of sockets, so
> rename them in general manner and create entry points for each type
> of socket to call these functions(for stream in this patch, for
> seqpacket in further patches).
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 91 +++++++++++++++++++++++++++++-----------
> 1 file changed, 67 insertions(+), 24 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index b12d3a322242..c9ce57db9554 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -604,8 +604,8 @@ static void vsock_pending_work(struct work_struct *work)
>
> /**** SOCKET OPERATIONS ****/
>
>-static int __vsock_bind_stream(struct vsock_sock *vsk,
>- struct sockaddr_vm *addr)
>+static int __vsock_bind_connectible(struct vsock_sock *vsk,
>+ struct sockaddr_vm *addr)
> {
> static u32 port;
> struct sockaddr_vm new_addr;
>@@ -685,7 +685,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
> switch (sk->sk_socket->type) {
> case SOCK_STREAM:
> spin_lock_bh(&vsock_table_lock);
>- retval = __vsock_bind_stream(vsk, addr);
>+ retval = __vsock_bind_connectible(vsk, addr);
> spin_unlock_bh(&vsock_table_lock);
> break;
>
>@@ -767,6 +767,11 @@ static struct sock *__vsock_create(struct net *net,
> return sk;
> }
>
>+static bool sock_type_connectible(u16 type)
>+{
>+ return (type == SOCK_STREAM || type == SOCK_SEQPACKET);
>+}
>+
I think it's okay to add this function in this patch, but until
SOCK_SEQPACKET is supported, I would check only SOCK_STREAM and add
SOCK_SEQPACKET later, when you add 'vsock_seqpacket_ops'.
> static void __vsock_release(struct sock *sk, int level)
> {
> if (sk) {
>@@ -785,7 +790,7 @@ static void __vsock_release(struct sock *sk, int level)
>
> if (vsk->transport)
> vsk->transport->release(vsk);
>- else if (sk->sk_type == SOCK_STREAM)
>+ else if (sock_type_connectible(sk->sk_type))
> vsock_remove_sock(vsk);
>
> sock_orphan(sk);
>@@ -945,7 +950,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> sk = sock->sk;
> if (sock->state == SS_UNCONNECTED) {
> err = -ENOTCONN;
>- if (sk->sk_type == SOCK_STREAM)
>+ if (sock_type_connectible(sk->sk_type))
> return err;
> } else {
> sock->state = SS_DISCONNECTING;
>@@ -960,7 +965,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
> sk->sk_state_change(sk);
> release_sock(sk);
>
>- if (sk->sk_type == SOCK_STREAM) {
>+ if (sock_type_connectible(sk->sk_type)) {
> sock_reset_flag(sk, SOCK_DONE);
> vsock_send_shutdown(sk, mode);
> }
>@@ -1013,7 +1018,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
> if (!(sk->sk_shutdown & SEND_SHUTDOWN))
> mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
>
>- } else if (sock->type == SOCK_STREAM) {
>+ } else if (sock_type_connectible(sk->sk_type)) {
> const struct vsock_transport *transport = vsk->transport;
> lock_sock(sk);
>
>@@ -1259,8 +1264,8 @@ static void vsock_connect_timeout(struct work_struct *work)
> sock_put(sk);
> }
>
>-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
>- int addr_len, int flags)
>+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
> {
> int err;
> struct sock *sk;
>@@ -1395,6 +1400,12 @@ static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
> return err;
> }
>
>+static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
>+{
>+ return vsock_connect(sock, addr, addr_len, flags);
>+}
>+
I think you can directly use vsock_connect in 'vsock_stream_ops'.
> static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
> bool kern)
> {
>@@ -1410,7 +1421,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
>
> lock_sock(listener);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sock->type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1487,7 +1498,7 @@ static int vsock_listen(struct socket *sock, int backlog)
>
> lock_sock(sk);
>
>- if (sock->type != SOCK_STREAM) {
>+ if (!sock_type_connectible(sk->sk_type)) {
> err = -EOPNOTSUPP;
> goto out;
> }
>@@ -1531,11 +1542,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
> vsk->buffer_size = val;
> }
>
>-static int vsock_stream_setsockopt(struct socket *sock,
>- int level,
>- int optname,
>- sockptr_t optval,
>- unsigned int optlen)
>+static int vsock_connectible_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
> {
> int err;
> struct sock *sk;
>@@ -1612,10 +1623,20 @@ static int vsock_stream_setsockopt(struct socket *sock,
> return err;
> }
>
>-static int vsock_stream_getsockopt(struct socket *sock,
>- int level, int optname,
>- char __user *optval,
>- int __user *optlen)
>+static int vsock_stream_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
>+{
>+ return vsock_connectible_setsockopt(sock, level, optname, optval,
>+ optlen);
>+}
As before, I think you can directly use vsock_connectible_setsockopt in
'vsock_stream_ops'.
>+
>+static int vsock_connectible_getsockopt(struct socket *sock,
>+ int level, int optname,
>+ char __user *optval,
>+ int __user *optlen)
> {
> int err;
> int len;
>@@ -1683,8 +1704,17 @@ static int vsock_stream_getsockopt(struct socket *sock,
> return 0;
> }
>
>-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>- size_t len)
>+static int vsock_stream_getsockopt(struct socket *sock,
>+ int level, int optname,
>+ char __user *optval,
>+ int __user *optlen)
>+{
>+ return vsock_connectible_getsockopt(sock, level, optname, optval,
>+ optlen);
>+}
>+
Ditto.
>+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>+ size_t len)
> {
> struct sock *sk;
> struct vsock_sock *vsk;
>@@ -1822,10 +1852,16 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> return err;
> }
>
>+static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>+ size_t len)
>+{
>+ return vsock_connectible_sendmsg(sock, msg, len);
>+}
>+
Ditto.
>
> static int
>-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>- int flags)
>+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
> {
> struct sock *sk;
> struct vsock_sock *vsk;
>@@ -1995,6 +2031,13 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> return err;
> }
>
>+static int
>+vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
>+{
>+ return vsock_connectible_recvmsg(sock, msg, len, flags);
>+}
>+
Ditto.
> static const struct proto_ops vsock_stream_ops = {
> .family = PF_VSOCK,
> .owner = THIS_MODULE,
>--
>2.25.1
>
On Mon, Jan 25, 2021 at 02:11:57PM +0300, Arseny Krasnov wrote:
>This prepares 'vsock_connectible_recvmsg()' to call SEQPACKET receive
>loop:
>1) Some shared checks are left in this function, then the socket type
> specific receive loop is called.
>2) Stream receive loop is moved to a separate function.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 242 ++++++++++++++++++++++-----------------
> 1 file changed, 138 insertions(+), 104 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index c9ce57db9554..524df8fc84cd 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1858,65 +1858,69 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> return vsock_connectible_sendmsg(sock, msg, len);
> }
>
>-
>-static int
>-vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>- int flags)
>+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
>+ long timeout,
>+ struct vsock_transport_recv_notify_data *recv_data,
>+ size_t target)
> {
>- struct sock *sk;
>+ int err = 0;
> struct vsock_sock *vsk;
> const struct vsock_transport *transport;
>- int err;
>- size_t target;
>- ssize_t copied;
>- long timeout;
>- struct vsock_transport_recv_notify_data recv_data;
>-
>- DEFINE_WAIT(wait);
>
>- sk = sock->sk;
> vsk = vsock_sk(sk);
> transport = vsk->transport;
>- err = 0;
>-
>- lock_sock(sk);
>-
>- if (!transport || sk->sk_state != TCP_ESTABLISHED) {
>- /* Recvmsg is supposed to return 0 if a peer performs an
>- * orderly shutdown. Differentiate between that case and when a
>- * peer has not connected or a local shutdown occured with the
>- * SOCK_DONE flag.
>- */
>- if (sock_flag(sk, SOCK_DONE))
>- err = 0;
>- else
>- err = -ENOTCONN;
>
>+ if (sk->sk_err != 0 ||
>+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
>+ (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>+ err = -1;
> goto out;
> }
>-
>- if (flags & MSG_OOB) {
>- err = -EOPNOTSUPP;
>+ /* Don't wait for non-blocking sockets. */
>+ if (timeout == 0) {
>+ err = -EAGAIN;
> goto out;
> }
>
>- /* We don't check peer_shutdown flag here since peer may actually shut
>- * down, but there can be data in the queue that a local socket can
>- * receive.
>- */
>- if (sk->sk_shutdown & RCV_SHUTDOWN) {
>- err = 0;
>- goto out;
>+ if (recv_data) {
>+ err = transport->notify_recv_pre_block(vsk, target, recv_data);
>+ if (err < 0)
>+ goto out;
> }
>
>- /* It is valid on Linux to pass in a zero-length receive buffer. This
>- * is not an error. We may as well bail out now.
>- */
>- if (!len) {
>- err = 0;
>+ release_sock(sk);
>+ timeout = schedule_timeout(timeout);
>+ lock_sock(sk);
>+
>+ if (signal_pending(current)) {
>+ err = sock_intr_errno(timeout);
>+ goto out;
>+ } else if (timeout == 0) {
>+ err = -EAGAIN;
> goto out;
> }
>
>+out:
>+ finish_wait(sk_sleep(sk), wait);
>+ return err;
>+}
>+
>+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ struct vsock_transport_recv_notify_data recv_data;
>+ const struct vsock_transport *transport;
>+ struct vsock_sock *vsk;
>+ ssize_t copied;
>+ size_t target;
>+ long timeout;
>+ int err;
>+
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
> /* We must not copy less than target bytes into the user's buffer
> * before returning successfully, so we wait for the consume queue to
> * have that much data to consume before dequeueing. Note that this
>@@ -1937,85 +1941,53 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>
>
> while (1) {
>+ ssize_t read;
> s64 ready;
>
> prepare_to_wait(sk_sleep(sk), &wait,
> TASK_INTERRUPTIBLE);
> ready = vsock_stream_has_data(vsk);
Maybe we can also move these lines into vsock_wait_data(), which could
then return 'ready' or an error.
>
> if (ready == 0) {
>- if (sk->sk_err != 0 ||
>- (sk->sk_shutdown & RCV_SHUTDOWN) ||
>- (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>- finish_wait(sk_sleep(sk), &wait);
>- break;
>- }
>- /* Don't wait for non-blocking sockets. */
>- if (timeout == 0) {
>- err = -EAGAIN;
>- finish_wait(sk_sleep(sk), &wait);
>- break;
>- }
>-
>- err = transport->notify_recv_pre_block(
>- vsk, target, &recv_data);
>- if (err < 0) {
>- finish_wait(sk_sleep(sk), &wait);
>- break;
>- }
>- release_sock(sk);
>- timeout = schedule_timeout(timeout);
>- lock_sock(sk);
>-
>- if (signal_pending(current)) {
>- err = sock_intr_errno(timeout);
>- finish_wait(sk_sleep(sk), &wait);
>- break;
>- } else if (timeout == 0) {
>- err = -EAGAIN;
>- finish_wait(sk_sleep(sk), &wait);
>+ if (vsock_wait_data(sk, &wait, timeout, &recv_data, target))
> break;
>- }
>- } else {
>- ssize_t read;
>+ continue;
>+ }
>
>- finish_wait(sk_sleep(sk), &wait);
>+ finish_wait(sk_sleep(sk), &wait);
This one can also be moved into vsock_wait_data().
>
>- if (ready < 0) {
>- /* Invalid queue pair content. XXX This should
>- * be changed to a connection reset in a later
>- * change.
>- */
>+ if (ready < 0) {
>+ /* Invalid queue pair content. XXX This should
>+ * be changed to a connection reset in a later
>+ * change.
>+ */
>
>- err = -ENOMEM;
>- goto out;
>- }
>+ err = -ENOMEM;
>+ goto out;
>+ }
>
>- err = transport->notify_recv_pre_dequeue(
>- vsk, target, &recv_data);
>- if (err < 0)
>- break;
>+ err = transport->notify_recv_pre_dequeue(vsk,
>+ target, &recv_data);
>+ if (err < 0)
>+ break;
>+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>
>- read = transport->stream_dequeue(
>- vsk, msg,
>- len - copied, flags);
>- if (read < 0) {
>- err = -ENOMEM;
>- break;
>- }
>+ if (read < 0) {
>+ err = -ENOMEM;
>+ break;
>+ }
>
>- copied += read;
>+ copied += read;
>
>- err = transport->notify_recv_post_dequeue(
>- vsk, target, read,
>+ err = transport->notify_recv_post_dequeue(vsk,
>+ target, read,
> !(flags & MSG_PEEK), &recv_data);
>- if (err < 0)
>- goto out;
>+ if (err < 0)
>+ goto out;
>
>- if (read >= target || flags & MSG_PEEK)
>- break;
>+ if (read >= target || flags & MSG_PEEK)
>+ break;
>
>- target -= read;
>- }
>+ target -= read;
> }
>
> if (sk->sk_err)
>@@ -2031,6 +2003,68 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> return err;
> }
>
>+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ return -1;
>+}
>+
You can add this function later, when you implement it...
>+static int
>+vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
>+{
>+ const struct vsock_transport *transport;
>+ struct vsock_sock *vsk;
>+ struct sock *sk;
>+ int err = 0;
>+
>+ sk = sock->sk;
>+
>+ lock_sock(sk);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ if (!transport || sk->sk_state != TCP_ESTABLISHED) {
>+ /* Recvmsg is supposed to return 0 if a peer performs an
>+ * orderly shutdown. Differentiate between that case and when a
>+ * peer has not connected or a local shutdown occurred with the
>+ * SOCK_DONE flag.
>+ */
>+ if (!sock_flag(sk, SOCK_DONE))
>+ err = -ENOTCONN;
>+
>+ goto out;
>+ }
>+
>+ if (flags & MSG_OOB) {
>+ err = -EOPNOTSUPP;
>+ goto out;
>+ }
>+
>+ /* We don't check peer_shutdown flag here since peer may actually shut
>+ * down, but there can be data in the queue that a local socket can
>+ * receive.
>+ */
>+ if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ goto out;
>+
>+ /* It is valid on Linux to pass in a zero-length receive buffer. This
>+ * is not an error. We may as well bail out now.
>+ */
>+ if (!len)
>+ goto out;
>+
>+ if (sk->sk_type == SOCK_STREAM)
>+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
>+ else
>+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
...and also this 'else' branch.
>+
>+out:
>+ release_sock(sk);
>+ return err;
>+}
>+
> static int
> vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
>--
>2.25.1
>
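To make the two suggestions above more concrete (moving the "is data ready" check and the finish_wait() call into the helper), here is a rough user-space model of how such a helper could look. This is only a sketch: 'struct fake_sock', 'fake_sleep()' and 'wait_data()' are hypothetical stand-ins, not kernel APIs, and the places where prepare_to_wait()/finish_wait() would be called are only marked with comments.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for the socket state the helper looks at. */
struct fake_sock {
	long queued;       /* bytes ready; negative models a corrupted queue */
	int  sk_err;       /* pending socket error, 0 if none */
	int  rcv_shutdown; /* non-zero once the receive side is shut down */
	int  wakeups_left; /* number of sleeps until data "arrives" */
	long arrives;      /* bytes that become ready on the last wakeup */
};

/* Models one sleep; in the kernel this would be release_sock(),
 * schedule_timeout() and lock_sock(). Returns the remaining timeout.
 */
static long fake_sleep(struct fake_sock *sk, long timeout)
{
	if (sk->wakeups_left > 0 && --sk->wakeups_left == 0)
		sk->queued = sk->arrives;
	return timeout > 0 ? timeout - 1 : 0;
}

/* Returns > 0: number of bytes ready,
 *         0: shutdown or pending error, nothing to read,
 *       < 0: negative errno.
 * The caller never touches the wait queue itself.
 */
static long wait_data(struct fake_sock *sk, long timeout)
{
	for (;;) {
		/* prepare_to_wait() would be called here. */
		long ready = sk->queued;

		/* Every return below is where finish_wait() would go. */
		if (ready != 0)
			return ready < 0 ? -ENOMEM : ready;
		if (sk->sk_err || sk->rcv_shutdown)
			return 0;
		if (timeout == 0)
			return -EAGAIN; /* non-blocking or timed out */

		timeout = fake_sleep(sk, timeout);
	}
}
```

With a helper shaped like this, both receive loops would reduce to "call the helper, bail out on <= 0, otherwise dequeue", which is the simplification the comments above are hinting at.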
On Mon, Jan 25, 2021 at 02:12:36PM +0300, Arseny Krasnov wrote:
>This adds receive loop for SEQPACKET. It looks like receive loop for
>STREAM, but there are a few differences:
>1) It doesn't call notify callbacks.
>2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
> there is no sense for these values in SEQPACKET case.
>3) It waits until whole record is received or error is found during
> receiving.
>4) It processes and sets 'MSG_TRUNC' flag.
>
>So to avoid extra conditions for two types of socket inside one loop, two
>independent functions were created.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/net/af_vsock.h | 5 ++
> net/vmw_vsock/af_vsock.c | 102 ++++++++++++++++++++++++++++++++++++++-
> 2 files changed, 106 insertions(+), 1 deletion(-)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..46073842d489 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,11 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
>+ size_t len, int flags);
>+
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 524df8fc84cd..3b266880b7c8 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -2006,7 +2006,107 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
> static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
> size_t len, int flags)
> {
>- return -1;
>+ const struct vsock_transport *transport;
>+ const struct iovec *orig_iov;
>+ unsigned long orig_nr_segs;
>+ ssize_t dequeued_total = 0;
>+ struct vsock_sock *vsk;
>+ size_t record_len;
>+ long timeout;
>+ int err = 0;
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ msg->msg_flags &= ~MSG_EOR;
Maybe add a comment about why we need to clear MSG_EOR.
>+ orig_nr_segs = msg->msg_iter.nr_segs;
>+ orig_iov = msg->msg_iter.iov;
>+
>+ while (1) {
>+ ssize_t dequeued;
>+ s64 ready;
>+
>+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>+ ready = vsock_stream_has_data(vsk);
>+
>+ if (ready == 0) {
>+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
>+ /* In case of any loop break(timeout, signal
>+ * interrupt or shutdown), we report user that
>+ * nothing was copied.
>+ */
>+ dequeued_total = 0;
>+ break;
>+ }
>+ continue;
>+ }
>+
>+ finish_wait(sk_sleep(sk), &wait);
>+
>+ if (ready < 0) {
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+
>+ if (dequeued_total == 0) {
>+ record_len =
>+ transport->seqpacket_seq_get_len(vsk);
>+
>+ if (record_len == 0)
>+ continue;
>+ }
>+
>+ /* 'msg_iter.count' is number of unused bytes in iov.
>+ * On every copy to iov iterator it is decremented at
>+ * size of data.
>+ */
>+ dequeued = transport->seqpacket_dequeue(vsk, msg,
>+ msg->msg_iter.count, flags);
Is the 'msg->msg_iter.count' argument needed, or can the transport
read it from 'msg'?
>+
>+ if (dequeued < 0) {
>+ dequeued_total = 0;
>+
>+ if (dequeued == -EAGAIN) {
>+ iov_iter_init(&msg->msg_iter, READ,
>+ orig_iov, orig_nr_segs,
>+ len);
>+ msg->msg_flags &= ~MSG_EOR;
>+ continue;
Why do we need to reset MSG_EOR here?
>+ }
>+
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ dequeued_total += dequeued;
>+
>+ if (dequeued_total >= record_len)
>+ break;
>+ }
Maybe add a blank line here.
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+
>+ if (dequeued_total > 0) {
>+ /* User sets MSG_TRUNC, so return real length of
>+ * packet.
>+ */
>+ if (flags & MSG_TRUNC)
>+ err = record_len;
>+ else
>+ err = len - msg->msg_iter.count;
>+
>+ /* Always set MSG_TRUNC if real length of packet is
>+ * bigger that user buffer.
s/that/than
>+ */
>+ if (record_len > len)
>+ msg->msg_flags |= MSG_TRUNC;
>+ }
>+out:
>+ return err;
> }
>
> static int
>--
>2.25.1
>
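For reference, the return-value rules at the end of the loop above can be modelled in isolation. The sketch below is a user-space illustration only; 'seqpacket_result()' is a hypothetical helper name, and it assumes the whole record was dequeued and the part that did not fit was dropped.

```c
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical helper: computes what recvmsg() reports for one record.
 * record_len: full length of the record announced by the sender
 * buf_len:    size of the user's buffer
 * flags:      recvmsg() flags passed by the caller
 * msg_flags:  output flags, as in struct msghdr
 */
static ssize_t seqpacket_result(size_t record_len, size_t buf_len,
				int flags, int *msg_flags)
{
	size_t copied = record_len < buf_len ? record_len : buf_len;

	/* The record did not fit: the tail was dropped, tell the user. */
	if (record_len > buf_len)
		*msg_flags |= MSG_TRUNC;

	/* MSG_TRUNC in 'flags' means the caller wants the real length. */
	return (flags & MSG_TRUNC) ? (ssize_t)record_len : (ssize_t)copied;
}
```

For example, a 100-byte record read into a 64-byte buffer reports 64 without MSG_TRUNC in 'flags' and 100 with it; in both cases MSG_TRUNC is set in 'msg_flags'.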
On Mon, Jan 25, 2021 at 02:13:18PM +0300, Arseny Krasnov wrote:
>This does rest of SOCK_SEQPACKET support:
>1) Adds socket ops for SEQPACKET type.
>2) Allows to create socket with SEQPACKET type.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> net/vmw_vsock/af_vsock.c | 71 ++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 71 insertions(+)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index 4600c1ec3bb3..bbc3c31085aa 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_dgram;
> break;
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> if (vsock_use_local_transport(remote_cid))
> new_transport = transport_local;
> else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
>@@ -459,6 +460,13 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_g2h;
> else
> new_transport = transport_h2g;
>+
>+ if (sk->sk_type == SOCK_SEQPACKET) {
I think you must also check that new_transport is not NULL.
>+ if (!new_transport->seqpacket_seq_send_len ||
>+ !new_transport->seqpacket_seq_get_len ||
>+ !new_transport->seqpacket_dequeue)
>+ return -ENODEV;
I'm not sure ENODEV is the best choice in this case: the transport
exists but doesn't support SOCK_SEQPACKET, so ESOCKTNOSUPPORT may be
better.
>+ }
> break;
> default:
> return -ESOCKTNOSUPPORT;
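Putting the two comments on this hunk together (the NULL check and the error code), the check could look roughly like the user-space sketch below. 'struct fake_transport' and 'check_seqpacket_transport()' are hypothetical names used only for illustration; the callback signatures are simplified, and ESOCKTNOSUPPORT is assumed to be provided by the platform's errno.h, as it is on Linux.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical, trimmed-down model of a transport's SEQPACKET callbacks. */
struct fake_transport {
	int (*seqpacket_seq_send_len)(void);
	int (*seqpacket_seq_get_len)(void);
	int (*seqpacket_dequeue)(void);
};

/* Returns 0 if 't' can serve SOCK_SEQPACKET, or a negative errno. */
static int check_seqpacket_transport(const struct fake_transport *t)
{
	/* No transport at all: there is nothing to dereference. */
	if (!t)
		return -ENODEV;

	/* The transport exists but lacks the SEQPACKET callbacks. */
	if (!t->seqpacket_seq_send_len ||
	    !t->seqpacket_seq_get_len ||
	    !t->seqpacket_dequeue)
		return -ESOCKTNOSUPPORT;

	return 0;
}

/* Dummy callback, only used to fill in a transport for experiments. */
static int fake_cb(void)
{
	return 0;
}
```

Keeping the two error codes distinct lets user space tell "no transport available" apart from "transport present, socket type unsupported".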
>@@ -684,6 +692,7 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
>
> switch (sk->sk_socket->type) {
> case SOCK_STREAM:
>+ case SOCK_SEQPACKET:
> spin_lock_bh(&vsock_table_lock);
> retval = __vsock_bind_connectible(vsk, addr);
> spin_unlock_bh(&vsock_table_lock);
>@@ -1406,6 +1415,12 @@ static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
> return vsock_connect(sock, addr, addr_len, flags);
> }
>
>+static int vsock_seqpacket_connect(struct socket *sock, struct sockaddr *addr,
>+ int addr_len, int flags)
>+{
>+ return vsock_connect(sock, addr, addr_len, flags);
>+}
>+
> static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
> bool kern)
> {
>@@ -1633,6 +1648,16 @@ static int vsock_stream_setsockopt(struct socket *sock,
> optlen);
> }
>
>+static int vsock_seqpacket_setsockopt(struct socket *sock,
>+ int level,
>+ int optname,
>+ sockptr_t optval,
>+ unsigned int optlen)
>+{
>+ return vsock_connectible_setsockopt(sock, level, optname, optval,
>+ optlen);
>+}
>+
> static int vsock_connectible_getsockopt(struct socket *sock,
> int level, int optname,
> char __user *optval,
>@@ -1713,6 +1738,15 @@ static int vsock_stream_getsockopt(struct socket *sock,
> optlen);
> }
>
>+static int vsock_seqpacket_getsockopt(struct socket *sock,
>+ int level, int optname,
>+ char __user *optval,
>+ int __user *optlen)
>+{
>+ return vsock_connectible_getsockopt(sock, level, optname, optval,
>+ optlen);
>+}
>+
> static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> size_t len)
> {
>@@ -1870,6 +1904,12 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> return vsock_connectible_sendmsg(sock, msg, len);
> }
>
>+static int vsock_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
>+ size_t len)
>+{
>+ return vsock_connectible_sendmsg(sock, msg, len);
>+}
>+
> static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> long timeout,
> struct vsock_transport_recv_notify_data *recv_data,
>@@ -2184,6 +2224,13 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> return vsock_connectible_recvmsg(sock, msg, len, flags);
> }
>
>+static int
>+vsock_seqpacket_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>+ int flags)
>+{
>+ return vsock_connectible_recvmsg(sock, msg, len, flags);
>+}
>+
As I said, I think you don't need to implement all of these helpers;
you can directly assign the vsock_connectible_* functions in
'vsock_seqpacket_ops'.
> static const struct proto_ops vsock_stream_ops = {
> .family = PF_VSOCK,
> .owner = THIS_MODULE,
>@@ -2205,6 +2252,27 @@ static const struct proto_ops vsock_stream_ops = {
> .sendpage = sock_no_sendpage,
> };
>
>+static const struct proto_ops vsock_seqpacket_ops = {
>+ .family = PF_VSOCK,
>+ .owner = THIS_MODULE,
>+ .release = vsock_release,
>+ .bind = vsock_bind,
>+ .connect = vsock_seqpacket_connect,
>+ .socketpair = sock_no_socketpair,
>+ .accept = vsock_accept,
>+ .getname = vsock_getname,
>+ .poll = vsock_poll,
>+ .ioctl = sock_no_ioctl,
>+ .listen = vsock_listen,
>+ .shutdown = vsock_shutdown,
>+ .setsockopt = vsock_seqpacket_setsockopt,
>+ .getsockopt = vsock_seqpacket_getsockopt,
>+ .sendmsg = vsock_seqpacket_sendmsg,
>+ .recvmsg = vsock_seqpacket_recvmsg,
>+ .mmap = sock_no_mmap,
>+ .sendpage = sock_no_sendpage,
>+};
>+
> static int vsock_create(struct net *net, struct socket *sock,
> int protocol, int kern)
> {
>@@ -2225,6 +2293,9 @@ static int vsock_create(struct net *net, struct socket *sock,
> case SOCK_STREAM:
> sock->ops = &vsock_stream_ops;
> break;
>+ case SOCK_SEQPACKET:
>+ sock->ops = &vsock_seqpacket_ops;
>+ break;
> default:
> return -ESOCKTNOSUPPORT;
> }
>--
>2.25.1
>
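To illustrate the "no wrappers" suggestion made several times in this review, here is a miniature, self-contained model of two ops tables pointing at the same shared functions. 'struct mini_ops' and the 'mini_*' functions are hypothetical and only mimic the shape of 'struct proto_ops'; they are not the real kernel definitions.

```c
#include <stddef.h>

/* Hypothetical miniature of 'struct proto_ops': two callbacks only. */
struct mini_ops {
	int (*connect)(int fd);
	int (*sendmsg)(int fd, size_t len);
};

/* Shared implementations for every connectible socket type. */
static int mini_connect(int fd)
{
	return fd >= 0 ? 0 : -1;
}

static int mini_connectible_sendmsg(int fd, size_t len)
{
	return fd >= 0 ? (int)len : -1;
}

/* Both tables point at the shared functions; no per-type wrappers. */
static const struct mini_ops mini_stream_ops = {
	.connect = mini_connect,
	.sendmsg = mini_connectible_sendmsg,
};

static const struct mini_ops mini_seqpacket_ops = {
	.connect = mini_connect,
	.sendmsg = mini_connectible_sendmsg,
};
```

The per-type wrappers would only be worth keeping if the two socket types needed different behaviour at the entry point, which does not seem to be the case in this series.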
Hi Arseny,
I reviewed part of the series; I hope to finish the other patches tomorrow.
Just a couple of comments in the TODOs below.
On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
> This patchset implements support of SOCK_SEQPACKET for the virtio
>transport.
> As SOCK_SEQPACKET guarantees to preserve record boundaries, a new
>packet operation was added: it marks the start of a record (with the
>record length in the header), and such a packet doesn't carry any data.
>To send a record, a packet with the start marker is sent first, then
>all data is sent as usual 'RW' packets. On the receiver's side, the
>length of the record is known from the packet with the start marker.
>As packets of one socket are not reordered on either the vsock or the
>vhost transport layer, this marker allows the original record to be
>restored on the receiver's side. If the user's buffer is smaller than
>the record length, all data beyond the buffer size is dropped.
> As with stream sockets, the maximum length of a datagram is not
>limited, because the same credit logic is used. The difference from a
>stream socket is that the user is not woken up until the whole record
>is received or an error occurs. The implementation also supports the
>'MSG_EOR' and 'MSG_TRUNC' flags.
> Tests are also implemented.
>
> Arseny Krasnov (13):
> af_vsock: prepare for SOCK_SEQPACKET support
> af_vsock: prepare 'vsock_connectible_recvmsg()'
> af_vsock: implement SEQPACKET rx loop
> af_vsock: implement send logic for SOCK_SEQPACKET
> af_vsock: rest of SEQPACKET support
> af_vsock: update comments for stream sockets
> virtio/vsock: dequeue callback for SOCK_SEQPACKET
> virtio/vsock: fetch length for SEQPACKET record
> virtio/vsock: add SEQPACKET receive logic
> virtio/vsock: rest of SOCK_SEQPACKET support
> virtio/vsock: setup SEQPACKET ops for transport
> vhost/vsock: setup SEQPACKET ops for transport
> vsock_test: add SOCK_SEQPACKET tests
>
> drivers/vhost/vsock.c | 7 +-
> include/linux/virtio_vsock.h | 12 +
> include/net/af_vsock.h | 6 +
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/af_vsock.c | 543 ++++++++++++++++------
> net/vmw_vsock/virtio_transport.c | 4 +
> net/vmw_vsock/virtio_transport_common.c | 295 ++++++++++--
> tools/testing/vsock/util.c | 32 +-
> tools/testing/vsock/util.h | 3 +
> tools/testing/vsock/vsock_test.c | 126 +++++
> 10 files changed, 862 insertions(+), 175 deletions(-)
>
> TODO:
> - Support for record integrity control. As the transport could drop
> some packets, something like a "record-id" and a record end marker
> need to be implemented. The idea is that the SEQ_BEGIN packet carries
> both record length and record id, while the end marker (call it
> SEQ_END) carries only the record id. To be sure that no packet was
> lost, the receiver checks the length of data between SEQ_BEGIN and
> SEQ_END (it must match the value in SEQ_BEGIN) and the record ids of
> SEQ_BEGIN and SEQ_END (a match means that neither marker was
> dropped). I think the easiest way to implement the record id for
> SEQ_BEGIN is to reuse another field of the packet header (SEQ_BEGIN
> already uses 'flags' as record length). For SEQ_END the record id
> could be stored in 'flags'.
I don't really like the idea of reusing the 'flags' field for this
purpose.
> Another way to implement it is to move the metadata of both SEQ_END
> and SEQ_BEGIN to the payload. But this approach has a problem: if we
> move something to the payload, that payload is accounted by the
> credit logic, which fragments payload, while a payload with record
> length and id must not be fragmented. One way to overcome this is to
> ignore the credit update for SEQ_BEGIN/SEQ_END packets. Another
> solution is to update the 'stream_has_space()' function: the current
> implementation returns non-zero when at least 1 byte may be used,
> but the updated version would take an extra argument, the needed
> length. For an 'RW' packet this argument is 1, for SEQ_BEGIN it is
> sizeof(record len + record id) and for SEQ_END it is
> sizeof(record id).
Is the payload also accounted for by the credit logic when hdr.op is
not VIRTIO_VSOCK_OP_RW?
I think that we can define a specific header to put after the
virtio_vsock_hdr when hdr.op is SEQ_BEGIN or SEQ_END, and in this header
we can store the id and the length of the message.
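Just to sketch what I mean, something along these lines. The struct name, field names and field widths below are only assumptions for discussion (a real header would use little-endian wire types), and 'record_is_intact()' is a hypothetical helper showing the receiver-side check described in the TODO.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical marker header placed right after virtio_vsock_hdr when
 * hdr.op is SEQ_BEGIN or SEQ_END. Names and widths are assumptions.
 */
struct seq_marker_hdr {
	uint32_t msg_id;  /* record id, same value in SEQ_BEGIN and SEQ_END */
	uint32_t msg_len; /* record length in bytes */
};

/* Receiver-side check once SEQ_END arrives. 'received' is the number of
 * payload bytes seen in 'RW' packets between the two markers.
 */
static bool record_is_intact(const struct seq_marker_hdr *begin,
			     const struct seq_marker_hdr *end,
			     uint32_t received)
{
	/* Different ids: at least one marker was lost in between. */
	if (begin->msg_id != end->msg_id)
		return false;

	/* Byte count differs from the announced length: data was lost. */
	return received == begin->msg_len;
}
```

This keeps the 'flags' field free for its original purpose, at the cost of a small fixed-size payload on the marker packets.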
>
> - What to do when the server doesn't support SOCK_SEQPACKET. In the
> current implementation RST is sent in reply, the same as when the
> listening port is not found. I think the current RST is enough,
> because a server that doesn't support SEQ_PACKET is equivalent to a
> missing listener (i.e. no listener in both cases).
I think so, but I'll check more carefully whether this could cause issues.
Thanks,
Stefano
>
> v2 -> v3:
> - patches reorganized: split for prepare and implementation patches
> - local variables are declared in "Reverse Christmas tree" manner
> - virtio_transport_common.c: valid leXX_to_cpu() for vsock header
> fields access
> - af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
> between stream and seqpacket sockets.
> - af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
> - af_vsock.c: 'vsock_wait_data()' refactored.
>
> v1 -> v2:
> - patches reordered: af_vsock.c related changes now before virtio vsock
> - patches reorganized: more small patches, where +/- are not mixed
> - tests for SOCK_SEQPACKET added
> - all commit messages updated
> - af_vsock.c: 'vsock_pre_recv_check()' inlined to
> 'vsock_connectible_recvmsg()'
> - af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
> was not found
> - virtio_transport_common.c: transport callback for seqpacket dequeue
> - virtio_transport_common.c: simplified
> 'virtio_transport_recv_connected()'
> - virtio_transport_common.c: send reset on socket and packet type
> mismatch.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>
>--
>2.25.1
>
On 28.01.2021 19:55, Stefano Garzarella wrote:
> On Mon, Jan 25, 2021 at 02:12:36PM +0300, Arseny Krasnov wrote:
>> This adds receive loop for SEQPACKET. It looks like receive loop for
>> STREAM, but there are a few differences:
>> 1) It doesn't call notify callbacks.
>> 2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
>> there is no sense for these values in SEQPACKET case.
>> 3) It waits until whole record is received or error is found during
>> receiving.
>> 4) It processes and sets 'MSG_TRUNC' flag.
>>
>> So to avoid extra conditions for two types of socket inside one loop, two
>> independent functions were created.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>> ---
>> include/net/af_vsock.h | 5 ++
>> net/vmw_vsock/af_vsock.c | 102 ++++++++++++++++++++++++++++++++++++++-
>> 2 files changed, 106 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>> index b1c717286993..46073842d489 100644
>> --- a/include/net/af_vsock.h
>> +++ b/include/net/af_vsock.h
>> @@ -135,6 +135,11 @@ struct vsock_transport {
>> bool (*stream_is_active)(struct vsock_sock *);
>> bool (*stream_allow)(u32 cid, u32 port);
>>
>> + /* SEQ_PACKET. */
>> + size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>> + ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
>> + size_t len, int flags);
>> +
>> /* Notification. */
>> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
>> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index 524df8fc84cd..3b266880b7c8 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -2006,7 +2006,107 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>> static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>> size_t len, int flags)
>> {
>> - return -1;
>> + const struct vsock_transport *transport;
>> + const struct iovec *orig_iov;
>> + unsigned long orig_nr_segs;
>> + ssize_t dequeued_total = 0;
>> + struct vsock_sock *vsk;
>> + size_t record_len;
>> + long timeout;
>> + int err = 0;
>> + DEFINE_WAIT(wait);
>> +
>> + vsk = vsock_sk(sk);
>> + transport = vsk->transport;
>> +
>> + timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>> + msg->msg_flags &= ~MSG_EOR;
> Maybe add a comment about why we need to clear MSG_EOR.
>
>> + orig_nr_segs = msg->msg_iter.nr_segs;
>> + orig_iov = msg->msg_iter.iov;
>> +
>> + while (1) {
>> + ssize_t dequeued;
>> + s64 ready;
>> +
>> + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>> + ready = vsock_stream_has_data(vsk);
>> +
>> + if (ready == 0) {
>> + if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
>> + /* In case of any loop break(timeout, signal
>> + * interrupt or shutdown), we report user that
>> + * nothing was copied.
>> + */
>> + dequeued_total = 0;
>> + break;
>> + }
>> + continue;
>> + }
>> +
>> + finish_wait(sk_sleep(sk), &wait);
>> +
>> + if (ready < 0) {
>> + err = -ENOMEM;
>> + goto out;
>> + }
>> +
>> + if (dequeued_total == 0) {
>> + record_len =
>> + transport->seqpacket_seq_get_len(vsk);
>> +
>> + if (record_len == 0)
>> + continue;
>> + }
>> +
>> + /* 'msg_iter.count' is number of unused bytes in iov.
>> + * On every copy to iov iterator it is decremented at
>> + * size of data.
>> + */
>> + dequeued = transport->seqpacket_dequeue(vsk, msg,
>> + msg->msg_iter.count, flags);
> ^
> Is this needed or 'msg' can be
> used in the transport?
Yes, right
>> +
>> + if (dequeued < 0) {
>> + dequeued_total = 0;
>> +
>> + if (dequeued == -EAGAIN) {
>> + iov_iter_init(&msg->msg_iter, READ,
>> + orig_iov, orig_nr_segs,
>> + len);
>> + msg->msg_flags &= ~MSG_EOR;
>> + continue;
> Why we need to reset MSG_EOR here?
Because if the previous attempt to receive the record failed but
MSG_EOR was already set, we clear it for the next attempt to get the record.
>
>> + }
>> +
>> + err = -ENOMEM;
>> + break;
>> + }
>> +
>> + dequeued_total += dequeued;
>> +
>> + if (dequeued_total >= record_len)
>> + break;
>> + }
> Maybe a new line here.
>
>> + if (sk->sk_err)
>> + err = -sk->sk_err;
>> + else if (sk->sk_shutdown & RCV_SHUTDOWN)
>> + err = 0;
>> +
>> + if (dequeued_total > 0) {
>> + /* User sets MSG_TRUNC, so return real length of
>> + * packet.
>> + */
>> + if (flags & MSG_TRUNC)
>> + err = record_len;
>> + else
>> + err = len - msg->msg_iter.count;
>> +
>> + /* Always set MSG_TRUNC if real length of packet is
>> + * bigger that user buffer.
> s/that/than
>
>> + */
>> + if (record_len > len)
>> + msg->msg_flags |= MSG_TRUNC;
>> + }
>> +out:
>> + return err;
>> }
>>
>> static int
>> --
>> 2.25.1
>>
>
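The return value and MSG_TRUNC handling at the end of the loop above can be
observed from user space. The sketch below is only an illustration: it assumes
an AF_UNIX SOCK_SEQPACKET socket pair as a stand-in (AF_VSOCK SEQPACKET is what
this series adds, so it may not be available yet), and 'recv_record()' is a
hypothetical helper, not part of the patch.

```c
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Hypothetical helper: receive one record into 'buf'.
 * Returns recvmsg()'s result and stores in '*truncated' whether the
 * kernel set MSG_TRUNC in msg_flags for this record.
 */
static ssize_t recv_record(int fd, void *buf, size_t len, int flags,
			   int *truncated)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
	ssize_t n = recvmsg(fd, &msg, flags);

	*truncated = n >= 0 && (msg.msg_flags & MSG_TRUNC) != 0;
	return n;
}
```

With a 10-byte record and a 4-byte buffer, AF_UNIX on Linux returns 4 when
MSG_TRUNC is not passed in 'flags' and 10 when it is, and sets MSG_TRUNC in
msg_flags in both cases. That matches what the 'dequeued_total > 0' branch of
the patch is trying to do.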
On 28.01.2021 20:19, Stefano Garzarella wrote:
> Hi Arseny,
> I reviewed a part, tomorrow I hope to finish the other patches.
>
> Just a couple of comments in the TODOs below.
>
> On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
>> This patchset impelements support of SOCK_SEQPACKET for virtio
>> transport.
>> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>> do it, new packet operation was added: it marks start of record (with
>> record length in header), such packet doesn't carry any data. To send
>> record, packet with start marker is sent first, then all data is sent
>> as usual 'RW' packets. On receiver's side, length of record is known
>> from packet with start record marker. Now as packets of one socket
>> are not reordered neither on vsock nor on vhost transport layers, such
>> marker allows to restore original record on receiver's side. If user's
>> buffer is smaller that record length, when all out of size data is
>> dropped.
>> Maximum length of datagram is not limited as in stream socket,
>> because same credit logic is used. Difference with stream socket is
>> that user is not woken up until whole record is received or error
>> occurred. Implementation also supports 'MSG_EOR' and 'MSG_TRUNC' flags.
>> Tests also implemented.
>>
>> Arseny Krasnov (13):
>> af_vsock: prepare for SOCK_SEQPACKET support
>> af_vsock: prepare 'vsock_connectible_recvmsg()'
>> af_vsock: implement SEQPACKET rx loop
>> af_vsock: implement send logic for SOCK_SEQPACKET
>> af_vsock: rest of SEQPACKET support
>> af_vsock: update comments for stream sockets
>> virtio/vsock: dequeue callback for SOCK_SEQPACKET
>> virtio/vsock: fetch length for SEQPACKET record
>> virtio/vsock: add SEQPACKET receive logic
>> virtio/vsock: rest of SOCK_SEQPACKET support
>> virtio/vsock: setup SEQPACKET ops for transport
>> vhost/vsock: setup SEQPACKET ops for transport
>> vsock_test: add SOCK_SEQPACKET tests
>>
>> drivers/vhost/vsock.c | 7 +-
>> include/linux/virtio_vsock.h | 12 +
>> include/net/af_vsock.h | 6 +
>> include/uapi/linux/virtio_vsock.h | 9 +
>> net/vmw_vsock/af_vsock.c | 543 ++++++++++++++++------
>> net/vmw_vsock/virtio_transport.c | 4 +
>> net/vmw_vsock/virtio_transport_common.c | 295 ++++++++++--
>> tools/testing/vsock/util.c | 32 +-
>> tools/testing/vsock/util.h | 3 +
>> tools/testing/vsock/vsock_test.c | 126 +++++
>> 10 files changed, 862 insertions(+), 175 deletions(-)
>>
>> TODO:
>> - Support for record integrity control. As transport could drop some
>> packets, something like "record-id" and record end marker need to
>> be implemented. Idea is that SEQ_BEGIN packet carries both record
>> length and record id, end marker(let it be SEQ_END) carries only
>> record id. To be sure that no one packet was lost, receiver checks
>> length of data between SEQ_BEGIN and SEQ_END(it must be same with
>> value in SEQ_BEGIN) and record ids of SEQ_BEGIN and SEQ_END(this
>> means that both markers were not dropped. I think that easiest way
>> to implement record id for SEQ_BEGIN is to reuse another field of
>> packet header(SEQ_BEGIN already uses 'flags' as record length).For
>> SEQ_END record id could be stored in 'flags'.
> I don't really like the idea of reusing the 'flags' field for this
> purpose.
>
>> Another way to implement it, is to move metadata of both SEQ_END
>> and SEQ_BEGIN to payload. But this approach has problem, because
>> if we move something to payload, such payload is accounted by
>> credit logic, which fragments payload, while payload with record
>> length and id couldn't be fragmented. One way to overcome it is to
>> ignore credit update for SEQ_BEGIN/SEQ_END packet.Another solution
>> is to update 'stream_has_space()' function: current implementation
>> return non-zero when at least 1 byte is allowed to use,but updated
>> version will have extra argument, which is needed length. For 'RW'
>> packet this argument is 1, for SEQ_BEGIN it is sizeof(record len +
>> record id) and for SEQ_END it is sizeof(record id).
> Is the payload accounted by credit logic also if hdr.op is not
> VIRTIO_VSOCK_OP_RW?
Yes, on send any packet with a payload could be fragmented if
there is not enough space at the receiver. On receive, 'fwd_cnt' and
'buf_alloc' are updated from the header of every packet. Of course,
in every such case I've described I can add a check for 'RW'
packets, to exclude the payload from credit accounting, but that is
a bunch of dumb checks.
>
> I think that we can define a specific header to put after the
> virtio_vsock_hdr when hdr.op is SEQ_BEGIN or SEQ_END, and in this header
> we can store the id and the length of the message.
I think it is better than using the payload and touching the credit logic.
>
>> - What to do, when server doesn't support SOCK_SEQPACKET. In current
>> implementation RST is replied in the same way when listening port
>> is not found. I think that current RST is enough,because case when
>> server doesn't support SEQ_PACKET is same when listener missed(e.g.
>> no listener in both cases).
> I think so, but I'll check better if we can have some issues.
>
> Thanks,
> Stefano
>
>> v2 -> v3:
>> - patches reorganized: split for prepare and implementation patches
>> - local variables are declared in "Reverse Christmas tree" manner
>> - virtio_transport_common.c: valid leXX_to_cpu() for vsock header
>> fields access
>> - af_vsock.c: 'vsock_connectible_*sockopt()' added as shared code
>> between stream and seqpacket sockets.
>> - af_vsock.c: loops in '__vsock_*_recvmsg()' refactored.
>> - af_vsock.c: 'vsock_wait_data()' refactored.
>>
>> v1 -> v2:
>> - patches reordered: af_vsock.c related changes now before virtio vsock
>> - patches reorganized: more small patches, where +/- are not mixed
>> - tests for SOCK_SEQPACKET added
>> - all commit messages updated
>> - af_vsock.c: 'vsock_pre_recv_check()' inlined to
>> 'vsock_connectible_recvmsg()'
>> - af_vsock.c: 'vsock_assign_transport()' returns ENODEV if transport
>> was not found
>> - virtio_transport_common.c: transport callback for seqpacket dequeue
>> - virtio_transport_common.c: simplified
>> 'virtio_transport_recv_connected()'
>> - virtio_transport_common.c: send reset on socket and packet type
>> mismatch.
>>
>> Signed-off-by: Arseny Krasnov <[email protected]>
>>
>> --
>> 2.25.1
>>
>
On Fri, Jan 29, 2021 at 09:41:50AM +0300, Arseny Krasnov wrote:
>
>On 28.01.2021 20:19, Stefano Garzarella wrote:
>> Hi Arseny,
>> I reviewed a part, tomorrow I hope to finish the other patches.
>>
>> Just a couple of comments in the TODOs below.
>>
>> On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
>>>
>>> TODO:
>>> - Support for record integrity control. As transport could drop some
>>> packets, something like "record-id" and record end marker need to
>>> be implemented. Idea is that SEQ_BEGIN packet carries both record
>>> length and record id, end marker(let it be SEQ_END) carries only
>>> record id. To be sure that no one packet was lost, receiver checks
>>> length of data between SEQ_BEGIN and SEQ_END(it must be same with
>>> value in SEQ_BEGIN) and record ids of SEQ_BEGIN and SEQ_END(this
>>> means that both markers were not dropped. I think that easiest way
>>> to implement record id for SEQ_BEGIN is to reuse another field of
>>> packet header(SEQ_BEGIN already uses 'flags' as record length).For
>>> SEQ_END record id could be stored in 'flags'.
>> I don't really like the idea of reusing the 'flags' field for this
>> purpose.
>>
>>> Another way to implement it, is to move metadata of both SEQ_END
>>> and SEQ_BEGIN to payload. But this approach has problem, because
>>> if we move something to payload, such payload is accounted by
>>> credit logic, which fragments payload, while payload with record
>>> length and id couldn't be fragmented. One way to overcome it is to
>>> ignore credit update for SEQ_BEGIN/SEQ_END packet.Another solution
>>> is to update 'stream_has_space()' function: current implementation
>>> return non-zero when at least 1 byte is allowed to use,but updated
>>> version will have extra argument, which is needed length. For 'RW'
>>> packet this argument is 1, for SEQ_BEGIN it is sizeof(record len +
>>> record id) and for SEQ_END it is sizeof(record id).
>> Is the payload accounted by credit logic also if hdr.op is not
>> VIRTIO_VSOCK_OP_RW?
>
>Yes, on send any packet with payload could be fragmented if
>there is not enough space at receiver. On receive 'fwd_cnt' and
>'buf_alloc' are updated with header of every packet. Of course,
>to every such case i've described i can add check for 'RW'
>packet, to exclude payload from credit accounting, but this is
>bunch of dumb checks.
>
>>
>> I think that we can define a specific header to put after the
>> virtio_vsock_hdr when hdr.op is SEQ_BEGIN or SEQ_END, and in this header
>> we can store the id and the length of the message.
>
>I think it is better than use payload and touch credit logic
>
Cool, so let's try this option, hoping there aren't a lot of issues.
Another item for TODO could be to add the SOCK_SEQPACKET support also
for vsock_loopback. Should be simple since it also uses
virtio_transport_common APIs and it can be useful for testing and
debugging.
Thanks,
Stefano
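The integrity check described in the TODO (the same record id in SEQ_BEGIN
and SEQ_END, and data length equal to the length announced in SEQ_BEGIN) could
look roughly like the receiver-side sketch below. Only the names SEQ_BEGIN and
SEQ_END come from the thread; 'SEQ_RW', 'struct seq_rx_state',
'seq_rx_packet()' and the verdict values are hypothetical and stand in for
whatever the transport would really use.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical packet kinds seen by the receiver. */
enum seq_op { SEQ_BEGIN, SEQ_RW, SEQ_END };

/* Hypothetical result of feeding one packet to the checker. */
enum seq_verdict { SEQ_MORE, SEQ_RECORD_OK, SEQ_RECORD_BAD };

struct seq_rx_state {
	bool in_record;		/* a SEQ_BEGIN was seen, no SEQ_END yet */
	uint32_t expected_len;	/* record length announced by SEQ_BEGIN */
	uint32_t expected_id;	/* record id announced by SEQ_BEGIN */
	uint32_t received;	/* data bytes seen since SEQ_BEGIN */
};

/* 'id' is meaningful for SEQ_BEGIN/SEQ_END; 'len' is the announced record
 * length for SEQ_BEGIN and the payload size for SEQ_RW.
 */
static enum seq_verdict seq_rx_packet(struct seq_rx_state *st, enum seq_op op,
				      uint32_t id, uint32_t len)
{
	bool ok;

	switch (op) {
	case SEQ_BEGIN:
		/* BEGIN while a record is open: its END marker was lost. */
		ok = !st->in_record;
		st->in_record = true;
		st->expected_len = len;
		st->expected_id = id;
		st->received = 0;
		return ok ? SEQ_MORE : SEQ_RECORD_BAD;
	case SEQ_RW:
		/* Data outside a record: its BEGIN marker was lost. */
		if (!st->in_record)
			return SEQ_RECORD_BAD;
		st->received += len;
		return SEQ_MORE;
	case SEQ_END:
		/* Both markers must agree and no data may be missing. */
		ok = st->in_record && st->expected_id == id &&
		     st->received == st->expected_len;
		st->in_record = false;
		return ok ? SEQ_RECORD_OK : SEQ_RECORD_BAD;
	}
	return SEQ_RECORD_BAD;
}
```

A record is reported as good only when SEQ_END arrives with the id from
SEQ_BEGIN and the byte count matches, so a dropped 'RW' packet or a dropped
marker shows up as SEQ_RECORD_BAD.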
On Fri, Jan 29, 2021 at 06:52:23PM +0300, Arseny Krasnov wrote:
>
>On 29.01.2021 12:26, Stefano Garzarella wrote:
>> On Fri, Jan 29, 2021 at 09:41:50AM +0300, Arseny Krasnov wrote:
>>> On 28.01.2021 20:19, Stefano Garzarella wrote:
>>>> Hi Arseny,
>>>> I reviewed a part, tomorrow I hope to finish the other patches.
>>>>
>>>> Just a couple of comments in the TODOs below.
>>>>
>>>> On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
>>>>>
>>>>> TODO:
>>>>> - Support for record integrity control. As transport could drop some
>>>>> packets, something like "record-id" and record end marker need to
>>>>> be implemented. Idea is that SEQ_BEGIN packet carries both record
>>>>> length and record id, end marker(let it be SEQ_END) carries only
>>>>> record id. To be sure that no one packet was lost, receiver checks
>>>>> length of data between SEQ_BEGIN and SEQ_END(it must be same with
>>>>> value in SEQ_BEGIN) and record ids of SEQ_BEGIN and SEQ_END(this
>>>>> means that both markers were not dropped. I think that easiest way
>>>>> to implement record id for SEQ_BEGIN is to reuse another field of
>>>>> packet header(SEQ_BEGIN already uses 'flags' as record length).For
>>>>> SEQ_END record id could be stored in 'flags'.
>>>> I don't really like the idea of reusing the 'flags' field for this
>>>> purpose.
>>>>
>>>>> Another way to implement it, is to move metadata of both SEQ_END
>>>>> and SEQ_BEGIN to payload. But this approach has problem, because
>>>>> if we move something to payload, such payload is accounted by
>>>>> credit logic, which fragments payload, while payload with record
>>>>> length and id couldn't be fragmented. One way to overcome it is to
>>>>> ignore credit update for SEQ_BEGIN/SEQ_END packet.Another solution
>>>>> is to update 'stream_has_space()' function: current implementation
>>>>> return non-zero when at least 1 byte is allowed to use,but updated
>>>>> version will have extra argument, which is needed length. For 'RW'
>>>>> packet this argument is 1, for SEQ_BEGIN it is sizeof(record len +
>>>>> record id) and for SEQ_END it is sizeof(record id).
>>>> Is the payload accounted by credit logic also if hdr.op is not
>>>> VIRTIO_VSOCK_OP_RW?
>>> Yes, on send any packet with payload could be fragmented if
>>> there is not enough space at receiver. On receive 'fwd_cnt' and
>>> 'buf_alloc' are updated with header of every packet. Of course,
>>> to every such case i've described i can add check for 'RW'
>>> packet, to exclude payload from credit accounting, but this is
>>> bunch of dumb checks.
>>>
>>>> I think that we can define a specific header to put after the
>>>> virtio_vsock_hdr when hdr.op is SEQ_BEGIN or SEQ_END, and in this header
>>>> we can store the id and the length of the message.
>>> I think it is better than use payload and touch credit logic
>>>
>> Cool, so let's try this option, hoping there aren't a lot of issues.
>
>If I understand, the current implementation has 'struct virtio_vsock_hdr',
>then I'll add 'struct virtio_vsock_hdr_seq' with message length and id.
>After that, in 'struct virtio_vsock_pkt', which describes a packet, the field
>for the header (which is 'struct virtio_vsock_hdr') must be replaced with a
>new structure which contains both 'struct virtio_vsock_hdr' and 'struct
>virtio_vsock_hdr_seq', because the header field of 'struct virtio_vsock_pkt'
>is the buffer for the virtio layer. After that, all accesses to the header
>(for example to the 'buf_alloc' field) will go across the new structure with
>both headers:
>
>pkt->hdr.buf_alloc  ->  pkt->extended_hdr.classic_hdr.buf_alloc
>
>Maybe to avoid this, the packet's header could be allocated dynamically
>in the same manner as the packet's buffer? The size of the allocation is always
>sizeof(classic header) + sizeof(seq header). In 'struct virtio_vsock_pkt'
>such a header will be implemented as a union of two pointers: classic header
>and extended header containing classic and seq header. Which pointer
>to use depends on the packet's op.
I think that the 'classic header' can stay as is, and the extended
header can be dynamically allocated, as we do for the payload.
But we have to be careful what happens if the other peer doesn't support
SEQPACKET and if it counts this extra header as a payload for the credit
mechanism.
I'll try to take a closer look in the next few days.
Thanks,
Stefano
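To make the suggestion above concrete (classic header stays embedded, extra
header allocated on demand like the payload), here is a user-space sketch. The
field list of 'struct classic_hdr' mirrors 'struct virtio_vsock_hdr' as I
recall it from include/uapi/linux/virtio_vsock.h, with fixed-width types in
place of __le16/__le32/__le64. Everything else ('struct seq_hdr', its fields,
'struct pkt', 'pkt_alloc()', 'pkt_free()') is hypothetical and only
illustrates the layout being discussed.

```c
#include <stdint.h>
#include <stdlib.h>

/* Mirrors the existing on-wire header layout (44 bytes, packed). */
struct classic_hdr {
	uint64_t src_cid;
	uint64_t dst_cid;
	uint32_t src_port;
	uint32_t dst_port;
	uint32_t len;
	uint16_t type;
	uint16_t op;
	uint32_t flags;
	uint32_t buf_alloc;
	uint32_t fwd_cnt;
} __attribute__((packed));

/* Hypothetical extra header carried only by SEQ_BEGIN/SEQ_END. */
struct seq_hdr {
	uint32_t msg_len;
	uint32_t msg_id;
} __attribute__((packed));

/* Hypothetical packet descriptor: 'hdr' stays where it is today, so
 * existing accesses such as pkt->hdr.buf_alloc do not change.
 */
struct pkt {
	struct classic_hdr hdr;
	struct seq_hdr *seq_hdr;	/* NULL unless op is SEQ_BEGIN/SEQ_END */
	void *buf;			/* payload, allocated separately */
};

static struct pkt *pkt_alloc(int with_seq_hdr)
{
	struct pkt *pkt = calloc(1, sizeof(*pkt));

	if (!pkt)
		return NULL;
	if (with_seq_hdr) {
		pkt->seq_hdr = calloc(1, sizeof(*pkt->seq_hdr));
		if (!pkt->seq_hdr) {
			free(pkt);
			return NULL;
		}
	}
	return pkt;
}

static void pkt_free(struct pkt *pkt)
{
	if (!pkt)
		return;
	free(pkt->seq_hdr);
	free(pkt->buf);
	free(pkt);
}
```

Compared with wrapping both headers in one structure, this keeps every
existing 'pkt->hdr.*' access untouched, at the cost of one more allocation for
the two marker packets.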
On Fri, Jan 29, 2021 at 07:00:16PM +0300, Arseny Krasnov wrote:
>
>On 29.01.2021 14:03, Stefano Garzarella wrote:
>> On Mon, Jan 25, 2021 at 02:15:26PM +0300, Arseny Krasnov wrote:
>>> This adds rest of logic for SEQPACKET:
>>> 1) Shared functions for packet sending now set valid type of packet
>>> according socket type.
>>> 2) SEQPACKET specific function like SEQ_BEGIN send and data dequeue.
>>> 3) TAP support for SEQPACKET is not so easy if it is necessary to
>>> send whole record to TAP interface. This could be done by allocating
>>> new packet when whole record is received, data of record must be
>>> copied to TAP packet.
>>>
>>> Signed-off-by: Arseny Krasnov <[email protected]>
>>> ---
>>> include/linux/virtio_vsock.h | 7 ++++
>>> net/vmw_vsock/virtio_transport_common.c | 55 +++++++++++++++++++++----
>>> 2 files changed, 55 insertions(+), 7 deletions(-)
>>>
>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>> index af8705ea8b95..ad9783df97c9 100644
>>> --- a/include/linux/virtio_vsock.h
>>> +++ b/include/linux/virtio_vsock.h
>>> @@ -84,7 +84,14 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>> struct msghdr *msg,
>>> size_t len, int flags);
>>>
>>> +bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
>>> size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
>>> +ssize_t
>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>> + struct msghdr *msg,
>>> + size_t len,
>>> + int type);
>>> +
>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>
>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>> index 90f9feef9d8f..fab14679ca7b 100644
>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>> @@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
>>> break;
>>> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
>>> case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
>>> + case VIRTIO_VSOCK_OP_SEQ_BEGIN:
>>> hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
>>> break;
>>> default:
>>> @@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
>>>
>>> void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
>>> {
>>> + /* TODO: implement tap support for SOCK_SEQPACKET. */
>> I think we should do this before merging SOCK_SEQPACKET support because
>> it can be very useful to use tcpdump to figure out what's going on, do
>> you think it's complicated?
>
>It is not complicated if for TAP interface we will consider seqpacket
>same as stream, e.g. TAP reader won't see whole record as one
>packet, it will see fragments of record as in stream socket. Implement
Maybe as initial support we can have this limitation and complete it
later.
>
>normal interpretation of seqpacket is not complicated, but requires
>some extra work. Ok, i'll check it
Thanks,
Stefano
On 01.02.2021 14:02, Stefano Garzarella wrote:
> On Fri, Jan 29, 2021 at 06:52:23PM +0300, Arseny Krasnov wrote:
>> On 29.01.2021 12:26, Stefano Garzarella wrote:
>>> On Fri, Jan 29, 2021 at 09:41:50AM +0300, Arseny Krasnov wrote:
>>>> On 28.01.2021 20:19, Stefano Garzarella wrote:
>>>>> Hi Arseny,
>>>>> I reviewed a part, tomorrow I hope to finish the other patches.
>>>>>
>>>>> Just a couple of comments in the TODOs below.
>>>>>
>>>>> On Mon, Jan 25, 2021 at 02:09:00PM +0300, Arseny Krasnov wrote:
>>>>>>
>>>>>> TODO:
>>>>>> - Support for record integrity control. As transport could drop some
>>>>>> packets, something like "record-id" and record end marker need to
>>>>>> be implemented. Idea is that SEQ_BEGIN packet carries both record
>>>>>> length and record id, end marker(let it be SEQ_END) carries only
>>>>>> record id. To be sure that no one packet was lost, receiver checks
>>>>>> length of data between SEQ_BEGIN and SEQ_END(it must be same with
>>>>>> value in SEQ_BEGIN) and record ids of SEQ_BEGIN and SEQ_END(this
>>>>>> means that both markers were not dropped. I think that easiest way
>>>>>> to implement record id for SEQ_BEGIN is to reuse another field of
>>>>>> packet header(SEQ_BEGIN already uses 'flags' as record length).For
>>>>>> SEQ_END record id could be stored in 'flags'.
>>>>> I don't really like the idea of reusing the 'flags' field for this
>>>>> purpose.
>>>>>
>>>>>> Another way to implement it, is to move metadata of both SEQ_END
>>>>>> and SEQ_BEGIN to payload. But this approach has problem, because
>>>>>> if we move something to payload, such payload is accounted by
>>>>>> credit logic, which fragments payload, while payload with record
>>>>>> length and id couldn't be fragmented. One way to overcome it is to
>>>>>> ignore credit update for SEQ_BEGIN/SEQ_END packet.Another solution
>>>>>> is to update 'stream_has_space()' function: current implementation
>>>>>> return non-zero when at least 1 byte is allowed to use,but updated
>>>>>> version will have extra argument, which is needed length. For 'RW'
>>>>>> packet this argument is 1, for SEQ_BEGIN it is sizeof(record len +
>>>>>> record id) and for SEQ_END it is sizeof(record id).
>>>>> Is the payload accounted by credit logic also if hdr.op is not
>>>>> VIRTIO_VSOCK_OP_RW?
>>>> Yes, on send any packet with payload could be fragmented if
>>>> there is not enough space at receiver. On receive 'fwd_cnt' and
>>>> 'buf_alloc' are updated with header of every packet. Of course,
>>>> to every such case i've described i can add check for 'RW'
>>>> packet, to exclude payload from credit accounting, but this is
>>>> bunch of dumb checks.
>>>>
>>>>> I think that we can define a specific header to put after the
>>>>> virtio_vsock_hdr when hdr.op is SEQ_BEGIN or SEQ_END, and in this header
>>>>> we can store the id and the length of the message.
>>>> I think it is better than use payload and touch credit logic
>>>>
>>> Cool, so let's try this option, hoping there aren't a lot of issues.
>> If I understand, the current implementation has 'struct virtio_vsock_hdr',
>> then I'll add 'struct virtio_vsock_hdr_seq' with message length and id.
>> After that, in 'struct virtio_vsock_pkt', which describes a packet, the field
>> for the header (which is 'struct virtio_vsock_hdr') must be replaced with a
>> new structure which contains both 'struct virtio_vsock_hdr' and 'struct
>> virtio_vsock_hdr_seq', because the header field of 'struct virtio_vsock_pkt'
>> is the buffer for the virtio layer. After that, all accesses to the header
>> (for example to the 'buf_alloc' field) will go across the new structure with
>> both headers:
>>
>> pkt->hdr.buf_alloc  ->  pkt->extended_hdr.classic_hdr.buf_alloc
>>
>> Maybe to avoid this, the packet's header could be allocated dynamically
>> in the same manner as the packet's buffer? The size of the allocation is always
>> sizeof(classic header) + sizeof(seq header). In 'struct virtio_vsock_pkt'
>> such a header will be implemented as a union of two pointers: classic header
>> and extended header containing classic and seq header. Which pointer
>> to use depends on the packet's op.
> I think that the 'classic header' can stay as is, and the extended
> header can be dynamically allocated, as we do for the payload.
>
> But we have to be careful what happens if the other peer doesn't support
> SEQPACKET and if it counts this extra header as a payload for the credit
> mechanism.
You mean putting the extra header into the payload (the buffer of the
second virtio desc)? In that case auxiliary 'if's are needed on
send/receive to bypass the credit logic (or the length field in the
header of such packets must be set to 0). But what about placing the
extra header after the classic header, in the buffer of the first virtio
desc? Then the extra header is not payload and credit works as is.
Or is it critical that the size of the first buffer is the same as the
size of the classic header?
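
To make the two placements concrete, here is a small userspace sketch
(not kernel code). 'struct vsock_hdr' mirrors the field layout of
'struct virtio_vsock_hdr' using plain stdint types. 'struct vsock_hdr_seq',
its field names and widths, and both helper functions are assumptions made
only for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Userspace mirror of struct virtio_vsock_hdr (fields are LE on the wire). */
struct vsock_hdr {
	uint64_t src_cid;
	uint64_t dst_cid;
	uint32_t src_port;
	uint32_t dst_port;
	uint32_t len;        /* payload length seen by the credit logic */
	uint16_t type;
	uint16_t op;
	uint32_t flags;
	uint32_t buf_alloc;
	uint32_t fwd_cnt;
} __attribute__((packed));

/* Hypothetical extra header: names and widths are assumptions. */
struct vsock_hdr_seq {
	uint32_t msg_id;
	uint32_t msg_len;
} __attribute__((packed));

static_assert(sizeof(struct vsock_hdr) == 44, "classic header is 44 bytes");

/* Sizes of the first and second virtio buffers of one packet. */
struct desc_sizes {
	size_t first;
	size_t second;
};

/* Option A: extra header follows the classic one in the first buffer. */
struct desc_sizes layout_first_buf(void)
{
	struct desc_sizes s = {
		sizeof(struct vsock_hdr) + sizeof(struct vsock_hdr_seq), 0
	};
	return s;
}

/* Option B: classic header untouched, extra header in the second buffer. */
struct desc_sizes layout_second_buf(void)
{
	struct desc_sizes s = {
		sizeof(struct vsock_hdr), sizeof(struct vsock_hdr_seq)
	};
	return s;
}
```

With option A the first buffer no longer has the size of the classic
header, which is exactly the point in question here. Option B keeps
the first buffer as is.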
>
> I'll try to take a closer look in the next few days.
>
> Thanks,
> Stefano
>
>
On Mon, Feb 01, 2021 at 04:57:18PM +0300, Arseny Krasnov wrote:
>
>On 01.02.2021 14:02, Stefano Garzarella wrote:
>> I think that the 'classic header' can stay as is, and the extended
>> header can be dynamically allocated, as we do for the payload.
>>
>> But we have to be careful what happens if the other peer doesn't support
>> SEQPACKET and if it counts this extra header as a payload for the credit
>> mechanism.
>
>You mean putting the extra header into the payload (the buffer of the
>second virtio desc)? In that case auxiliary 'if's are needed on
>send/receive to bypass the credit logic (or the length field in the
>header of such packets must be set to 0). But what about placing the
>extra header after the classic header, in the buffer of the first virtio
>desc? Then the extra header is not payload and credit works as is.
>Or is it critical that the size of the first buffer is the same as the
>size of the classic header?
We need to think about compatibility with old drivers.
What would happen in this case?
I think it's easier to use the second buffer, usually used for the
payload, to carry the extra header. Also, we can leave hdr.len = 0, so
we are sure that it is not counted in credit mechanism.
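
A minimal sketch of why hdr.len = 0 keeps the extra header out of the
credit mechanism, assuming the sender charges credit from hdr.len only.
The struct and function names are made up for this example. The
arithmetic follows the usual buf_alloc / fwd_cnt / tx_cnt scheme.

```c
#include <assert.h>
#include <stdint.h>

/* Sender-side view of the peer's receive buffer (hypothetical names). */
struct credit {
	uint32_t peer_buf_alloc; /* buffer size advertised by the peer */
	uint32_t peer_fwd_cnt;   /* bytes the peer has consumed so far */
	uint32_t tx_cnt;         /* bytes we have sent so far */
};

/* Bytes we may still send without overrunning the peer's buffer. */
uint32_t credit_available(const struct credit *c)
{
	return c->peer_buf_alloc - (c->tx_cnt - c->peer_fwd_cnt);
}

/*
 * Charge one packet. Only hdr.len is accounted, never the size of the
 * second buffer, so a SEQ_BEGIN/SEQ_END packet sent with hdr.len == 0
 * costs nothing even though it carries an extra header.
 */
void credit_charge(struct credit *c, uint32_t hdr_len)
{
	assert(hdr_len <= credit_available(c));
	c->tx_cnt += hdr_len;
}
```

A peer that does not know SEQPACKET also sees hdr.len == 0, so it has
no payload to add to its own counters.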
If the driver supports SEQPACKET, it knows it must fetch extra header
when it must handle SEQ_BEGIN/SEQ_END.
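
Roughly, the receive side could look like the sketch below. This is
only an illustration: the op values for SEQ_BEGIN/SEQ_END, 'struct
seq_hdr', and fetch_seq_hdr() are all hypothetical, and the extra header
is assumed to be two little-endian 32-bit fields.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* OP_RW matches VIRTIO_VSOCK_OP_RW; the SEQ_* values are made up. */
enum { OP_RW = 5, OP_SEQ_BEGIN = 8, OP_SEQ_END = 9 };

/* Hypothetical extra header carried in the second buffer. */
struct seq_hdr {
	uint32_t msg_id;
	uint32_t msg_len;
};

static_assert(sizeof(struct seq_hdr) == 8, "extra header is 8 bytes");

/* Decode a little-endian 32-bit value independent of host byte order. */
uint32_t le32_load(const uint8_t *p)
{
	return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
	       ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

/*
 * Returns 1 if an extra header was fetched into *out, 0 if the op does
 * not carry one, and -1 if the second buffer is too short.
 */
int fetch_seq_hdr(uint16_t op, const uint8_t *buf, size_t buf_len,
		  struct seq_hdr *out)
{
	if (op != OP_SEQ_BEGIN && op != OP_SEQ_END)
		return 0;
	if (buf_len < sizeof(*out))
		return -1;
	out->msg_id = le32_load(buf);
	out->msg_len = le32_load(buf + 4);
	return 1;
}
```

Packets with any other op are handled as before, so the second buffer
is interpreted as an extra header only when the op says so.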
If it is not clear, I'll try to provide a simple PoC of a patch.
Thanks,
Stefano
On 01.02.2021 17:23, Stefano Garzarella wrote:
> On Mon, Feb 01, 2021 at 04:57:18PM +0300, Arseny Krasnov wrote:
>> You mean putting the extra header into the payload (the buffer of the
>> second virtio desc)? In that case auxiliary 'if's are needed on
>> send/receive to bypass the credit logic (or the length field in the
>> header of such packets must be set to 0). But what about placing the
>> extra header after the classic header, in the buffer of the first virtio
>> desc? Then the extra header is not payload and credit works as is.
>> Or is it critical that the size of the first buffer is the same as the
>> size of the classic header?
> We need to think about compatibility with old drivers.
Yes, compatibility looks like a problem.
>
> What would happen in this case?
>
> I think it's easier to use the second buffer, usually used for the
> payload, to carry the extra header. Also, we can leave hdr.len = 0, so
> we are sure that it is not counted in credit mechanism.
Ok, that is one of the possible solutions. I just wanted to let you know
which approach I'll use in v4.
> If the driver supports SEQPACKET, it knows it must fetch extra header
> when it must handle SEQ_BEGIN/SEQ_END.
>
> If it is not clear, I'll try to provide a simple PoC of a patch.
No, it is clear to me. I'll implement it in v4 and also take care of the
review comments.
Thank you
>
> Thanks,
> Stefano
>
>
On Mon, Feb 01, 2021 at 05:32:00PM +0300, Arseny Krasnov wrote:
>
>On 01.02.2021 17:23, Stefano Garzarella wrote:
>> We need to think about compatibility with old drivers.
>Yes, compatibility looks like a problem.
>>
>> What would happen in this case?
>>
>> I think it's easier to use the second buffer, usually used for the
>> payload, to carry the extra header. Also, we can leave hdr.len = 0, so
>> we are sure that it is not counted in credit mechanism.
>
>Ok, that is one of the possible solutions. I just wanted to let you know
>which approach I'll use in v4.
>
>> If the driver supports SEQPACKET, it knows it must fetch extra header
>> when it must handle SEQ_BEGIN/SEQ_END.
>>
>> If it is not clear, I'll try to provide a simple PoC of a patch.
>
>No, it is clear to me. I'll implement it in v4 and also take care of the
>review comments.
Great! Let me know if any issues we haven't considered come up.
Stefano