As SOCK_SEQPACKET guarantees to save record boundaries, so to
do it, new packet operation was added: it marks start of record (with
record length in header). To send record, packet with start marker is
sent first, then all data is transmitted as 'RW' packets. On receiver's
side, length of record is known from packet with start record marker.
Now as packets of one socket are not reordered neither on vsock nor on
vhost transport layers, these marker allows to restore original record
on receiver's side. When each 'RW' packet is inserted to rx queue of
receiver, user is woken up, data is copied to user's buffer and credit
update message is sent. If there is no user waiting for data, credit
won't be updated and sender will wait. Also, if user's buffer is full,
and record is bigger, all unneeded data will be dropped (with sending of
credit update message).
'MSG_EOR' flag is implemented with special value of 'flags' field
in packet header. When record is received with such flags, 'MSG_EOR' is
set in 'recvmsg()' flags. 'MSG_TRUNC' flag is also supported.
In this implementation maximum length of datagram is not limited
as in stream socket.
drivers/vhost/vsock.c | 6 +-
include/linux/virtio_vsock.h | 7 +
include/net/af_vsock.h | 4 +
include/uapi/linux/virtio_vsock.h | 9 +
net/vmw_vsock/af_vsock.c | 457 +++++++++++++++++++-----
net/vmw_vsock/virtio_transport.c | 3 +
net/vmw_vsock/virtio_transport_common.c | 323 ++++++++++++++---
7 files changed, 673 insertions(+), 136 deletions(-)
--
2.25.1
This extends rx loop for SOCK_SEQPACKET packets and implements
callback which user calls to copy data to its buffer.
Signed-off-by: Arseny Krasnov <[email protected]>
---
include/linux/virtio_vsock.h | 7 +
include/net/af_vsock.h | 4 +
include/uapi/linux/virtio_vsock.h | 9 +
net/vmw_vsock/virtio_transport.c | 3 +
net/vmw_vsock/virtio_transport_common.c | 323 +++++++++++++++++++++---
5 files changed, 305 insertions(+), 41 deletions(-)
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..4902d71b3252 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -36,6 +36,10 @@ struct virtio_vsock_sock {
u32 rx_bytes;
u32 buf_alloc;
struct list_head rx_queue;
+
+ /* For SOCK_SEQPACKET */
+ u32 user_read_seq_len;
+ u32 user_read_copied;
};
struct virtio_vsock_pkt {
@@ -80,6 +84,9 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags);
+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
+
s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index b1c717286993..792ea7b66574 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -135,6 +135,10 @@ struct vsock_transport {
bool (*stream_is_active)(struct vsock_sock *);
bool (*stream_allow)(u32 cid, u32 port);
+ /* SEQ_PACKET. */
+ bool (*seqpacket_seq_send_len)(struct vsock_sock *, size_t len);
+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
+
/* Notification. */
int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 1d57ed3d84d2..058908bc19fc 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -65,6 +65,7 @@ struct virtio_vsock_hdr {
enum virtio_vsock_type {
VIRTIO_VSOCK_TYPE_STREAM = 1,
+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
};
enum virtio_vsock_op {
@@ -83,6 +84,9 @@ enum virtio_vsock_op {
VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
/* Request the peer to send the credit info to us */
VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
+
+ /* Record begin for SOCK_SEQPACKET */
+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
};
/* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
};
+/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
+enum virtio_vsock_rw_seqpacket {
+ VIRTIO_VSOCK_RW_EOR = 1,
+};
+
#endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 2700a63ab095..2bd3f7cbffcb 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -469,6 +469,9 @@ static struct virtio_transport virtio_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 5956939eebb7..77c42004e422 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
break;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
break;
default:
@@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
{
+ /* TODO: implement tap support for SOCK_SEQPACKET. */
+ if (le32_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
+ return;
+
if (pkt->tap_delivered)
return;
@@ -230,10 +235,10 @@ static bool virtio_transport_inc_rx_pkt(struct virtio_vsock_sock *vvs,
}
static void virtio_transport_dec_rx_pkt(struct virtio_vsock_sock *vvs,
- struct virtio_vsock_pkt *pkt)
+ u32 len)
{
- vvs->rx_bytes -= pkt->len;
- vvs->fwd_cnt += pkt->len;
+ vvs->rx_bytes -= len;
+ vvs->fwd_cnt += len;
}
void virtio_transport_inc_tx_pkt(struct virtio_vsock_sock *vvs, struct virtio_vsock_pkt *pkt)
@@ -365,7 +370,7 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
total += bytes;
pkt->off += bytes;
if (pkt->off == pkt->len) {
- virtio_transport_dec_rx_pkt(vvs, pkt);
+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
list_del(&pkt->list);
virtio_transport_free_pkt(pkt);
}
@@ -397,15 +402,202 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
return err;
}
+static u16 virtio_transport_get_type(struct sock *sk)
+{
+ if (sk->sk_type == SOCK_STREAM)
+ return VIRTIO_VSOCK_TYPE_STREAM;
+ else
+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
+}
+
+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
+{
+ struct virtio_vsock_pkt_info info = {
+ .type = VIRTIO_VSOCK_TYPE_SEQPACKET,
+ .op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
+ .vsk = vsk,
+ .flags = len
+ };
+
+ return virtio_transport_send_pkt_info(vsk, &info);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
+
+static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
+{
+ list_del(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+}
+
+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
+{
+ struct virtio_vsock_pkt *pkt, *n;
+ size_t bytes_dropped = 0;
+
+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
+ break;
+
+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+
+ return bytes_dropped;
+}
+
+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_dropped;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ /* Fetch all orphaned 'RW', packets, and
+ * send credit update.
+ */
+ bytes_dropped = virtio_transport_drop_until_seq_begin(vvs);
+
+ if (list_empty(&vvs->rx_queue))
+ goto out;
+
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ vvs->user_read_copied = 0;
+ vvs->user_read_seq_len = le32_to_cpu(pkt->hdr.flags);
+ virtio_transport_del_n_free_pkt(pkt);
+out:
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (bytes_dropped)
+ virtio_transport_send_credit_update(vsk,
+ VIRTIO_VSOCK_TYPE_SEQPACKET,
+ NULL);
+
+ return vvs->user_read_seq_len;
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
+
+static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+ struct msghdr *msg,
+ size_t user_buf_len)
+{
+ struct virtio_vsock_sock *vvs = vsk->trans;
+ struct virtio_vsock_pkt *pkt;
+ size_t bytes_handled = 0;
+ int err = 0;
+
+ spin_lock_bh(&vvs->rx_lock);
+
+ if (user_buf_len == 0) {
+ /* User's buffer is full, we processing rest of
+ * record and drop it. If 'SEQ_BEGIN' is found
+ * while iterating, user will be woken up,
+ * because record is already copied, and we
+ * don't care about absent of some tail RW packets
+ * of it. Return number of bytes(rest of record),
+ * but ignore credit update for such absent bytes.
+ */
+ bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
+ vvs->user_read_copied += bytes_handled;
+
+ if (!list_empty(&vvs->rx_queue) &&
+ vvs->user_read_copied < vvs->user_read_seq_len) {
+ /* 'SEQ_BEGIN' found, but record isn't complete.
+ * Set number of copied bytes to fit record size
+ * and force counters to finish receiving.
+ */
+ bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
+ vvs->user_read_copied = vvs->user_read_seq_len;
+ }
+ }
+
+ /* Now start copying. */
+ while (vvs->user_read_copied < vvs->user_read_seq_len &&
+ vvs->rx_bytes &&
+ user_buf_len &&
+ !err) {
+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+ switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ /* Unexpected 'SEQ_BEGIN' during record copy:
+ * Leave receive loop, 'EAGAIN' will restart it from
+ * outer receive loop, packet is still in queue and
+ * counters are cleared. So in next loop enter,
+ * 'SEQ_BEGIN' will be dequeued first. User's iov
+ * iterator will be reset in outer loop. Also
+ * send credit update, because some bytes could be
+ * copied. User will never see unfinished record.
+ */
+ err = -EAGAIN;
+ break;
+ }
+ case VIRTIO_VSOCK_OP_RW: {
+ size_t bytes_to_copy;
+ size_t pkt_len;
+
+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+ bytes_to_copy = min(user_buf_len, pkt_len);
+
+ /* sk_lock is held by caller so no one else can dequeue.
+ * Unlock rx_lock since memcpy_to_msg() may sleep.
+ */
+ spin_unlock_bh(&vvs->rx_lock);
+
+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+ spin_lock_bh(&vvs->rx_lock);
+ err = -EINVAL;
+ break;
+ }
+
+ spin_lock_bh(&vvs->rx_lock);
+ user_buf_len -= bytes_to_copy;
+ bytes_handled += pkt->len;
+ vvs->user_read_copied += bytes_to_copy;
+
+ if (le16_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
+ msg->msg_flags |= MSG_EOR;
+ break;
+ }
+ default:
+ ;
+ }
+
+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
+ * but drop any other type of packet.
+ */
+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
+ virtio_transport_del_n_free_pkt(pkt);
+ }
+ }
+
+ spin_unlock_bh(&vvs->rx_lock);
+
+ virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
+ NULL);
+
+ return err ?: bytes_handled;
+}
+
ssize_t
virtio_transport_stream_dequeue(struct vsock_sock *vsk,
struct msghdr *msg,
size_t len, int flags)
{
- if (flags & MSG_PEEK)
- return virtio_transport_stream_do_peek(vsk, msg, len);
- else
+ if (virtio_transport_get_type(sk_vsock(vsk)) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
+ if (flags & MSG_PEEK)
+ return -EOPNOTSUPP;
+
+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
+ } else {
+ if (flags & MSG_PEEK)
+ return virtio_transport_stream_do_peek(vsk, msg, len);
+
return virtio_transport_stream_do_dequeue(vsk, msg, len);
+ }
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
@@ -481,6 +673,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
spin_lock_init(&vvs->rx_lock);
spin_lock_init(&vvs->tx_lock);
INIT_LIST_HEAD(&vvs->rx_queue);
+ vvs->user_read_copied = 0;
+ vvs->user_read_seq_len = 0;
return 0;
}
@@ -490,13 +684,16 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
{
struct virtio_vsock_sock *vvs = vsk->trans;
+ int type;
if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
*val = VIRTIO_VSOCK_MAX_BUF_SIZE;
vvs->buf_alloc = *val;
- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
+ type = virtio_transport_get_type(sk_vsock(vsk));
+
+ virtio_transport_send_credit_update(vsk, type,
NULL);
}
EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
@@ -624,10 +821,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_REQUEST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_connect);
@@ -636,7 +834,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_SHUTDOWN,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.flags = (mode & RCV_SHUTDOWN ?
VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
(mode & SEND_SHUTDOWN ?
@@ -644,6 +841,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
@@ -665,12 +864,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RW,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.msg = msg,
.pkt_len = len,
.vsk = vsk,
+ .flags = 0,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
+ if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
+ msg->msg_flags & MSG_EOR)
+ info.flags |= VIRTIO_VSOCK_RW_EOR;
+
return virtio_transport_send_pkt_info(vsk, &info);
}
EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
@@ -688,7 +893,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RST,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.reply = !!pkt,
.vsk = vsk,
};
@@ -697,6 +901,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
return 0;
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
@@ -884,44 +1090,59 @@ virtio_transport_recv_connecting(struct sock *sk,
return err;
}
-static void
+static bool
virtio_transport_recv_enqueue(struct vsock_sock *vsk,
struct virtio_vsock_pkt *pkt)
{
struct virtio_vsock_sock *vvs = vsk->trans;
- bool can_enqueue, free_pkt = false;
+ bool data_ready = false;
+ bool free_pkt = false;
- pkt->len = le32_to_cpu(pkt->hdr.len);
pkt->off = 0;
+ pkt->len = le32_to_cpu(pkt->hdr.len);
spin_lock_bh(&vvs->rx_lock);
- can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
- if (!can_enqueue) {
- free_pkt = true;
- goto out;
- }
+ switch (le32_to_cpu(pkt->hdr.type)) {
+ case VIRTIO_VSOCK_TYPE_STREAM: {
+ if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
+ free_pkt = true;
+ goto out;
+ }
- /* Try to copy small packets into the buffer of last packet queued,
- * to avoid wasting memory queueing the entire buffer with a small
- * payload.
- */
- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
- struct virtio_vsock_pkt *last_pkt;
+ /* Try to copy small packets into the buffer of last packet queued,
+ * to avoid wasting memory queueing the entire buffer with a small
+ * payload.
+ */
+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
+ struct virtio_vsock_pkt *last_pkt;
- last_pkt = list_last_entry(&vvs->rx_queue,
- struct virtio_vsock_pkt, list);
+ last_pkt = list_last_entry(&vvs->rx_queue,
+ struct virtio_vsock_pkt, list);
- /* If there is space in the last packet queued, we copy the
- * new packet in its buffer.
- */
- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
- pkt->len);
- last_pkt->len += pkt->len;
- free_pkt = true;
- goto out;
+ /* If there is space in the last packet queued, we copy the
+ * new packet in its buffer.
+ */
+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
+ pkt->len);
+ last_pkt->len += pkt->len;
+ free_pkt = true;
+ goto out;
+ }
}
+
+ data_ready = true;
+ break;
+ }
+
+ case VIRTIO_VSOCK_TYPE_SEQPACKET: {
+ data_ready = true;
+ vvs->rx_bytes += pkt->len;
+ break;
+ }
+ default:
+ goto out;
}
list_add_tail(&pkt->list, &vvs->rx_queue);
@@ -930,6 +1151,8 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
spin_unlock_bh(&vvs->rx_lock);
if (free_pkt)
virtio_transport_free_pkt(pkt);
+
+ return data_ready;
}
static int
@@ -940,9 +1163,17 @@ virtio_transport_recv_connected(struct sock *sk,
int err = 0;
switch (le16_to_cpu(pkt->hdr.op)) {
+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
+ struct virtio_vsock_sock *vvs = vsk->trans;
+
+ spin_lock_bh(&vvs->rx_lock);
+ list_add_tail(&pkt->list, &vvs->rx_queue);
+ spin_unlock_bh(&vvs->rx_lock);
+ return err;
+ }
case VIRTIO_VSOCK_OP_RW:
- virtio_transport_recv_enqueue(vsk, pkt);
- sk->sk_data_ready(sk);
+ if (virtio_transport_recv_enqueue(vsk, pkt))
+ sk->sk_data_ready(sk);
return err;
case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
sk->sk_write_space(sk);
@@ -990,13 +1221,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
{
struct virtio_vsock_pkt_info info = {
.op = VIRTIO_VSOCK_OP_RESPONSE,
- .type = VIRTIO_VSOCK_TYPE_STREAM,
.remote_cid = le64_to_cpu(pkt->hdr.src_cid),
.remote_port = le32_to_cpu(pkt->hdr.src_port),
.reply = true,
.vsk = vsk,
};
+ info.type = virtio_transport_get_type(sk_vsock(vsk));
+
return virtio_transport_send_pkt_info(vsk, &info);
}
@@ -1086,6 +1318,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
return 0;
}
+static bool virtio_transport_valid_type(u16 type)
+{
+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
+}
+
/* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
* lock.
*/
@@ -1111,7 +1349,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
le32_to_cpu(pkt->hdr.buf_alloc),
le32_to_cpu(pkt->hdr.fwd_cnt));
- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
(void)virtio_transport_reset_no_sock(t, pkt);
goto free_pkt;
}
@@ -1128,6 +1366,9 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
}
}
+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type))
+ goto free_pkt;
+
vsk = vsock_sk(sk);
space_available = virtio_transport_space_update(sk, pkt);
--
2.25.1
This patch simply adds transport ops and removes
ignore of non-stream type of packets.
Signed-off-by: Arseny Krasnov <[email protected]>
---
drivers/vhost/vsock.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index a483cec31d5c..4a36ef1c52d0 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -346,8 +346,7 @@ vhost_vsock_alloc_pkt(struct vhost_virtqueue *vq,
return NULL;
}
- if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM)
- pkt->len = le32_to_cpu(pkt->hdr.len);
+ pkt->len = le32_to_cpu(pkt->hdr.len);
/* No payload */
if (!pkt->len)
@@ -416,6 +415,9 @@ static struct virtio_transport vhost_transport = {
.stream_is_active = virtio_transport_stream_is_active,
.stream_allow = virtio_transport_stream_allow,
+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
+
.notify_poll_in = virtio_transport_notify_poll_in,
.notify_poll_out = virtio_transport_notify_poll_out,
.notify_recv_init = virtio_transport_notify_recv_init,
--
2.25.1
From: Arseniy Krasnov <[email protected]>
---
net/vmw_vsock/af_vsock.c | 107 +++++++++++++++++++++++++++++++++------
1 file changed, 91 insertions(+), 16 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 7ff00449a9a2..30caad9815f7 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_dgram;
break;
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
if (vsock_use_local_transport(remote_cid))
new_transport = transport_local;
else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
@@ -459,6 +460,12 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
new_transport = transport_g2h;
else
new_transport = transport_h2g;
+
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ if (!new_transport->seqpacket_seq_send_len ||
+ !new_transport->seqpacket_seq_get_len)
+ return -ENODEV;
+ }
break;
default:
return -ESOCKTNOSUPPORT;
@@ -604,8 +611,8 @@ static void vsock_pending_work(struct work_struct *work)
/**** SOCKET OPERATIONS ****/
-static int __vsock_bind_stream(struct vsock_sock *vsk,
- struct sockaddr_vm *addr)
+static int __vsock_bind_connectible(struct vsock_sock *vsk,
+ struct sockaddr_vm *addr)
{
static u32 port;
struct sockaddr_vm new_addr;
@@ -684,8 +691,9 @@ static int __vsock_bind(struct sock *sk, struct sockaddr_vm *addr)
switch (sk->sk_socket->type) {
case SOCK_STREAM:
+ case SOCK_SEQPACKET:
spin_lock_bh(&vsock_table_lock);
- retval = __vsock_bind_stream(vsk, addr);
+ retval = __vsock_bind_connectible(vsk, addr);
spin_unlock_bh(&vsock_table_lock);
break;
@@ -767,6 +775,11 @@ static struct sock *__vsock_create(struct net *net,
return sk;
}
+static bool sock_type_connectible(u16 type)
+{
+ return (type == SOCK_STREAM || type == SOCK_SEQPACKET);
+}
+
static void __vsock_release(struct sock *sk, int level)
{
if (sk) {
@@ -785,7 +798,7 @@ static void __vsock_release(struct sock *sk, int level)
if (vsk->transport)
vsk->transport->release(vsk);
- else if (sk->sk_type == SOCK_STREAM)
+ else if (sock_type_connectible(sk->sk_type))
vsock_remove_sock(vsk);
sock_orphan(sk);
@@ -945,7 +958,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk = sock->sk;
if (sock->state == SS_UNCONNECTED) {
err = -ENOTCONN;
- if (sk->sk_type == SOCK_STREAM)
+ if (sock_type_connectible(sk->sk_type))
return err;
} else {
sock->state = SS_DISCONNECTING;
@@ -960,7 +973,7 @@ static int vsock_shutdown(struct socket *sock, int mode)
sk->sk_state_change(sk);
release_sock(sk);
- if (sk->sk_type == SOCK_STREAM) {
+ if (sock_type_connectible(sk->sk_type)) {
sock_reset_flag(sk, SOCK_DONE);
vsock_send_shutdown(sk, mode);
}
@@ -1013,7 +1026,7 @@ static __poll_t vsock_poll(struct file *file, struct socket *sock,
if (!(sk->sk_shutdown & SEND_SHUTDOWN))
mask |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND;
- } else if (sock->type == SOCK_STREAM) {
+ } else if (sock_type_connectible(sk->sk_type)) {
const struct vsock_transport *transport = vsk->transport;
lock_sock(sk);
@@ -1259,8 +1272,8 @@ static void vsock_connect_timeout(struct work_struct *work)
sock_put(sk);
}
-static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
- int addr_len, int flags)
+static int vsock_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
{
int err;
struct sock *sk;
@@ -1410,7 +1423,7 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
lock_sock(listener);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sock->type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1477,6 +1490,18 @@ static int vsock_accept(struct socket *sock, struct socket *newsock, int flags,
return err;
}
+static int vsock_stream_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
+static int vsock_seqpacket_connect(struct socket *sock, struct sockaddr *addr,
+ int addr_len, int flags)
+{
+ return vsock_connect(sock, addr, addr_len, flags);
+}
+
static int vsock_listen(struct socket *sock, int backlog)
{
int err;
@@ -1487,7 +1512,7 @@ static int vsock_listen(struct socket *sock, int backlog)
lock_sock(sk);
- if (sock->type != SOCK_STREAM) {
+ if (!sock_type_connectible(sk->sk_type)) {
err = -EOPNOTSUPP;
goto out;
}
@@ -1531,11 +1556,11 @@ static void vsock_update_buffer_size(struct vsock_sock *vsk,
vsk->buffer_size = val;
}
-static int vsock_stream_setsockopt(struct socket *sock,
- int level,
- int optname,
- sockptr_t optval,
- unsigned int optlen)
+static int vsock_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
{
int err;
struct sock *sk;
@@ -1612,6 +1637,24 @@ static int vsock_stream_setsockopt(struct socket *sock,
return err;
}
+static int vsock_seqpacket_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_setsockopt(sock, level, optname, optval, optlen);
+}
+
+static int vsock_stream_setsockopt(struct socket *sock,
+ int level,
+ int optname,
+ sockptr_t optval,
+ unsigned int optlen)
+{
+ return vsock_setsockopt(sock, level, optname, optval, optlen);
+}
+
static int vsock_stream_getsockopt(struct socket *sock,
int level, int optname,
char __user *optval,
@@ -1683,6 +1726,14 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}
+static int vsock_seqpacket_getsockopt(struct socket *sock,
+ int level, int optname,
+ char __user *optval,
+ int __user *optlen)
+{
+ return vsock_stream_getsockopt(sock, level, optname, optval, optlen);
+}
+
static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
size_t len)
{
@@ -2209,6 +2260,27 @@ static const struct proto_ops vsock_stream_ops = {
.sendpage = sock_no_sendpage,
};
+static const struct proto_ops vsock_seqpacket_ops = {
+ .family = PF_VSOCK,
+ .owner = THIS_MODULE,
+ .release = vsock_release,
+ .bind = vsock_bind,
+ .connect = vsock_seqpacket_connect,
+ .socketpair = sock_no_socketpair,
+ .accept = vsock_accept,
+ .getname = vsock_getname,
+ .poll = vsock_poll,
+ .ioctl = sock_no_ioctl,
+ .listen = vsock_listen,
+ .shutdown = vsock_shutdown,
+ .setsockopt = vsock_seqpacket_setsockopt,
+ .getsockopt = vsock_seqpacket_getsockopt,
+ .sendmsg = vsock_seqpacket_sendmsg,
+ .recvmsg = vsock_seqpacket_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
static int vsock_create(struct net *net, struct socket *sock,
int protocol, int kern)
{
@@ -2229,6 +2301,9 @@ static int vsock_create(struct net *net, struct socket *sock,
case SOCK_STREAM:
sock->ops = &vsock_stream_ops;
break;
+ case SOCK_SEQPACKET:
+ sock->ops = &vsock_seqpacket_ops;
+ break;
default:
return -ESOCKTNOSUPPORT;
}
--
2.25.1
From: Arseniy Krasnov <[email protected]>
For send, this patch adds:
1) Send of record begin marker with record length.
2) Return error if send of whole record is failed.
For receive, this patch adds another loop, it looks like
stream loop, but:
1) It doesn't call notify callbacks.
2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT'
values.
3) It waits until whole record is received or error is
found during receiving.
3) It processes and sets 'MSG_TRUNC' flag.
---
net/vmw_vsock/af_vsock.c | 319 +++++++++++++++++++++++++++++++--------
1 file changed, 256 insertions(+), 63 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index b12d3a322242..7ff00449a9a2 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1683,8 +1683,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
return 0;
}
-static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
- size_t len)
+static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
struct sock *sk;
struct vsock_sock *vsk;
@@ -1737,6 +1737,12 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
if (err < 0)
goto out;
+ if (sk->sk_type == SOCK_SEQPACKET) {
+ err = transport->seqpacket_seq_send_len(vsk, len);
+ if (err < 0)
+ goto out;
+ }
+
while (total_written < len) {
ssize_t written;
@@ -1796,10 +1802,8 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
* smaller than the queue size. It is the caller's
* responsibility to check how many bytes we were able to send.
*/
-
- written = transport->stream_enqueue(
- vsk, msg,
- len - total_written);
+ written = transport->stream_enqueue(vsk, msg,
+ len - total_written);
if (written < 0) {
err = -ENOMEM;
goto out_err;
@@ -1815,36 +1819,96 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
}
out_err:
- if (total_written > 0)
- err = total_written;
+ if (total_written > 0) {
+ /* Return number of written bytes only if:
+ * 1) SOCK_STREAM socket.
+ * 2) SOCK_SEQPACKET socket when whole buffer is sent.
+ */
+ if (sk->sk_type == SOCK_STREAM || total_written == len)
+ err = total_written;
+ }
out:
release_sock(sk);
return err;
}
+static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
+{
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
-static int
-vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
- int flags)
+static int vsock_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
+ size_t len)
{
- struct sock *sk;
+ return vsock_connectible_sendmsg(sock, msg, len);
+}
+
+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
+ long timeout,
+ struct vsock_transport_recv_notify_data *recv_data,
+ size_t target)
+{
+ int err = 0;
struct vsock_sock *vsk;
const struct vsock_transport *transport;
- int err;
- size_t target;
- ssize_t copied;
- long timeout;
- struct vsock_transport_recv_notify_data recv_data;
-
- DEFINE_WAIT(wait);
- sk = sock->sk;
vsk = vsock_sk(sk);
transport = vsk->transport;
- err = 0;
+ if (sk->sk_err != 0 ||
+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
+ (vsk->peer_shutdown & SEND_SHUTDOWN)) {
+ finish_wait(sk_sleep(sk), wait);
+ return -1;
+ }
+ /* Don't wait for non-blocking sockets. */
+ if (timeout == 0) {
+ err = -EAGAIN;
+ finish_wait(sk_sleep(sk), wait);
+ return err;
+ }
+
+ if (sk->sk_type == SOCK_STREAM) {
+ err = transport->notify_recv_pre_block(vsk, target, recv_data);
+ if (err < 0) {
+ finish_wait(sk_sleep(sk), wait);
+ return err;
+ }
+ }
+
+ release_sock(sk);
+ timeout = schedule_timeout(timeout);
lock_sock(sk);
+ if (signal_pending(current)) {
+ err = sock_intr_errno(timeout);
+ finish_wait(sk_sleep(sk), wait);
+ } else if (timeout == 0) {
+ err = -EAGAIN;
+ finish_wait(sk_sleep(sk), wait);
+ }
+
+ return err;
+}
+
+static int vsock_wait_data_seqpacket(struct sock *sk, struct wait_queue_entry *wait,
+ long timeout)
+{
+ return vsock_wait_data(sk, wait, timeout, NULL, 0);
+}
+
+static int vsock_pre_recv_check(struct socket *sock,
+ int flags, size_t len, int *err)
+{
+ struct sock *sk;
+ struct vsock_sock *vsk;
+ const struct vsock_transport *transport;
+
+ sk = sock->sk;
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
if (!transport || sk->sk_state != TCP_ESTABLISHED) {
/* Recvmsg is supposed to return 0 if a peer performs an
* orderly shutdown. Differentiate between that case and when a
@@ -1852,16 +1916,16 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
* SOCK_DONE flag.
*/
if (sock_flag(sk, SOCK_DONE))
- err = 0;
+ *err = 0;
else
- err = -ENOTCONN;
+ *err = -ENOTCONN;
- goto out;
+ return false;
}
if (flags & MSG_OOB) {
- err = -EOPNOTSUPP;
- goto out;
+ *err = -EOPNOTSUPP;
+ return false;
}
/* We don't check peer_shutdown flag here since peer may actually shut
@@ -1869,17 +1933,143 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
* receive.
*/
if (sk->sk_shutdown & RCV_SHUTDOWN) {
- err = 0;
- goto out;
+ *err = 0;
+ return false;
}
/* It is valid on Linux to pass in a zero-length receive buffer. This
* is not an error. We may as well bail out now.
*/
if (!len) {
+ *err = 0;
+ return false;
+ }
+
+ return true;
+}
+
+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ int err = 0;
+ size_t record_len;
+ struct vsock_sock *vsk;
+ const struct vsock_transport *transport;
+ long timeout;
+ ssize_t dequeued_total = 0;
+ unsigned long orig_nr_segs;
+ const struct iovec *orig_iov;
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
+
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ msg->msg_flags &= ~MSG_EOR;
+ orig_nr_segs = msg->msg_iter.nr_segs;
+ orig_iov = msg->msg_iter.iov;
+
+ while (1) {
+ s64 ready;
+
+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
+ ready = vsock_stream_has_data(vsk);
+
+ if (ready == 0) {
+ if (vsock_wait_data_seqpacket(sk, &wait, timeout)) {
+ /* In case of any loop break(timeout, signal
+ * interrupt or shutdown), we report user that
+ * nothing was copied.
+ */
+ dequeued_total = 0;
+ break;
+ }
+ } else {
+ ssize_t dequeued;
+
+ finish_wait(sk_sleep(sk), &wait);
+
+ if (ready < 0) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ if (dequeued_total == 0) {
+ record_len =
+ transport->seqpacket_seq_get_len(vsk);
+
+ if (record_len == 0)
+ continue;
+ }
+
+ /* 'msg_iter.count' is number of unused bytes in iov.
+ * On every copy to iov iterator it is decremented at
+ * size of data.
+ */
+ dequeued = transport->stream_dequeue(vsk, msg,
+ msg->msg_iter.count, flags);
+
+ if (dequeued < 0) {
+ dequeued_total = 0;
+
+ if (dequeued == -EAGAIN) {
+ iov_iter_init(&msg->msg_iter, READ,
+ orig_iov, orig_nr_segs,
+ len);
+ msg->msg_flags &= ~MSG_EOR;
+ continue;
+ }
+
+ err = -ENOMEM;
+ break;
+ }
+
+ dequeued_total += dequeued;
+
+ if (dequeued_total >= record_len)
+ break;
+ }
+ }
+
+ if (sk->sk_err)
+ err = -sk->sk_err;
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
err = 0;
- goto out;
+
+ if (dequeued_total > 0) {
+ /* User sets MSG_TRUNC, so return real length of
+ * packet.
+ */
+ if (flags & MSG_TRUNC)
+ err = record_len;
+ else
+ err = len - msg->msg_iter.count;
+
+ /* Always set MSG_TRUNC if real length of packet is
+ * bigger that user buffer.
+ */
+ if (record_len > len)
+ msg->msg_flags |= MSG_TRUNC;
}
+out:
+ return err;
+}
+
+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
+ size_t len, int flags)
+{
+ int err;
+ const struct vsock_transport *transport;
+ struct vsock_sock *vsk;
+ size_t target;
+ struct vsock_transport_recv_notify_data recv_data;
+ long timeout;
+ ssize_t copied;
+
+ DEFINE_WAIT(wait);
+
+ vsk = vsock_sk(sk);
+ transport = vsk->transport;
/* We must not copy less than target bytes into the user's buffer
* before returning successfully, so we wait for the consume queue to
@@ -1907,38 +2097,8 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
ready = vsock_stream_has_data(vsk);
if (ready == 0) {
- if (sk->sk_err != 0 ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- (vsk->peer_shutdown & SEND_SHUTDOWN)) {
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
- /* Don't wait for non-blocking sockets. */
- if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
-
- err = transport->notify_recv_pre_block(
- vsk, target, &recv_data);
- if (err < 0) {
- finish_wait(sk_sleep(sk), &wait);
+ if (vsock_wait_data(sk, &wait, timeout, &recv_data, target))
break;
- }
- release_sock(sk);
- timeout = schedule_timeout(timeout);
- lock_sock(sk);
-
- if (signal_pending(current)) {
- err = sock_intr_errno(timeout);
- finish_wait(sk_sleep(sk), &wait);
- break;
- } else if (timeout == 0) {
- err = -EAGAIN;
- finish_wait(sk_sleep(sk), &wait);
- break;
- }
} else {
ssize_t read;
@@ -1959,9 +2119,8 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (err < 0)
break;
- read = transport->stream_dequeue(
- vsk, msg,
- len - copied, flags);
+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
+
if (read < 0) {
err = -ENOMEM;
break;
@@ -1990,11 +2149,45 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (copied > 0)
err = copied;
+out:
+ return err;
+}
+
+static int vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg,
+ size_t len, int flags)
+{
+ struct sock *sk;
+ int err;
+
+ sk = sock->sk;
+
+ lock_sock(sk);
+
+ if (!vsock_pre_recv_check(sock, flags, len, &err))
+ goto out;
+
+ if (sk->sk_type == SOCK_STREAM)
+ err = __vsock_stream_recvmsg(sk, msg, len, flags);
+ else
+ err = __vsock_seqpacket_recvmsg(sk, msg, len, flags);
+
out:
release_sock(sk);
return err;
}
+static int vsock_seqpacket_recvmsg(struct socket *sock, struct msghdr *msg,
+ size_t len, int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
+static int vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg,
+ size_t len, int flags)
+{
+ return vsock_connectible_recvmsg(sock, msg, len, flags);
+}
+
static const struct proto_ops vsock_stream_ops = {
.family = PF_VSOCK,
.owner = THIS_MODULE,
--
2.25.1
From: Arseniy Krasnov <[email protected]>
Replace 'stream' to 'connect oriented' as SOCK_SEQPACKET
support is also connect oriented.
---
net/vmw_vsock/af_vsock.c | 31 +++++++++++++++++--------------
1 file changed, 17 insertions(+), 14 deletions(-)
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 30caad9815f7..063d428e9ec2 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -415,8 +415,8 @@ static void vsock_deassign_transport(struct vsock_sock *vsk)
/* Assign a transport to a socket and call the .init transport callback.
*
- * Note: for stream socket this must be called when vsk->remote_addr is set
- * (e.g. during the connect() or when a connection request on a listener
+ * Note: for connect oriented socket this must be called when vsk->remote_addr
+ * is set (e.g. during the connect() or when a connection request on a listener
* socket is received).
* The vsk->remote_addr is used to decide which transport to use:
* - remote CID == VMADDR_CID_LOCAL or g2h->local_cid or VMADDR_CID_HOST if
@@ -476,10 +476,10 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
return 0;
/* transport->release() must be called with sock lock acquired.
- * This path can only be taken during vsock_stream_connect(),
- * where we have already held the sock lock.
- * In the other cases, this function is called on a new socket
- * which is not assigned to any transport.
+ * This path can only be taken during vsock_connect(), where we
+ * have already held the sock lock. In the other cases, this
+ * function is called on a new socket which is not assigned to
+ * any transport.
*/
vsk->transport->release(vsk);
vsock_deassign_transport(vsk);
@@ -656,9 +656,10 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
vsock_addr_init(&vsk->local_addr, new_addr.svm_cid, new_addr.svm_port);
- /* Remove stream sockets from the unbound list and add them to the hash
- * table for easy lookup by its address. The unbound list is simply an
- * extra entry at the end of the hash table, a trick used by AF_UNIX.
+ /* Remove connect oriented sockets from the unbound list and add them
+ * to the hash table for easy lookup by its address. The unbound list
+ * is simply an extra entry at the end of the hash table, a trick used
+ * by AF_UNIX.
*/
__vsock_remove_bound(vsk);
__vsock_insert_bound(vsock_bound_sockets(&vsk->local_addr), vsk);
@@ -949,10 +950,10 @@ static int vsock_shutdown(struct socket *sock, int mode)
if ((mode & ~SHUTDOWN_MASK) || !mode)
return -EINVAL;
- /* If this is a STREAM socket and it is not connected then bail out
- * immediately. If it is a DGRAM socket then we must first kick the
- * socket so that it wakes up from any sleeping calls, for example
- * recv(), and then afterwards return the error.
+ /* If this is a connect oriented socket and it is not connected then
+ * bail out immediately. If it is a DGRAM socket then we must first
+ * kick the socket so that it wakes up from any sleeping calls, for
+ * example recv(), and then afterwards return the error.
*/
sk = sock->sk;
@@ -1757,7 +1758,9 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
lock_sock(sk);
- /* Callers should not provide a destination with stream sockets. */
+ /* Callers should not provide a destination with connect oriented
+ * sockets.
+ */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out;
--
2.25.1
Hi Arseny!
03.01.2021 23:03, Arseny Krasnov пишет:
> From: Arseniy Krasnov <[email protected]>
>
> For send, this patch adds:
> 1) Send of record begin marker with record length.
> 2) Return error if send of whole record is failed.
>
> For receive, this patch adds another loop, it looks like
> stream loop, but:
> 1) It doesn't call notify callbacks.
> 2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT'
> values.
> 3) It waits until whole record is received or error is
> found during receiving.
> 3) It processes and sets 'MSG_TRUNC' flag.
> ---
> net/vmw_vsock/af_vsock.c | 319 +++++++++++++++++++++++++++++++--------
> 1 file changed, 256 insertions(+), 63 deletions(-)
>
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index b12d3a322242..7ff00449a9a2 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -1683,8 +1683,8 @@ static int vsock_stream_getsockopt(struct socket *sock,
> return 0;
> }
>
> -static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> - size_t len)
> +static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> + size_t len)
> {
> struct sock *sk;
> struct vsock_sock *vsk;
> @@ -1737,6 +1737,12 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> if (err < 0)
> goto out;
>
> + if (sk->sk_type == SOCK_SEQPACKET) {
> + err = transport->seqpacket_seq_send_len(vsk, len);
> + if (err < 0)
> + goto out;
> + }
> +
> while (total_written < len) {
> ssize_t written;
>
> @@ -1796,10 +1802,8 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> * smaller than the queue size. It is the caller's
> * responsibility to check how many bytes we were able to send.
> */
> -
> - written = transport->stream_enqueue(
> - vsk, msg,
> - len - total_written);
> + written = transport->stream_enqueue(vsk, msg,
> + len - total_written);
White-space change?
> if (written < 0) {
> err = -ENOMEM;
> goto out_err;
> @@ -1815,36 +1819,96 @@ static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> }
>
> out_err:
> - if (total_written > 0)
> - err = total_written;
> + if (total_written > 0) {
> + /* Return number of written bytes only if:
> + * 1) SOCK_STREAM socket.
> + * 2) SOCK_SEQPACKET socket when whole buffer is sent.
> + */
> + if (sk->sk_type == SOCK_STREAM || total_written == len)
> + err = total_written;
> + }
> out:
> release_sock(sk);
> return err;
> }
>
> +static int vsock_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> + size_t len)
> +{
> + return vsock_connectible_sendmsg(sock, msg, len);
> +}
>
> -static int
> -vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> - int flags)
> +static int vsock_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
> + size_t len)
> {
> - struct sock *sk;
> + return vsock_connectible_sendmsg(sock, msg, len);
> +}
> +
> +static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> + long timeout,
> + struct vsock_transport_recv_notify_data *recv_data,
> + size_t target)
> +{
You patch looks quite large because
of this, so would it make sense to separate
out the refactoring part (vsock_wait_data()
and friends that you seem to copy out of
recvmsg() code) as the separate patch?
Currently its a bit difficult to see what was
added and what was "refactored".
> + int err = 0;
> struct vsock_sock *vsk;
> const struct vsock_transport *transport;
> - int err;
> - size_t target;
> - ssize_t copied;
> - long timeout;
> - struct vsock_transport_recv_notify_data recv_data;
> -
> - DEFINE_WAIT(wait);
>
> - sk = sock->sk;
> vsk = vsock_sk(sk);
> transport = vsk->transport;
> - err = 0;
>
> + if (sk->sk_err != 0 ||
> + (sk->sk_shutdown & RCV_SHUTDOWN) ||
> + (vsk->peer_shutdown & SEND_SHUTDOWN)) {
> + finish_wait(sk_sleep(sk), wait);
> + return -1;
> + }
> + /* Don't wait for non-blocking sockets. */
> + if (timeout == 0) {
> + err = -EAGAIN;
> + finish_wait(sk_sleep(sk), wait);
> + return err;
> + }
> +
> + if (sk->sk_type == SOCK_STREAM) {
> + err = transport->notify_recv_pre_block(vsk, target, recv_data);
> + if (err < 0) {
> + finish_wait(sk_sleep(sk), wait);
> + return err;
> + }
> + }
> +
> + release_sock(sk);
> + timeout = schedule_timeout(timeout);
> lock_sock(sk);
>
> + if (signal_pending(current)) {
> + err = sock_intr_errno(timeout);
> + finish_wait(sk_sleep(sk), wait);
> + } else if (timeout == 0) {
> + err = -EAGAIN;
> + finish_wait(sk_sleep(sk), wait);
> + }
> +
> + return err;
> +}
> +
> +static int vsock_wait_data_seqpacket(struct sock *sk, struct wait_queue_entry *wait,
> + long timeout)
> +{
> + return vsock_wait_data(sk, wait, timeout, NULL, 0);
Would it make sense to structure that
differently? If vsock_wait_data() does
"more things" than vsock_wait_data_seqpacket(),
then would it be possible to make
vsock_wait_data() to call vsock_wait_data_seqpacket()
(or some common part of both), rather
than to null out unused arguments?
> +}
> +
> +static int vsock_pre_recv_check(struct socket *sock,
> + int flags, size_t len, int *err)
> +{
> + struct sock *sk;
> + struct vsock_sock *vsk;
> + const struct vsock_transport *transport;
> +
> + sk = sock->sk;
> + vsk = vsock_sk(sk);
> + transport = vsk->transport;
> +
> if (!transport || sk->sk_state != TCP_ESTABLISHED) {
> /* Recvmsg is supposed to return 0 if a peer performs an
> * orderly shutdown. Differentiate between that case and when a
> @@ -1852,16 +1916,16 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> * SOCK_DONE flag.
> */
> if (sock_flag(sk, SOCK_DONE))
> - err = 0;
> + *err = 0;
> else
> - err = -ENOTCONN;
> + *err = -ENOTCONN;
>
> - goto out;
> + return false;
Hmm, are you sure you need to convert
"err" to the pointer, just to return true/false
as the return value?
How about still returning "err" itself?
> }
>
> if (flags & MSG_OOB) {
> - err = -EOPNOTSUPP;
> - goto out;
> + *err = -EOPNOTSUPP;
> + return false;
> }
>
> /* We don't check peer_shutdown flag here since peer may actually shut
> @@ -1869,17 +1933,143 @@ vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> * receive.
> */
> if (sk->sk_shutdown & RCV_SHUTDOWN) {
> - err = 0;
> - goto out;
> + *err = 0;
> + return false;
> }
>
> /* It is valid on Linux to pass in a zero-length receive buffer. This
> * is not an error. We may as well bail out now.
> */
> if (!len) {
> + *err = 0;
> + return false;
> + }
> +
> + return true;
> +}
> +
> +static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
> + size_t len, int flags)
> +{
> + int err = 0;
> + size_t record_len;
> + struct vsock_sock *vsk;
> + const struct vsock_transport *transport;
> + long timeout;
> + ssize_t dequeued_total = 0;
> + unsigned long orig_nr_segs;
> + const struct iovec *orig_iov;
> + DEFINE_WAIT(wait);
> +
> + vsk = vsock_sk(sk);
> + transport = vsk->transport;
> +
> + timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
> + msg->msg_flags &= ~MSG_EOR;
> + orig_nr_segs = msg->msg_iter.nr_segs;
> + orig_iov = msg->msg_iter.iov;
> +
> + while (1) {
> + s64 ready;
> +
> + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
> + ready = vsock_stream_has_data(vsk);
> +
> + if (ready == 0) {
> + if (vsock_wait_data_seqpacket(sk, &wait, timeout)) {
> + /* In case of any loop break(timeout, signal
> + * interrupt or shutdown), we report user that
> + * nothing was copied.
> + */
> + dequeued_total = 0;
> + break;
> + }
> + } else {
> + ssize_t dequeued;
> +
> + finish_wait(sk_sleep(sk), &wait);
> +
> + if (ready < 0) {
> + err = -ENOMEM;
> + goto out;
> + }
> +
> + if (dequeued_total == 0) {
> + record_len =
> + transport->seqpacket_seq_get_len(vsk);
> +
> + if (record_len == 0)
> + continue;
> + }
> +
> + /* 'msg_iter.count' is number of unused bytes in iov.
> + * On every copy to iov iterator it is decremented at
> + * size of data.
> + */
> + dequeued = transport->stream_dequeue(vsk, msg,
> + msg->msg_iter.count, flags);
> +
> + if (dequeued < 0) {
> + dequeued_total = 0;
> +
> + if (dequeued == -EAGAIN) {
> + iov_iter_init(&msg->msg_iter, READ,
> + orig_iov, orig_nr_segs,
> + len);
> + msg->msg_flags &= ~MSG_EOR;
> + continue;
> + }
> +
> + err = -ENOMEM;
> + break;
> + }
> +
> + dequeued_total += dequeued;
> +
> + if (dequeued_total >= record_len)
> + break;
> + }
> + }
> +
> + if (sk->sk_err)
> + err = -sk->sk_err;
> + else if (sk->sk_shutdown & RCV_SHUTDOWN)
> err = 0;
> - goto out;
> +
> + if (dequeued_total > 0) {
> + /* User sets MSG_TRUNC, so return real length of
> + * packet.
> + */
> + if (flags & MSG_TRUNC)
> + err = record_len;
> + else
> + err = len - msg->msg_iter.count;
Its not very clear (only for me perhaps) how
dequeue_total and len correlate. Are they
equal here? Would you need to check that
dequeued_total >= record_len?
I mean, its just a bit strange that you check
dequeued_total>0 and no longer use that var
inside the block.
Hi Arseny, thanks for your work on this!
I did a small review in a hope it helps.
Also it may be cool to have the driver feature
for that (so that the host can see if its supported).
But I guess this was already said by Michael. :)
03.01.2021 22:54, Arseny Krasnov пишет:
> As SOCK_SEQPACKET guarantees to save record boundaries, so to
> do it, new packet operation was added: it marks start of record (with
> record length in header). To send record, packet with start marker is
> sent first, then all data is transmitted as 'RW' packets. On receiver's
> side, length of record is known from packet with start record marker.
> Now as packets of one socket are not reordered neither on vsock nor on
> vhost transport layers, these marker allows to restore original record
> on receiver's side. When each 'RW' packet is inserted to rx queue of
> receiver, user is woken up, data is copied to user's buffer and credit
> update message is sent. If there is no user waiting for data, credit
> won't be updated and sender will wait. Also, if user's buffer is full,
> and record is bigger, all unneeded data will be dropped (with sending of
> credit update message).
> 'MSG_EOR' flag is implemented with special value of 'flags' field
> in packet header. When record is received with such flags, 'MSG_EOR' is
> set in 'recvmsg()' flags. 'MSG_TRUNC' flag is also supported.
> In this implementation maximum length of datagram is not limited
> as in stream socket.
>
> drivers/vhost/vsock.c | 6 +-
> include/linux/virtio_vsock.h | 7 +
> include/net/af_vsock.h | 4 +
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/af_vsock.c | 457 +++++++++++++++++++-----
> net/vmw_vsock/virtio_transport.c | 3 +
> net/vmw_vsock/virtio_transport_common.c | 323 ++++++++++++++---
> 7 files changed, 673 insertions(+), 136 deletions(-)
>
03.01.2021 22:57, Arseny Krasnov пишет:
> This extends rx loop for SOCK_SEQPACKET packets and implements
> callback which user calls to copy data to its buffer.
>
> Signed-off-by: Arseny Krasnov <[email protected]>
> ---
> include/linux/virtio_vsock.h | 7 +
> include/net/af_vsock.h | 4 +
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/virtio_transport.c | 3 +
> net/vmw_vsock/virtio_transport_common.c | 323 +++++++++++++++++++++---
> 5 files changed, 305 insertions(+), 41 deletions(-)
>
> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
> index dc636b727179..4902d71b3252 100644
> --- a/include/linux/virtio_vsock.h
> +++ b/include/linux/virtio_vsock.h
> @@ -36,6 +36,10 @@ struct virtio_vsock_sock {
> u32 rx_bytes;
> u32 buf_alloc;
> struct list_head rx_queue;
> +
> + /* For SOCK_SEQPACKET */
> + u32 user_read_seq_len;
> + u32 user_read_copied;
> };
>
> struct virtio_vsock_pkt {
> @@ -80,6 +84,9 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
> +bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
> +size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
> +
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
> index b1c717286993..792ea7b66574 100644
> --- a/include/net/af_vsock.h
> +++ b/include/net/af_vsock.h
> @@ -135,6 +135,10 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
> + /* SEQ_PACKET. */
> + bool (*seqpacket_seq_send_len)(struct vsock_sock *, size_t len);
> + size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
> +
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
> index 1d57ed3d84d2..058908bc19fc 100644
> --- a/include/uapi/linux/virtio_vsock.h
> +++ b/include/uapi/linux/virtio_vsock.h
> @@ -65,6 +65,7 @@ struct virtio_vsock_hdr {
>
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
> + VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
> @@ -83,6 +84,9 @@ enum virtio_vsock_op {
> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> /* Request the peer to send the credit info to us */
> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
> +
> + /* Record begin for SOCK_SEQPACKET */
> + VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
> };
>
> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
> @@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
> +/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
> +enum virtio_vsock_rw_seqpacket {
> + VIRTIO_VSOCK_RW_EOR = 1,
> +};
> +
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
> diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
> index 2700a63ab095..2bd3f7cbffcb 100644
> --- a/net/vmw_vsock/virtio_transport.c
> +++ b/net/vmw_vsock/virtio_transport.c
> @@ -469,6 +469,9 @@ static struct virtio_transport virtio_transport = {
> .stream_is_active = virtio_transport_stream_is_active,
> .stream_allow = virtio_transport_stream_allow,
>
> + .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
> + .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
> +
> .notify_poll_in = virtio_transport_notify_poll_in,
> .notify_poll_out = virtio_transport_notify_poll_out,
> .notify_recv_init = virtio_transport_notify_recv_init,
> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
> index 5956939eebb7..77c42004e422 100644
> --- a/net/vmw_vsock/virtio_transport_common.c
> +++ b/net/vmw_vsock/virtio_transport_common.c
> @@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
> break;
> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
> case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
> + case VIRTIO_VSOCK_OP_SEQ_BEGIN:
> hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
> break;
> default:
> @@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
>
> void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
> {
> + /* TODO: implement tap support for SOCK_SEQPACKET. */
> + if (le32_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
> + return;
> +
> if (pkt->tap_delivered)
> return;
>
> @@ -230,10 +235,10 @@ static bool virtio_transport_inc_rx_pkt(struct virtio_vsock_sock *vvs,
> }
>
> static void virtio_transport_dec_rx_pkt(struct virtio_vsock_sock *vvs,
> - struct virtio_vsock_pkt *pkt)
> + u32 len)
> {
> - vvs->rx_bytes -= pkt->len;
> - vvs->fwd_cnt += pkt->len;
> + vvs->rx_bytes -= len;
> + vvs->fwd_cnt += len;
> }
>
> void virtio_transport_inc_tx_pkt(struct virtio_vsock_sock *vvs, struct virtio_vsock_pkt *pkt)
> @@ -365,7 +370,7 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> total += bytes;
> pkt->off += bytes;
> if (pkt->off == pkt->len) {
> - virtio_transport_dec_rx_pkt(vvs, pkt);
> + virtio_transport_dec_rx_pkt(vvs, pkt->len);
> list_del(&pkt->list);
> virtio_transport_free_pkt(pkt);
> }
> @@ -397,15 +402,202 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
> +static u16 virtio_transport_get_type(struct sock *sk)
> +{
> + if (sk->sk_type == SOCK_STREAM)
> + return VIRTIO_VSOCK_TYPE_STREAM;
> + else
> + return VIRTIO_VSOCK_TYPE_SEQPACKET;
> +}
> +
> +bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
> +{
> + struct virtio_vsock_pkt_info info = {
> + .type = VIRTIO_VSOCK_TYPE_SEQPACKET,
> + .op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
> + .vsk = vsk,
> + .flags = len
> + };
> +
> + return virtio_transport_send_pkt_info(vsk, &info);
> +}
> +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
> +
> +static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
> +{
> + list_del(&pkt->list);
> + virtio_transport_free_pkt(pkt);
> +}
> +
> +static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
> +{
> + struct virtio_vsock_pkt *pkt, *n;
> + size_t bytes_dropped = 0;
> +
> + list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
> + if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
> + break;
> +
> + bytes_dropped += le32_to_cpu(pkt->hdr.len);
> + virtio_transport_dec_rx_pkt(vvs, pkt->len);
> + virtio_transport_del_n_free_pkt(pkt);
> + }
> +
> + return bytes_dropped;
> +}
> +
> +size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
> +{
> + struct virtio_vsock_sock *vvs = vsk->trans;
> + struct virtio_vsock_pkt *pkt;
> + size_t bytes_dropped;
> +
> + spin_lock_bh(&vvs->rx_lock);
> +
> + /* Fetch all orphaned 'RW', packets, and
> + * send credit update.
> + */
> + bytes_dropped = virtio_transport_drop_until_seq_begin(vvs);
> +
> + if (list_empty(&vvs->rx_queue))
> + goto out;
> +
> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
> +
> + vvs->user_read_copied = 0;
> + vvs->user_read_seq_len = le32_to_cpu(pkt->hdr.flags);
> + virtio_transport_del_n_free_pkt(pkt);
> +out:
> + spin_unlock_bh(&vvs->rx_lock);
> +
> + if (bytes_dropped)
> + virtio_transport_send_credit_update(vsk,
> + VIRTIO_VSOCK_TYPE_SEQPACKET,
> + NULL);
> +
> + return vvs->user_read_seq_len;
> +}
> +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
> +
> +static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
> + struct msghdr *msg,
> + size_t user_buf_len)
> +{
> + struct virtio_vsock_sock *vvs = vsk->trans;
> + struct virtio_vsock_pkt *pkt;
> + size_t bytes_handled = 0;
> + int err = 0;
> +
> + spin_lock_bh(&vvs->rx_lock);
> +
> + if (user_buf_len == 0) {
IMHO you can avoid this special-casing
by introducing yet another outer loop just
for draining the extra data from buffer.
Admittedly that may also require an extra
transport op.
> + /* User's buffer is full, we processing rest of
> + * record and drop it. If 'SEQ_BEGIN' is found
> + * while iterating, user will be woken up,
> + * because record is already copied, and we
> + * don't care about absent of some tail RW packets
> + * of it. Return number of bytes(rest of record),
> + * but ignore credit update for such absent bytes.
> + */
> + bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
> + vvs->user_read_copied += bytes_handled;
> +
> + if (!list_empty(&vvs->rx_queue) &&
> + vvs->user_read_copied < vvs->user_read_seq_len) {
> + /* 'SEQ_BEGIN' found, but record isn't complete.
> + * Set number of copied bytes to fit record size
> + * and force counters to finish receiving.
> + */
> + bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
> + vvs->user_read_copied = vvs->user_read_seq_len;
> + }
> + }
> +
> + /* Now start copying. */
> + while (vvs->user_read_copied < vvs->user_read_seq_len &&
> + vvs->rx_bytes &&
> + user_buf_len &&
> + !err) {
> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
> +
> + switch (le16_to_cpu(pkt->hdr.op)) {
> + case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
> + /* Unexpected 'SEQ_BEGIN' during record copy:
> + * Leave receive loop, 'EAGAIN' will restart it from
> + * outer receive loop, packet is still in queue and
> + * counters are cleared. So in next loop enter,
> + * 'SEQ_BEGIN' will be dequeued first. User's iov
> + * iterator will be reset in outer loop. Also
> + * send credit update, because some bytes could be
> + * copied. User will never see unfinished record.
> + */
> + err = -EAGAIN;
> + break;
> + }
> + case VIRTIO_VSOCK_OP_RW: {
> + size_t bytes_to_copy;
> + size_t pkt_len;
> +
> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
> + bytes_to_copy = min(user_buf_len, pkt_len);
> +
> + /* sk_lock is held by caller so no one else can dequeue.
> + * Unlock rx_lock since memcpy_to_msg() may sleep.
> + */
> + spin_unlock_bh(&vvs->rx_lock);
> +
> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
> + spin_lock_bh(&vvs->rx_lock);
> + err = -EINVAL;
> + break;
> + }
> +
> + spin_lock_bh(&vvs->rx_lock);
> + user_buf_len -= bytes_to_copy;
> + bytes_handled += pkt->len;
> + vvs->user_read_copied += bytes_to_copy;
> +
> + if (le16_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
> + msg->msg_flags |= MSG_EOR;
> + break;
> + }
> + default:
> + ;
> + }
> +
> + /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
> + * but drop any other type of packet.
> + */
> + if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
> + virtio_transport_dec_rx_pkt(vvs, pkt->len);
> + virtio_transport_del_n_free_pkt(pkt);
> + }
> + }
> +
> + spin_unlock_bh(&vvs->rx_lock);
> +
> + virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
> + NULL);
> +
> + return err ?: bytes_handled;
> +}
> +
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags)
> {
> - if (flags & MSG_PEEK)
> - return virtio_transport_stream_do_peek(vsk, msg, len);
> - else
> + if (virtio_transport_get_type(sk_vsock(vsk)) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
> + if (flags & MSG_PEEK)
> + return -EOPNOTSUPP;
> +
> + return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
> + } else {
> + if (flags & MSG_PEEK)
> + return virtio_transport_stream_do_peek(vsk, msg, len);
> +
> return virtio_transport_stream_do_dequeue(vsk, msg, len);
> + }
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
> @@ -481,6 +673,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
> spin_lock_init(&vvs->rx_lock);
> spin_lock_init(&vvs->tx_lock);
> INIT_LIST_HEAD(&vvs->rx_queue);
> + vvs->user_read_copied = 0;
> + vvs->user_read_seq_len = 0;
>
> return 0;
> }
> @@ -490,13 +684,16 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
> void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
> + int type;
>
> if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
> *val = VIRTIO_VSOCK_MAX_BUF_SIZE;
>
> vvs->buf_alloc = *val;
>
> - virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
> + type = virtio_transport_get_type(sk_vsock(vsk));
> +
> + virtio_transport_send_credit_update(vsk, type,
> NULL);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
> @@ -624,10 +821,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_REQUEST,
> - .type = VIRTIO_VSOCK_TYPE_STREAM,
> .vsk = vsk,
> };
>
> + info.type = virtio_transport_get_type(sk_vsock(vsk));
> +
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_connect);
> @@ -636,7 +834,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_SHUTDOWN,
> - .type = VIRTIO_VSOCK_TYPE_STREAM,
> .flags = (mode & RCV_SHUTDOWN ?
> VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
> (mode & SEND_SHUTDOWN ?
> @@ -644,6 +841,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> .vsk = vsk,
> };
>
> + info.type = virtio_transport_get_type(sk_vsock(vsk));
> +
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
> @@ -665,12 +864,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RW,
> - .type = VIRTIO_VSOCK_TYPE_STREAM,
> .msg = msg,
> .pkt_len = len,
> .vsk = vsk,
> + .flags = 0,
> };
>
> + info.type = virtio_transport_get_type(sk_vsock(vsk));
> +
> + if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
> + msg->msg_flags & MSG_EOR)
> + info.flags |= VIRTIO_VSOCK_RW_EOR;
> +
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
> @@ -688,7 +893,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RST,
> - .type = VIRTIO_VSOCK_TYPE_STREAM,
> .reply = !!pkt,
> .vsk = vsk,
> };
> @@ -697,6 +901,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
> return 0;
>
> + info.type = virtio_transport_get_type(sk_vsock(vsk));
> +
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
> @@ -884,44 +1090,59 @@ virtio_transport_recv_connecting(struct sock *sk,
> return err;
> }
>
> -static void
> +static bool
> virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> struct virtio_vsock_pkt *pkt)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
> - bool can_enqueue, free_pkt = false;
> + bool data_ready = false;
> + bool free_pkt = false;
>
> - pkt->len = le32_to_cpu(pkt->hdr.len);
> pkt->off = 0;
> + pkt->len = le32_to_cpu(pkt->hdr.len);
>
> spin_lock_bh(&vvs->rx_lock);
>
> - can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
> - if (!can_enqueue) {
> - free_pkt = true;
> - goto out;
> - }
> + switch (le32_to_cpu(pkt->hdr.type)) {
> + case VIRTIO_VSOCK_TYPE_STREAM: {
> + if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
> + free_pkt = true;
> + goto out;
> + }
>
> - /* Try to copy small packets into the buffer of last packet queued,
> - * to avoid wasting memory queueing the entire buffer with a small
> - * payload.
> - */
> - if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
> - struct virtio_vsock_pkt *last_pkt;
> + /* Try to copy small packets into the buffer of last packet queued,
> + * to avoid wasting memory queueing the entire buffer with a small
> + * payload.
> + */
> + if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
> + struct virtio_vsock_pkt *last_pkt;
>
> - last_pkt = list_last_entry(&vvs->rx_queue,
> - struct virtio_vsock_pkt, list);
> + last_pkt = list_last_entry(&vvs->rx_queue,
> + struct virtio_vsock_pkt, list);
>
> - /* If there is space in the last packet queued, we copy the
> - * new packet in its buffer.
> - */
> - if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
> - memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
> - pkt->len);
> - last_pkt->len += pkt->len;
> - free_pkt = true;
> - goto out;
> + /* If there is space in the last packet queued, we copy the
> + * new packet in its buffer.
> + */
> + if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
> + memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
> + pkt->len);
> + last_pkt->len += pkt->len;
> + free_pkt = true;
> + goto out;
> + }
> }
> +
> + data_ready = true;
> + break;
> + }
> +
> + case VIRTIO_VSOCK_TYPE_SEQPACKET: {
> + data_ready = true;
> + vvs->rx_bytes += pkt->len;
> + break;
> + }
> + default:
> + goto out;
> }
>
> list_add_tail(&pkt->list, &vvs->rx_queue);
> @@ -930,6 +1151,8 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> spin_unlock_bh(&vvs->rx_lock);
> if (free_pkt)
> virtio_transport_free_pkt(pkt);
> +
> + return data_ready;
> }
>
> static int
> @@ -940,9 +1163,17 @@ virtio_transport_recv_connected(struct sock *sk,
> int err = 0;
>
> switch (le16_to_cpu(pkt->hdr.op)) {
> + case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
> + struct virtio_vsock_sock *vvs = vsk->trans;
> +
> + spin_lock_bh(&vvs->rx_lock);
> + list_add_tail(&pkt->list, &vvs->rx_queue);
> + spin_unlock_bh(&vvs->rx_lock);
> + return err;
> + }
> case VIRTIO_VSOCK_OP_RW:
> - virtio_transport_recv_enqueue(vsk, pkt);
> - sk->sk_data_ready(sk);
> + if (virtio_transport_recv_enqueue(vsk, pkt))
> + sk->sk_data_ready(sk);
Why do you need this change?
(maybe its ok, just wondering)
> return err;
> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
> sk->sk_write_space(sk);
> @@ -990,13 +1221,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RESPONSE,
> - .type = VIRTIO_VSOCK_TYPE_STREAM,
> .remote_cid = le64_to_cpu(pkt->hdr.src_cid),
> .remote_port = le32_to_cpu(pkt->hdr.src_port),
> .reply = true,
> .vsk = vsk,
> };
>
> + info.type = virtio_transport_get_type(sk_vsock(vsk));
> +
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
> @@ -1086,6 +1318,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
> return 0;
> }
>
> +static bool virtio_transport_valid_type(u16 type)
> +{
> + return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
> + (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
> +}
> +
> /* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
> * lock.
> */
> @@ -1111,7 +1349,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> le32_to_cpu(pkt->hdr.buf_alloc),
> le32_to_cpu(pkt->hdr.fwd_cnt));
>
> - if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
> + if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
> (void)virtio_transport_reset_no_sock(t, pkt);
> goto free_pkt;
> }
> @@ -1128,6 +1366,9 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> }
> }
>
> + if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type))
> + goto free_pkt;
No need to reset here, like a few lines
above in a seemingly similar condition?
03.01.2021 23:04, Arseny Krasnov пишет:
> From: Arseniy Krasnov <[email protected]>
>
> ---
> net/vmw_vsock/af_vsock.c | 107 +++++++++++++++++++++++++++++++++------
> 1 file changed, 91 insertions(+), 16 deletions(-)
>
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index 7ff00449a9a2..30caad9815f7 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -452,6 +452,7 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_dgram;
> break;
> case SOCK_STREAM:
> + case SOCK_SEQPACKET:
> if (vsock_use_local_transport(remote_cid))
> new_transport = transport_local;
> else if (remote_cid <= VMADDR_CID_HOST || !transport_h2g ||
> @@ -459,6 +460,12 @@ int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk)
> new_transport = transport_g2h;
> else
> new_transport = transport_h2g;
> +
> + if (sk->sk_type == SOCK_SEQPACKET) {
> + if (!new_transport->seqpacket_seq_send_len ||
> + !new_transport->seqpacket_seq_get_len)
> + return -ENODEV;
Is ENODEV the right error here?
Just a quick look at a man page, and
I am under impression something like
EPROTONOSUPPORT or ESOCKNOSUPPORT
may suit?
> IMHO you can avoid this special-casing
> by introducing yet another outer loop just
> for draining the extra data from buffer.
> Admittedly that may also require an extra
> transport op.
I'm not sure that extra tranport op is needed, may be i'll
try to put drain code inside copy loop, because only
difference is that copy length is 0.
> Why do you need this change?
> (maybe its ok, just wondering)
> No need to reset here, like a few lines
> above in a seemingly similar condition?
Yes, i think you are right.
> Is ENODEV the right error here?
> Just a quick look at a man page, and
> I am under impression something like
> EPROTONOSUPPORT or ESOCKNOSUPPORT
> may suit?
I used ENODEV because this code is returned some
lines below when !new_transport(e.g. valid transport
not found). But i think you codes will be also ok.
> Hi Arseny, thanks for your work on this!
> I did a small review in a hope it helps.
> Also it may be cool to have the driver feature
> for that (so that the host can see if its supported).
> But I guess this was already said by Michael. :)
Hello, thank You for your review, i'll prepare
v2 as soon as possible.
On Sun, Jan 03, 2021 at 10:57:50PM +0300, Arseny Krasnov wrote:
> This extends rx loop for SOCK_SEQPACKET packets and implements
>callback which user calls to copy data to its buffer.
Please write a better commit message explaining the changes, e.g. that
you are using 'flags' to transport lengths when 'op' ==
VIRTIO_VSOCK_OP_SEQ_BEGIN.
>
>Signed-off-by: Arseny Krasnov <[email protected]>
>---
> include/linux/virtio_vsock.h | 7 +
> include/net/af_vsock.h | 4 +
> include/uapi/linux/virtio_vsock.h | 9 +
> net/vmw_vsock/virtio_transport.c | 3 +
> net/vmw_vsock/virtio_transport_common.c | 323 +++++++++++++++++++++---
> 5 files changed, 305 insertions(+), 41 deletions(-)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..4902d71b3252 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -36,6 +36,10 @@ struct virtio_vsock_sock {
> u32 rx_bytes;
> u32 buf_alloc;
> struct list_head rx_queue;
>+
>+ /* For SOCK_SEQPACKET */
>+ u32 user_read_seq_len;
>+ u32 user_read_copied;
> };
>
> struct virtio_vsock_pkt {
>@@ -80,6 +84,9 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags);
>
>+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len);
>+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk);
>+
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..792ea7b66574 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,10 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ bool (*seqpacket_seq_send_len)(struct vsock_sock *, size_t len);
>+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>+
These changes are related to the vsock core, so please move to another
patch.
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>index 1d57ed3d84d2..058908bc19fc 100644
>--- a/include/uapi/linux/virtio_vsock.h
>+++ b/include/uapi/linux/virtio_vsock.h
>@@ -65,6 +65,7 @@ struct virtio_vsock_hdr {
>
> enum virtio_vsock_type {
> VIRTIO_VSOCK_TYPE_STREAM = 1,
>+ VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> };
>
> enum virtio_vsock_op {
>@@ -83,6 +84,9 @@ enum virtio_vsock_op {
> VIRTIO_VSOCK_OP_CREDIT_UPDATE = 6,
> /* Request the peer to send the credit info to us */
> VIRTIO_VSOCK_OP_CREDIT_REQUEST = 7,
>+
>+ /* Record begin for SOCK_SEQPACKET */
>+ VIRTIO_VSOCK_OP_SEQ_BEGIN = 8,
> };
>
> /* VIRTIO_VSOCK_OP_SHUTDOWN flags values */
>@@ -91,4 +95,9 @@ enum virtio_vsock_shutdown {
> VIRTIO_VSOCK_SHUTDOWN_SEND = 2,
> };
>
>+/* VIRTIO_VSOCK_OP_RW flags values for SOCK_SEQPACKET type */
>+enum virtio_vsock_rw_seqpacket {
>+ VIRTIO_VSOCK_RW_EOR = 1,
>+};
>+
> #endif /* _UAPI_LINUX_VIRTIO_VSOCK_H */
>diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
>index 2700a63ab095..2bd3f7cbffcb 100644
>--- a/net/vmw_vsock/virtio_transport.c
>+++ b/net/vmw_vsock/virtio_transport.c
>@@ -469,6 +469,9 @@ static struct virtio_transport virtio_transport = {
> .stream_is_active = virtio_transport_stream_is_active,
> .stream_allow = virtio_transport_stream_allow,
>
>+ .seqpacket_seq_send_len = virtio_transport_seqpacket_seq_send_len,
>+ .seqpacket_seq_get_len = virtio_transport_seqpacket_seq_get_len,
>+
> .notify_poll_in = virtio_transport_notify_poll_in,
> .notify_poll_out = virtio_transport_notify_poll_out,
> .notify_recv_init = virtio_transport_notify_recv_init,
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 5956939eebb7..77c42004e422 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -139,6 +139,7 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
> break;
> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
> case VIRTIO_VSOCK_OP_CREDIT_REQUEST:
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN:
> hdr->op = cpu_to_le16(AF_VSOCK_OP_CONTROL);
> break;
> default:
>@@ -157,6 +158,10 @@ static struct sk_buff *virtio_transport_build_skb(void *opaque)
>
> void virtio_transport_deliver_tap_pkt(struct virtio_vsock_pkt *pkt)
> {
>+ /* TODO: implement tap support for SOCK_SEQPACKET. */
>+ if (le32_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET)
>+ return;
>+
> if (pkt->tap_delivered)
> return;
>
>@@ -230,10 +235,10 @@ static bool virtio_transport_inc_rx_pkt(struct virtio_vsock_sock *vvs,
> }
>
> static void virtio_transport_dec_rx_pkt(struct virtio_vsock_sock *vvs,
>- struct virtio_vsock_pkt *pkt)
>+ u32 len)
> {
>- vvs->rx_bytes -= pkt->len;
>- vvs->fwd_cnt += pkt->len;
>+ vvs->rx_bytes -= len;
>+ vvs->fwd_cnt += len;
> }
>
> void virtio_transport_inc_tx_pkt(struct virtio_vsock_sock *vvs, struct virtio_vsock_pkt *pkt)
>@@ -365,7 +370,7 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> total += bytes;
> pkt->off += bytes;
> if (pkt->off == pkt->len) {
>- virtio_transport_dec_rx_pkt(vvs, pkt);
>+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
> list_del(&pkt->list);
> virtio_transport_free_pkt(pkt);
> }
>@@ -397,15 +402,202 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> return err;
> }
>
>+static u16 virtio_transport_get_type(struct sock *sk)
>+{
>+ if (sk->sk_type == SOCK_STREAM)
>+ return VIRTIO_VSOCK_TYPE_STREAM;
>+ else
>+ return VIRTIO_VSOCK_TYPE_SEQPACKET;
>+}
>+
>+bool virtio_transport_seqpacket_seq_send_len(struct vsock_sock *vsk, size_t len)
>+{
>+ struct virtio_vsock_pkt_info info = {
>+ .type = VIRTIO_VSOCK_TYPE_SEQPACKET,
>+ .op = VIRTIO_VSOCK_OP_SEQ_BEGIN,
>+ .vsk = vsk,
>+ .flags = len
>+ };
>+
>+ return virtio_transport_send_pkt_info(vsk, &info);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_send_len);
>+
>+static inline void virtio_transport_del_n_free_pkt(struct virtio_vsock_pkt *pkt)
>+{
>+ list_del(&pkt->list);
>+ virtio_transport_free_pkt(pkt);
>+}
>+
>+static size_t virtio_transport_drop_until_seq_begin(struct virtio_vsock_sock *vvs)
>+{
>+ struct virtio_vsock_pkt *pkt, *n;
>+ size_t bytes_dropped = 0;
>+
>+ list_for_each_entry_safe(pkt, n, &vvs->rx_queue, list) {
>+ if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_SEQ_BEGIN)
>+ break;
>+
>+ bytes_dropped += le32_to_cpu(pkt->hdr.len);
>+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
>+ virtio_transport_del_n_free_pkt(pkt);
>+ }
>+
>+ return bytes_dropped;
>+}
>+
>+size_t virtio_transport_seqpacket_seq_get_len(struct vsock_sock *vsk)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ size_t bytes_dropped;
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+
>+ /* Fetch all orphaned 'RW', packets, and
>+ * send credit update.
>+ */
>+ bytes_dropped = virtio_transport_drop_until_seq_begin(vvs);
>+
>+ if (list_empty(&vvs->rx_queue))
>+ goto out;
>+
>+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+ vvs->user_read_copied = 0;
>+ vvs->user_read_seq_len = le32_to_cpu(pkt->hdr.flags);
>+ virtio_transport_del_n_free_pkt(pkt);
>+out:
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ if (bytes_dropped)
>+ virtio_transport_send_credit_update(vsk,
>+ VIRTIO_VSOCK_TYPE_SEQPACKET,
>+ NULL);
>+
>+ return vvs->user_read_seq_len;
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_seq_get_len);
>+
>+static ssize_t virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+ struct msghdr *msg,
>+ size_t user_buf_len)
>+{
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+ struct virtio_vsock_pkt *pkt;
>+ size_t bytes_handled = 0;
>+ int err = 0;
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+
>+ if (user_buf_len == 0) {
>+ /* User's buffer is full, we processing rest of
>+ * record and drop it. If 'SEQ_BEGIN' is found
>+ * while iterating, user will be woken up,
>+ * because record is already copied, and we
>+ * don't care about absent of some tail RW packets
>+ * of it. Return number of bytes(rest of record),
>+ * but ignore credit update for such absent bytes.
>+ */
>+ bytes_handled = virtio_transport_drop_until_seq_begin(vvs);
>+ vvs->user_read_copied += bytes_handled;
>+
>+ if (!list_empty(&vvs->rx_queue) &&
>+ vvs->user_read_copied < vvs->user_read_seq_len) {
>+ /* 'SEQ_BEGIN' found, but record isn't complete.
>+ * Set number of copied bytes to fit record size
>+ * and force counters to finish receiving.
>+ */
>+ bytes_handled += (vvs->user_read_seq_len - vvs->user_read_copied);
>+ vvs->user_read_copied = vvs->user_read_seq_len;
>+ }
>+ }
>+
>+ /* Now start copying. */
>+ while (vvs->user_read_copied < vvs->user_read_seq_len &&
>+ vvs->rx_bytes &&
>+ user_buf_len &&
>+ !err) {
>+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+ switch (le16_to_cpu(pkt->hdr.op)) {
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
>+ /* Unexpected 'SEQ_BEGIN' during record copy:
>+ * Leave receive loop, 'EAGAIN' will restart it from
>+ * outer receive loop, packet is still in queue and
>+ * counters are cleared. So in next loop enter,
>+ * 'SEQ_BEGIN' will be dequeued first. User's iov
>+ * iterator will be reset in outer loop. Also
>+ * send credit update, because some bytes could be
>+ * copied. User will never see unfinished record.
>+ */
>+ err = -EAGAIN;
>+ break;
>+ }
>+ case VIRTIO_VSOCK_OP_RW: {
>+ size_t bytes_to_copy;
>+ size_t pkt_len;
>+
>+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+ bytes_to_copy = min(user_buf_len, pkt_len);
>+
>+ /* sk_lock is held by caller so no one else can dequeue.
>+ * Unlock rx_lock since memcpy_to_msg() may sleep.
>+ */
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>+ spin_lock_bh(&vvs->rx_lock);
>+ err = -EINVAL;
>+ break;
>+ }
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+ user_buf_len -= bytes_to_copy;
>+ bytes_handled += pkt->len;
>+ vvs->user_read_copied += bytes_to_copy;
>+
>+ if (le16_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_RW_EOR)
>+ msg->msg_flags |= MSG_EOR;
>+ break;
>+ }
>+ default:
>+ ;
>+ }
>+
>+ /* For unexpected 'SEQ_BEGIN', keep such packet in queue,
>+ * but drop any other type of packet.
>+ */
>+ if (le16_to_cpu(pkt->hdr.op) != VIRTIO_VSOCK_OP_SEQ_BEGIN) {
>+ virtio_transport_dec_rx_pkt(vvs, pkt->len);
>+ virtio_transport_del_n_free_pkt(pkt);
>+ }
>+ }
>+
>+ spin_unlock_bh(&vvs->rx_lock);
>+
>+ virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_SEQPACKET,
>+ NULL);
>+
>+ return err ?: bytes_handled;
>+}
>+
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> struct msghdr *msg,
> size_t len, int flags)
> {
>- if (flags & MSG_PEEK)
>- return virtio_transport_stream_do_peek(vsk, msg, len);
>- else
>+ if (virtio_transport_get_type(sk_vsock(vsk)) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
>+ if (flags & MSG_PEEK)
>+ return -EOPNOTSUPP;
>+
>+ return virtio_transport_seqpacket_do_dequeue(vsk, msg, len);
>+ } else {
>+ if (flags & MSG_PEEK)
>+ return virtio_transport_stream_do_peek(vsk, msg,
>len);
>+
> return virtio_transport_stream_do_dequeue(vsk, msg, len);
>+ }
Maybe is better to create two different functions here since we are
using two complete different paths:
virtio_transport_stream_dequeue()
virtio_transport_seqpacket_dequeue()
And adding a new 'seqpacket_dequeue' callback in the transport.
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
>@@ -481,6 +673,8 @@ int virtio_transport_do_socket_init(struct vsock_sock *vsk,
> spin_lock_init(&vvs->rx_lock);
> spin_lock_init(&vvs->tx_lock);
> INIT_LIST_HEAD(&vvs->rx_queue);
>+ vvs->user_read_copied = 0;
>+ vvs->user_read_seq_len = 0;
>
> return 0;
> }
>@@ -490,13 +684,16 @@ EXPORT_SYMBOL_GPL(virtio_transport_do_socket_init);
> void virtio_transport_notify_buffer_size(struct vsock_sock *vsk, u64 *val)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
>+ int type;
>
> if (*val > VIRTIO_VSOCK_MAX_BUF_SIZE)
> *val = VIRTIO_VSOCK_MAX_BUF_SIZE;
>
> vvs->buf_alloc = *val;
>
>- virtio_transport_send_credit_update(vsk, VIRTIO_VSOCK_TYPE_STREAM,
>+ type = virtio_transport_get_type(sk_vsock(vsk));
>+
>+ virtio_transport_send_credit_update(vsk, type,
> NULL);
This line can now be merged with the previous one.
> }
> EXPORT_SYMBOL_GPL(virtio_transport_notify_buffer_size);
>@@ -624,10 +821,11 @@ int virtio_transport_connect(struct vsock_sock *vsk)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_REQUEST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_connect);
>@@ -636,7 +834,6 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_SHUTDOWN,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .flags = (mode & RCV_SHUTDOWN ?
> VIRTIO_VSOCK_SHUTDOWN_RCV : 0) |
> (mode & SEND_SHUTDOWN ?
>@@ -644,6 +841,8 @@ int virtio_transport_shutdown(struct vsock_sock *vsk, int mode)
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_shutdown);
>@@ -665,12 +864,18 @@ virtio_transport_stream_enqueue(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RW,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .msg = msg,
> .pkt_len = len,
> .vsk = vsk,
>+ .flags = 0,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
>+ if (info.type == VIRTIO_VSOCK_TYPE_SEQPACKET &&
>+ msg->msg_flags & MSG_EOR)
>+ info.flags |= VIRTIO_VSOCK_RW_EOR;
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_enqueue);
>@@ -688,7 +893,6 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RST,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .reply = !!pkt,
> .vsk = vsk,
> };
>@@ -697,6 +901,8 @@ static int virtio_transport_reset(struct vsock_sock *vsk,
> if (pkt && le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RST)
> return 0;
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
>@@ -884,44 +1090,59 @@ virtio_transport_recv_connecting(struct sock *sk,
> return err;
> }
>
>-static void
>+static bool
> virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> struct virtio_vsock_pkt *pkt)
> {
> struct virtio_vsock_sock *vvs = vsk->trans;
>- bool can_enqueue, free_pkt = false;
>+ bool data_ready = false;
>+ bool free_pkt = false;
>
>- pkt->len = le32_to_cpu(pkt->hdr.len);
> pkt->off = 0;
>+ pkt->len = le32_to_cpu(pkt->hdr.len);
>
> spin_lock_bh(&vvs->rx_lock);
>
>- can_enqueue = virtio_transport_inc_rx_pkt(vvs, pkt);
>- if (!can_enqueue) {
>- free_pkt = true;
>- goto out;
>- }
>+ switch (le32_to_cpu(pkt->hdr.type)) {
>+ case VIRTIO_VSOCK_TYPE_STREAM: {
>+ if (!virtio_transport_inc_rx_pkt(vvs, pkt)) {
>+ free_pkt = true;
>+ goto out;
>+ }
>
>- /* Try to copy small packets into the buffer of last packet queued,
>- * to avoid wasting memory queueing the entire buffer with a small
>- * payload.
>- */
>- if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
>- struct virtio_vsock_pkt *last_pkt;
>+ /* Try to copy small packets into the buffer of last packet queued,
>+ * to avoid wasting memory queueing the entire buffer with a small
>+ * payload.
>+ */
>+ if (pkt->len <= GOOD_COPY_LEN && !list_empty(&vvs->rx_queue)) {
>+ struct virtio_vsock_pkt *last_pkt;
>
>- last_pkt = list_last_entry(&vvs->rx_queue,
>- struct virtio_vsock_pkt, list);
>+ last_pkt = list_last_entry(&vvs->rx_queue,
>+ struct virtio_vsock_pkt, list);
>
>- /* If there is space in the last packet queued, we copy the
>- * new packet in its buffer.
>- */
>- if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>- memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>- pkt->len);
>- last_pkt->len += pkt->len;
>- free_pkt = true;
>- goto out;
>+ /* If there is space in the last packet queued, we copy the
>+ * new packet in its buffer.
>+ */
>+ if (pkt->len <= last_pkt->buf_len - last_pkt->len) {
>+ memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>+ pkt->len);
>+ last_pkt->len += pkt->len;
>+ free_pkt = true;
>+ goto out;
>+ }
> }
>+
>+ data_ready = true;
>+ break;
>+ }
>+
^
Unnecessary line break.
>+ case VIRTIO_VSOCK_TYPE_SEQPACKET: {
>+ data_ready = true;
>+ vvs->rx_bytes += pkt->len;
Why not using virtio_transport_inc_rx_pkt()?
>+ break;
>+ }
>+ default:
>+ goto out;
> }
>
> list_add_tail(&pkt->list, &vvs->rx_queue);
>@@ -930,6 +1151,8 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
> spin_unlock_bh(&vvs->rx_lock);
> if (free_pkt)
> virtio_transport_free_pkt(pkt);
>+
>+ return data_ready;
> }
>
> static int
>@@ -940,9 +1163,17 @@ virtio_transport_recv_connected(struct sock *sk,
> int err = 0;
>
> switch (le16_to_cpu(pkt->hdr.op)) {
>+ case VIRTIO_VSOCK_OP_SEQ_BEGIN: {
>+ struct virtio_vsock_sock *vvs = vsk->trans;
>+
>+ spin_lock_bh(&vvs->rx_lock);
>+ list_add_tail(&pkt->list, &vvs->rx_queue);
>+ spin_unlock_bh(&vvs->rx_lock);
>+ return err;
>+ }
> case VIRTIO_VSOCK_OP_RW:
>- virtio_transport_recv_enqueue(vsk, pkt);
>- sk->sk_data_ready(sk);
>+ if (virtio_transport_recv_enqueue(vsk, pkt))
>+ sk->sk_data_ready(sk);
> return err;
> case VIRTIO_VSOCK_OP_CREDIT_UPDATE:
> sk->sk_write_space(sk);
>@@ -990,13 +1221,14 @@ virtio_transport_send_response(struct vsock_sock *vsk,
> {
> struct virtio_vsock_pkt_info info = {
> .op = VIRTIO_VSOCK_OP_RESPONSE,
>- .type = VIRTIO_VSOCK_TYPE_STREAM,
> .remote_cid = le64_to_cpu(pkt->hdr.src_cid),
> .remote_port = le32_to_cpu(pkt->hdr.src_port),
> .reply = true,
> .vsk = vsk,
> };
>
>+ info.type = virtio_transport_get_type(sk_vsock(vsk));
>+
> return virtio_transport_send_pkt_info(vsk, &info);
> }
>
>@@ -1086,6 +1318,12 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
> return 0;
> }
>
>+static bool virtio_transport_valid_type(u16 type)
>+{
>+ return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
>+ (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
>+}
>+
> /* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
> * lock.
> */
>@@ -1111,7 +1349,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> le32_to_cpu(pkt->hdr.buf_alloc),
> le32_to_cpu(pkt->hdr.fwd_cnt));
>
>- if (le16_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_STREAM) {
>+ if (!virtio_transport_valid_type(le16_to_cpu(pkt->hdr.type))) {
> (void)virtio_transport_reset_no_sock(t, pkt);
> goto free_pkt;
> }
>@@ -1128,6 +1366,9 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
> }
> }
>
>+ if (virtio_transport_get_type(sk) != le16_to_cpu(pkt->hdr.type))
>+ goto free_pkt;
>+
> vsk = vsock_sk(sk);
>
> space_available = virtio_transport_space_update(sk, pkt);
>--
>2.25.1
>
Hi Arseny,
On Sun, Jan 03, 2021 at 10:54:52PM +0300, Arseny Krasnov wrote:
> As SOCK_SEQPACKET guarantees to save record boundaries, so to
>do it, new packet operation was added: it marks start of record (with
>record length in header). To send record, packet with start marker is
>sent first, then all data is transmitted as 'RW' packets. On receiver's
>side, length of record is known from packet with start record marker.
>Now as packets of one socket are not reordered neither on vsock nor on
>vhost transport layers, these marker allows to restore original record
>on receiver's side. When each 'RW' packet is inserted to rx queue of
>receiver, user is woken up, data is copied to user's buffer and credit
>update message is sent. If there is no user waiting for data, credit
>won't be updated and sender will wait. Also, if user's buffer is full,
>and record is bigger, all unneeded data will be dropped (with sending of
>credit update message).
> 'MSG_EOR' flag is implemented with special value of 'flags' field
>in packet header. When record is received with such flags, 'MSG_EOR' is
>set in 'recvmsg()' flags. 'MSG_TRUNC' flag is also supported.
> In this implementation maximum length of datagram is not limited
>as in stream socket.
I did a a quick review. I like the idea of adding SOCK_SEQPACKET, but
the series needs more work.
Some patches miss the SoB, the commit messages are very minimal.
Anyway I like that you shared your patches, but please use RFC tag if
they are not ready to be merged.
Another suggestion is to move the patches that modify the core
(af_vsock.c) before the transport modifications to make the review
easier.
I'd also like to see new tests in tools/testing/vsock/vsock_test.c
Thanks,
Stefano
> Hmm, are you sure you need to convert
> "err" to the pointer, just to return true/false
> as the return value?
> How about still returning "err" itself?
In this case i need to reserve some value for
"err" as success, because both 0 and negative
values are passed to caller when this function
returns false(check failed). May be i will
inline this function.
> Its not very clear (only for me perhaps) how
> dequeue_total and len correlate. Are they
> equal here? Would you need to check that
> dequeued_total >= record_len?
> I mean, its just a bit strange that you check
> dequeued_total>0 and no longer use that var
> inside the block.
When "dequeued_total > 0" record copy is succeed.
"len" is length of user buffer. I think i can
replace "dequeued_total" to some flag, like "error",
because in SOCK_SEQPACKET mode record could be
copied whole or error returned.