In virtio-spec 1.1, a new feature bit, VIRTIO_F_IN_ORDER, was introduced.
When this feature has been negotiated, the virtio driver will use
descriptors in ring order: starting from offset 0 in the table, and
wrapping around at the end of the table. Vhost devices will always use
descriptors in the same order in which they have been made available.
This can reduce virtio accesses to the used ring.
Based on the updated virtio-spec, this series implements an IN_ORDER prototype
in the virtio driver and vhost. Currently the devices that support the IN_ORDER
feature are *vhost_test* and *vsock*, and the IN_ORDER feature works well combined
with the INDIRECT feature in this patch series.
Some work has not been done in this patch series:
1. Virtio driver in_order support for packed vq is left for the future.
Guo Zhi (7):
vhost: expose used buffers
vhost_test: batch used buffer
vsock: batch buffers in tx
vsock: announce VIRTIO_F_IN_ORDER in vsock
virtio: unmask F_NEXT flag in desc_extra
virtio: in order support for virtio_ring
virtio: announce VIRTIO_F_IN_ORDER support
drivers/vhost/test.c | 8 ++++-
drivers/vhost/vhost.c | 14 +++++++--
drivers/vhost/vhost.h | 1 +
drivers/vhost/vsock.c | 10 +++++-
drivers/virtio/virtio_ring.c | 61 ++++++++++++++++++++++++++++++------
5 files changed, 80 insertions(+), 14 deletions(-)
--
2.17.1
Vsock uses buffers in order, and for tx the driver doesn't have to
know the length of the buffer. So we can batch buffers for vsock if
in-order is negotiated, writing only one used ring entry for a batch of buffers.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/vhost/vsock.c | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index 368330417bde..b0108009c39a 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -500,6 +500,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
int head, pkts = 0, total_len = 0;
unsigned int out, in;
bool added = false;
+ int last_head = -1;
mutex_lock(&vq->mutex);
@@ -551,10 +552,16 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
else
virtio_transport_free_pkt(pkt);
- vhost_add_used(vq, head, 0);
+ if (!vhost_has_feature(vq, VIRTIO_F_IN_ORDER))
+ vhost_add_used(vq, head, 0);
+ else
+ last_head = head;
added = true;
} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
+ /* If in order feature negotiaged, we can do a batch to increase performance */
+ if (vhost_has_feature(vq, VIRTIO_F_IN_ORDER) && last_head != -1)
+ vhost_add_used(vq, last_head, 0);
no_more_replies:
if (added)
vhost_signal(&vsock->dev, vq);
--
2.17.1
The in-order feature is now used by vsock, since vsock already uses buffers in
order.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/vhost/vsock.c | 1 +
1 file changed, 1 insertion(+)
diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index b0108009c39a..3dd3a2a27d27 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -32,6 +32,7 @@
enum {
VHOST_VSOCK_FEATURES = VHOST_FEATURES |
(1ULL << VIRTIO_F_ACCESS_PLATFORM) |
+ (1ULL << VIRTIO_F_IN_ORDER) |
(1ULL << VIRTIO_VSOCK_F_SEQPACKET)
};
--
2.17.1
The in-order feature is supported by default in virtio.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/virtio/virtio_ring.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
index 143184ebb5a1..50aa361abd65 100644
--- a/drivers/virtio/virtio_ring.c
+++ b/drivers/virtio/virtio_ring.c
@@ -2416,6 +2416,8 @@ void vring_transport_features(struct virtio_device *vdev)
break;
case VIRTIO_F_ORDER_PLATFORM:
break;
+ case VIRTIO_F_IN_ORDER:
+ break;
default:
/* We don't understand this bit. */
__virtio_clear_bit(vdev, i);
--
2.17.1
We didn't unmask (clear) the F_NEXT flag in desc_extra at the end of a chain;
unmask it so that we can access desc_extra to get the next-descriptor information.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/virtio/virtio_ring.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
index a5ec724c01d8..1c1b3fa376a2 100644
--- a/drivers/virtio/virtio_ring.c
+++ b/drivers/virtio/virtio_ring.c
@@ -567,7 +567,7 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
}
/* Last one doesn't continue. */
desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
- if (!indirect && vq->use_dma_api)
+ if (!indirect)
vq->split.desc_extra[prev & (vq->split.vring.num - 1)].flags &=
~VRING_DESC_F_NEXT;
@@ -584,6 +584,8 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
total_sg * sizeof(struct vring_desc),
VRING_DESC_F_INDIRECT,
false);
+ vq->split.desc_extra[head & (vq->split.vring.num - 1)].flags &=
+ ~VRING_DESC_F_NEXT;
}
/* We're using some buffers from the free list. */
@@ -693,7 +695,7 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
/* Put back on free list: unmap first-level descriptors and find end */
i = head;
- while (vq->split.vring.desc[i].flags & nextflag) {
+ while (vq->split.desc_extra[i].flags & nextflag) {
vring_unmap_one_split(vq, i);
i = vq->split.desc_extra[i].next;
vq->vq.num_free++;
--
2.17.1
Only add to the used ring when a batch of buffers has all been used. And if
the in-order feature is negotiated, only add the last used descriptor for a
batch of buffers.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/vhost/test.c | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/drivers/vhost/test.c b/drivers/vhost/test.c
index bc8e7fb1e635..57cdb3a3edf6 100644
--- a/drivers/vhost/test.c
+++ b/drivers/vhost/test.c
@@ -43,6 +43,9 @@ struct vhost_test {
static void handle_vq(struct vhost_test *n)
{
struct vhost_virtqueue *vq = &n->vqs[VHOST_TEST_VQ];
+ struct vring_used_elem *heads = kmalloc(sizeof(*heads)
+ * vq->num, GFP_KERNEL);
+ int batch_idx = 0;
unsigned out, in;
int head;
size_t len, total_len = 0;
@@ -84,11 +87,14 @@ static void handle_vq(struct vhost_test *n)
vq_err(vq, "Unexpected 0 len for TX\n");
break;
}
- vhost_add_used_and_signal(&n->dev, vq, head, 0);
+ heads[batch_idx].id = cpu_to_vhost32(vq, head);
+ heads[batch_idx++].len = cpu_to_vhost32(vq, len);
total_len += len;
if (unlikely(vhost_exceeds_weight(vq, 0, total_len)))
break;
}
+ if (batch_idx)
+ vhost_add_used_and_signal_n(&n->dev, vq, heads, batch_idx);
mutex_unlock(&vq->mutex);
}
--
2.17.1
If the in-order feature is negotiated, we can skip the used ring and get
the buffer's desc id sequentially.
Signed-off-by: Guo Zhi <[email protected]>
---
drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
1 file changed, 45 insertions(+), 8 deletions(-)
diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
index 1c1b3fa376a2..143184ebb5a1 100644
--- a/drivers/virtio/virtio_ring.c
+++ b/drivers/virtio/virtio_ring.c
@@ -144,6 +144,9 @@ struct vring_virtqueue {
/* DMA address and size information */
dma_addr_t queue_dma_addr;
size_t queue_size_in_bytes;
+
+ /* In order feature batch begin here */
+ u16 next_desc_begin;
} split;
/* Available for packed ring */
@@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
}
vring_unmap_one_split(vq, i);
- vq->split.desc_extra[i].next = vq->free_head;
- vq->free_head = head;
+ /* In order feature use desc in order,
+ * that means, the next desc will always be free
+ */
+ if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
+ vq->split.desc_extra[i].next = vq->free_head;
+ vq->free_head = head;
+ }
/* Plus final descriptor */
vq->vq.num_free++;
@@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
{
struct vring_virtqueue *vq = to_vvq(_vq);
void *ret;
- unsigned int i;
+ unsigned int i, j;
u16 last_used;
START_USE(vq);
@@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
/* Only get used array entries after they have been exposed by host. */
virtio_rmb(vq->weak_barriers);
- last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
- i = virtio32_to_cpu(_vq->vdev,
- vq->split.vring.used->ring[last_used].id);
- *len = virtio32_to_cpu(_vq->vdev,
- vq->split.vring.used->ring[last_used].len);
+ if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
+ /* Skip used ring and get used desc in order*/
+ i = vq->split.next_desc_begin;
+ j = i;
+ /* Indirect only takes one descriptor in descriptor table */
+ while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
+ j = (j + 1) % vq->split.vring.num;
+ /* move to next */
+ j = (j + 1) % vq->split.vring.num;
+ /* Next buffer will use this descriptor in order */
+ vq->split.next_desc_begin = j;
+ if (!vq->indirect) {
+ *len = vq->split.desc_extra[i].len;
+ } else {
+ struct vring_desc *indir_desc =
+ vq->split.desc_state[i].indir_desc;
+ u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
+
+ if (indir_desc) {
+ for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
+ buffer_len += indir_desc[j].len;
+ }
+
+ *len = buffer_len;
+ }
+ } else {
+ last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
+ i = virtio32_to_cpu(_vq->vdev,
+ vq->split.vring.used->ring[last_used].id);
+ *len = virtio32_to_cpu(_vq->vdev,
+ vq->split.vring.used->ring[last_used].len);
+ }
if (unlikely(i >= vq->split.vring.num)) {
BAD_RING(vq, "id %u out of range\n", i);
@@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
vq->split.avail_flags_shadow = 0;
vq->split.avail_idx_shadow = 0;
+ vq->split.next_desc_begin = 0;
+
/* No callback? Tell other side not to bother us. */
if (!callback) {
vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
--
2.17.1
----- Original Message -----
> From: "Xuan Zhuo" <[email protected]>
> To: "Guo Zhi" <[email protected]>
> Cc: "netdev" <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>, "Guo Zhi" <[email protected]>, "eperezma"
> <[email protected]>, "jasowang" <[email protected]>, "sgarzare" <[email protected]>, "Michael Tsirkin"
> <[email protected]>
> Sent: Thursday, August 18, 2022 11:05:37 AM
> Subject: Re: [RFC v2 5/7] virtio: unmask F_NEXT flag in desc_extra
> On Wed, 17 Aug 2022 21:57:16 +0800, Guo Zhi <[email protected]> wrote:
>> We didn't unmask F_NEXT flag in desc_extra in the end of a chain,
>> unmask it so that we can access desc_extra to get next information.
>
> I think we should state the purpose of this.
>
I have to unmask the F_NEXT flag in desc_extra in this series because, if in-order
is negotiated, the driver has to iterate over the descriptor chain to get the chain
length from desc_extra. (The reason why we should use desc_extra is that descs may be
changed by malicious hypervisors, https://lkml.org/lkml/2022/7/26/224).
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/virtio/virtio_ring.c | 6 ++++--
>> 1 file changed, 4 insertions(+), 2 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index a5ec724c01d8..1c1b3fa376a2 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -567,7 +567,7 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
>> }
>> /* Last one doesn't continue. */
>> desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
>> - if (!indirect && vq->use_dma_api)
>> + if (!indirect)
>> vq->split.desc_extra[prev & (vq->split.vring.num - 1)].flags &=
>> ~VRING_DESC_F_NEXT;
>>
>> @@ -584,6 +584,8 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
>> total_sg * sizeof(struct vring_desc),
>> VRING_DESC_F_INDIRECT,
>> false);
>> + vq->split.desc_extra[head & (vq->split.vring.num - 1)].flags &=
>> + ~VRING_DESC_F_NEXT;
>
> This seems unnecessary.
>
>> }
>>
>> /* We're using some buffers from the free list. */
>> @@ -693,7 +695,7 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> /* Put back on free list: unmap first-level descriptors and find end */
>> i = head;
>>
>> - while (vq->split.vring.desc[i].flags & nextflag) {
>> + while (vq->split.desc_extra[i].flags & nextflag) {
>
> nextflag is __virtio16
>
> You can use VRING_DESC_F_NEXT directly.
>
> Thanks.
Sorry for the mistake, I will fix it.
>
>> vring_unmap_one_split(vq, i);
>> i = vq->split.desc_extra[i].next;
>> vq->vq.num_free++;
>> --
>> 2.17.1
>>
On Wed, 17 Aug 2022 21:57:17 +0800, Guo Zhi <[email protected]> wrote:
> If in order feature negotiated, we can skip the used ring to get
> buffer's desc id sequentially.
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
> 1 file changed, 45 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index 1c1b3fa376a2..143184ebb5a1 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -144,6 +144,9 @@ struct vring_virtqueue {
> /* DMA address and size information */
> dma_addr_t queue_dma_addr;
> size_t queue_size_in_bytes;
> +
> + /* In order feature batch begin here */
> + u16 next_desc_begin;
> } split;
>
> /* Available for packed ring */
> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
> }
>
> vring_unmap_one_split(vq, i);
> - vq->split.desc_extra[i].next = vq->free_head;
> - vq->free_head = head;
> + /* In order feature use desc in order,
> + * that means, the next desc will always be free
> + */
> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
Calling virtio_has_feature() here is not good.
Thanks.
> + vq->split.desc_extra[i].next = vq->free_head;
> + vq->free_head = head;
> + }
>
> /* Plus final descriptor */
> vq->vq.num_free++;
> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
> {
> struct vring_virtqueue *vq = to_vvq(_vq);
> void *ret;
> - unsigned int i;
> + unsigned int i, j;
> u16 last_used;
>
> START_USE(vq);
> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
> /* Only get used array entries after they have been exposed by host. */
> virtio_rmb(vq->weak_barriers);
>
> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> - i = virtio32_to_cpu(_vq->vdev,
> - vq->split.vring.used->ring[last_used].id);
> - *len = virtio32_to_cpu(_vq->vdev,
> - vq->split.vring.used->ring[last_used].len);
> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
> + /* Skip used ring and get used desc in order*/
> + i = vq->split.next_desc_begin;
> + j = i;
> + /* Indirect only takes one descriptor in descriptor table */
> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
> + j = (j + 1) % vq->split.vring.num;
> + /* move to next */
> + j = (j + 1) % vq->split.vring.num;
> + /* Next buffer will use this descriptor in order */
> + vq->split.next_desc_begin = j;
> + if (!vq->indirect) {
> + *len = vq->split.desc_extra[i].len;
> + } else {
> + struct vring_desc *indir_desc =
> + vq->split.desc_state[i].indir_desc;
> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
> +
> + if (indir_desc) {
> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
> + buffer_len += indir_desc[j].len;
> + }
> +
> + *len = buffer_len;
> + }
> + } else {
> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> + i = virtio32_to_cpu(_vq->vdev,
> + vq->split.vring.used->ring[last_used].id);
> + *len = virtio32_to_cpu(_vq->vdev,
> + vq->split.vring.used->ring[last_used].len);
> + }
>
> if (unlikely(i >= vq->split.vring.num)) {
> BAD_RING(vq, "id %u out of range\n", i);
> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
> vq->split.avail_flags_shadow = 0;
> vq->split.avail_idx_shadow = 0;
>
> + vq->split.next_desc_begin = 0;
> +
> /* No callback? Tell other side not to bother us. */
> if (!callback) {
> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
> --
> 2.17.1
>
On Wed, 17 Aug 2022 21:57:16 +0800, Guo Zhi <[email protected]> wrote:
> We didn't unmask F_NEXT flag in desc_extra in the end of a chain,
> unmask it so that we can access desc_extra to get next information.
I think we should state the purpose of this.
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/virtio/virtio_ring.c | 6 ++++--
> 1 file changed, 4 insertions(+), 2 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index a5ec724c01d8..1c1b3fa376a2 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -567,7 +567,7 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
> }
> /* Last one doesn't continue. */
> desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
> - if (!indirect && vq->use_dma_api)
> + if (!indirect)
> vq->split.desc_extra[prev & (vq->split.vring.num - 1)].flags &=
> ~VRING_DESC_F_NEXT;
>
> @@ -584,6 +584,8 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
> total_sg * sizeof(struct vring_desc),
> VRING_DESC_F_INDIRECT,
> false);
> + vq->split.desc_extra[head & (vq->split.vring.num - 1)].flags &=
> + ~VRING_DESC_F_NEXT;
This seems unnecessary.
> }
>
> /* We're using some buffers from the free list. */
> @@ -693,7 +695,7 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
> /* Put back on free list: unmap first-level descriptors and find end */
> i = head;
>
> - while (vq->split.vring.desc[i].flags & nextflag) {
> + while (vq->split.desc_extra[i].flags & nextflag) {
nextflag is __virtio16
You can use VRING_DESC_F_NEXT directly.
Thanks.
> vring_unmap_one_split(vq, i);
> i = vq->split.desc_extra[i].next;
> vq->vq.num_free++;
> --
> 2.17.1
>
----- Original Message -----
> From: "Xuan Zhuo" <[email protected]>
> To: "Guo Zhi" <[email protected]>
> Cc: "netdev" <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>, "Guo Zhi" <[email protected]>, "eperezma"
> <[email protected]>, "jasowang" <[email protected]>, "sgarzare" <[email protected]>, "Michael Tsirkin"
> <[email protected]>
> Sent: Thursday, August 18, 2022 11:11:58 AM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring
> On Wed, 17 Aug 2022 21:57:17 +0800, Guo Zhi <[email protected]> wrote:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>> 1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>> /* DMA address and size information */
>> dma_addr_t queue_dma_addr;
>> size_t queue_size_in_bytes;
>> +
>> + /* In order feature batch begin here */
>> + u16 next_desc_begin;
>> } split;
>>
>> /* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> }
>>
>> vring_unmap_one_split(vq, i);
>> - vq->split.desc_extra[i].next = vq->free_head;
>> - vq->free_head = head;
>> + /* In order feature use desc in order,
>> + * that means, the next desc will always be free
>> + */
>> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>
> Call virtio_has_feature() here is not good.
>
> Thanks.
>
Maybe I can use a variable like vq->indirect?
Thanks.
>> + vq->split.desc_extra[i].next = vq->free_head;
>> + vq->free_head = head;
>> + }
>>
>> /* Plus final descriptor */
>> vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> {
>> struct vring_virtqueue *vq = to_vvq(_vq);
>> void *ret;
>> - unsigned int i;
>> + unsigned int i, j;
>> u16 last_used;
>>
>> START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> /* Only get used array entries after they have been exposed by host. */
>> virtio_rmb(vq->weak_barriers);
>>
>> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> - i = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].id);
>> - *len = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].len);
>> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> + /* Skip used ring and get used desc in order*/
>> + i = vq->split.next_desc_begin;
>> + j = i;
>> + /* Indirect only takes one descriptor in descriptor table */
>> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> + j = (j + 1) % vq->split.vring.num;
>> + /* move to next */
>> + j = (j + 1) % vq->split.vring.num;
>> + /* Next buffer will use this descriptor in order */
>> + vq->split.next_desc_begin = j;
>> + if (!vq->indirect) {
>> + *len = vq->split.desc_extra[i].len;
>> + } else {
>> + struct vring_desc *indir_desc =
>> + vq->split.desc_state[i].indir_desc;
>> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> + if (indir_desc) {
>> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> + buffer_len += indir_desc[j].len;
>> + }
>> +
>> + *len = buffer_len;
>> + }
>> + } else {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> + i = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + }
>>
>> if (unlikely(i >= vq->split.vring.num)) {
>> BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>> vq->split.avail_flags_shadow = 0;
>> vq->split.avail_idx_shadow = 0;
>>
>> + vq->split.next_desc_begin = 0;
>> +
>> /* No callback? Tell other side not to bother us. */
>> if (!callback) {
>> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
>> --
>> 2.17.1
>>
On Wed, Aug 17, 2022 at 3:58 PM Guo Zhi <[email protected]> wrote:
>
> Only add to used ring when a batch of buffer have all been used. And if
> in order feature negotiated, only add the last used descriptor for a
> batch of buffer.
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/vhost/test.c | 8 +++++++-
> 1 file changed, 7 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/vhost/test.c b/drivers/vhost/test.c
> index bc8e7fb1e635..57cdb3a3edf6 100644
> --- a/drivers/vhost/test.c
> +++ b/drivers/vhost/test.c
> @@ -43,6 +43,9 @@ struct vhost_test {
> static void handle_vq(struct vhost_test *n)
> {
> struct vhost_virtqueue *vq = &n->vqs[VHOST_TEST_VQ];
> + struct vring_used_elem *heads = kmalloc(sizeof(*heads)
> + * vq->num, GFP_KERNEL);
It seems to me we can use kmalloc_array here.
Thanks!
> + int batch_idx = 0;
> unsigned out, in;
> int head;
> size_t len, total_len = 0;
> @@ -84,11 +87,14 @@ static void handle_vq(struct vhost_test *n)
> vq_err(vq, "Unexpected 0 len for TX\n");
> break;
> }
> - vhost_add_used_and_signal(&n->dev, vq, head, 0);
> + heads[batch_idx].id = cpu_to_vhost32(vq, head);
> + heads[batch_idx++].len = cpu_to_vhost32(vq, len);
> total_len += len;
> if (unlikely(vhost_exceeds_weight(vq, 0, total_len)))
> break;
> }
> + if (batch_idx)
> + vhost_add_used_and_signal_n(&n->dev, vq, heads, batch_idx);
>
> mutex_unlock(&vq->mutex);
> }
> --
> 2.17.1
>
On Wed, Aug 17, 2022 at 3:58 PM Guo Zhi <[email protected]> wrote:
>
> Vsock uses buffers in order, and for tx driver doesn't have to
> know the length of the buffer. So we can do a batch for vsock if
> in order negotiated, only write one used ring for a batch of buffers
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/vhost/vsock.c | 9 ++++++++-
> 1 file changed, 8 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index 368330417bde..b0108009c39a 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -500,6 +500,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> int head, pkts = 0, total_len = 0;
> unsigned int out, in;
> bool added = false;
> + int last_head = -1;
>
> mutex_lock(&vq->mutex);
>
> @@ -551,10 +552,16 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> else
> virtio_transport_free_pkt(pkt);
>
> - vhost_add_used(vq, head, 0);
> + if (!vhost_has_feature(vq, VIRTIO_F_IN_ORDER))
> + vhost_add_used(vq, head, 0);
> + else
> + last_head = head;
> added = true;
> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>
> + /* If in order feature negotiaged, we can do a batch to increase performance */
> + if (vhost_has_feature(vq, VIRTIO_F_IN_ORDER) && last_head != -1)
> + vhost_add_used(vq, last_head, 0);
Expanding my previous mail on patch 1, you can also use this in vsock
tx queue code. This way, no modifications to vhost.c functions are
needed.
Thanks!
> no_more_replies:
> if (added)
> vhost_signal(&vsock->dev, vq);
> --
> 2.17.1
>
----- Original Message -----
> From: "eperezma" <[email protected]>
> To: "Guo Zhi" <[email protected]>
> Cc: "jasowang" <[email protected]>, "sgarzare" <[email protected]>, "Michael Tsirkin" <[email protected]>, "netdev"
> <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>
> Sent: Thursday, August 18, 2022 2:19:29 PM
> Subject: Re: [RFC v2 3/7] vsock: batch buffers in tx
> On Wed, Aug 17, 2022 at 3:58 PM Guo Zhi <[email protected]> wrote:
>>
>> Vsock uses buffers in order, and for tx driver doesn't have to
>> know the length of the buffer. So we can do a batch for vsock if
>> in order negotiated, only write one used ring for a batch of buffers
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/vhost/vsock.c | 9 ++++++++-
>> 1 file changed, 8 insertions(+), 1 deletion(-)
>>
>> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
>> index 368330417bde..b0108009c39a 100644
>> --- a/drivers/vhost/vsock.c
>> +++ b/drivers/vhost/vsock.c
>> @@ -500,6 +500,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work
>> *work)
>> int head, pkts = 0, total_len = 0;
>> unsigned int out, in;
>> bool added = false;
>> + int last_head = -1;
>>
>> mutex_lock(&vq->mutex);
>>
>> @@ -551,10 +552,16 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work
>> *work)
>> else
>> virtio_transport_free_pkt(pkt);
>>
>> - vhost_add_used(vq, head, 0);
>> + if (!vhost_has_feature(vq, VIRTIO_F_IN_ORDER))
>> + vhost_add_used(vq, head, 0);
>> + else
>> + last_head = head;
>> added = true;
>> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>>
>> + /* If in order feature negotiaged, we can do a batch to increase
>> performance */
>> + if (vhost_has_feature(vq, VIRTIO_F_IN_ORDER) && last_head != -1)
>> + vhost_add_used(vq, last_head, 0);
>
> Expanding my previous mail on patch 1, you can also use this in vsock
> tx queue code. This way, no modifications to vhost.c functions are
> needed.
>
> Thanks!
As replied in patch 1, avoiding any modification to vhost is not feasible.
Thanks!
>
>> no_more_replies:
>> if (added)
>> vhost_signal(&vsock->dev, vq);
>> --
>> 2.17.1
>>
在 2022/8/17 21:57, Guo Zhi 写道:
> Only add to used ring when a batch of buffer have all been used. And if
> in order feature negotiated, only add the last used descriptor for a
> batch of buffer.
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/vhost/test.c | 8 +++++++-
> 1 file changed, 7 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/vhost/test.c b/drivers/vhost/test.c
> index bc8e7fb1e635..57cdb3a3edf6 100644
> --- a/drivers/vhost/test.c
> +++ b/drivers/vhost/test.c
> @@ -43,6 +43,9 @@ struct vhost_test {
> static void handle_vq(struct vhost_test *n)
> {
> struct vhost_virtqueue *vq = &n->vqs[VHOST_TEST_VQ];
> + struct vring_used_elem *heads = kmalloc(sizeof(*heads)
> + * vq->num, GFP_KERNEL);
Though it's a test device, it would be better to try to avoid memory
allocation in the datapath.
And where is it freed?
Thanks
> + int batch_idx = 0;
> unsigned out, in;
> int head;
> size_t len, total_len = 0;
> @@ -84,11 +87,14 @@ static void handle_vq(struct vhost_test *n)
> vq_err(vq, "Unexpected 0 len for TX\n");
> break;
> }
> - vhost_add_used_and_signal(&n->dev, vq, head, 0);
> + heads[batch_idx].id = cpu_to_vhost32(vq, head);
> + heads[batch_idx++].len = cpu_to_vhost32(vq, len);
> total_len += len;
> if (unlikely(vhost_exceeds_weight(vq, 0, total_len)))
> break;
> }
> + if (batch_idx)
> + vhost_add_used_and_signal_n(&n->dev, vq, heads, batch_idx);
>
> mutex_unlock(&vq->mutex);
> }
在 2022/8/17 21:57, Guo Zhi 写道:
> Vsock uses buffers in order, and for tx driver doesn't have to
> know the length of the buffer. So we can do a batch for vsock if
> in order negotiated, only write one used ring for a batch of buffers
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/vhost/vsock.c | 9 ++++++++-
> 1 file changed, 8 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index 368330417bde..b0108009c39a 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -500,6 +500,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> int head, pkts = 0, total_len = 0;
> unsigned int out, in;
> bool added = false;
> + int last_head = -1;
>
> mutex_lock(&vq->mutex);
>
> @@ -551,10 +552,16 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> else
> virtio_transport_free_pkt(pkt);
>
> - vhost_add_used(vq, head, 0);
> + if (!vhost_has_feature(vq, VIRTIO_F_IN_ORDER))
> + vhost_add_used(vq, head, 0);
> + else
> + last_head = head;
> added = true;
> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>
> + /* If in order feature negotiaged, we can do a batch to increase performance */
> + if (vhost_has_feature(vq, VIRTIO_F_IN_ORDER) && last_head != -1)
> + vhost_add_used(vq, last_head, 0);
I may be missing something, but the spec says "The device then skips forward in the
ring according to the size of the batch."
I don't see how it is done here.
Thanks
> no_more_replies:
> if (added)
> vhost_signal(&vsock->dev, vq);
在 2022/8/17 21:57, Guo Zhi 写道:
> We didn't unmask F_NEXT flag in desc_extra in the end of a chain,
> unmask it so that we can access desc_extra to get next information.
>
> Signed-off-by: Guo Zhi <[email protected]>
I posted a similar patch in the past.
Please share the perf numbers (e.g. pps via pktgen).
Thanks
> ---
> drivers/virtio/virtio_ring.c | 6 ++++--
> 1 file changed, 4 insertions(+), 2 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index a5ec724c01d8..1c1b3fa376a2 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -567,7 +567,7 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
> }
> /* Last one doesn't continue. */
> desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
> - if (!indirect && vq->use_dma_api)
> + if (!indirect)
> vq->split.desc_extra[prev & (vq->split.vring.num - 1)].flags &=
> ~VRING_DESC_F_NEXT;
>
> @@ -584,6 +584,8 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
> total_sg * sizeof(struct vring_desc),
> VRING_DESC_F_INDIRECT,
> false);
> + vq->split.desc_extra[head & (vq->split.vring.num - 1)].flags &=
> + ~VRING_DESC_F_NEXT;
> }
>
> /* We're using some buffers from the free list. */
> @@ -693,7 +695,7 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
> /* Put back on free list: unmap first-level descriptors and find end */
> i = head;
>
> - while (vq->split.vring.desc[i].flags & nextflag) {
> + while (vq->split.desc_extra[i].flags & nextflag) {
> vring_unmap_one_split(vq, i);
> i = vq->split.desc_extra[i].next;
> vq->vq.num_free++;
在 2022/8/17 21:57, Guo Zhi 写道:
> If in order feature negotiated, we can skip the used ring to get
> buffer's desc id sequentially.
>
> Signed-off-by: Guo Zhi <[email protected]>
> ---
> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
> 1 file changed, 45 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index 1c1b3fa376a2..143184ebb5a1 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -144,6 +144,9 @@ struct vring_virtqueue {
> /* DMA address and size information */
> dma_addr_t queue_dma_addr;
> size_t queue_size_in_bytes;
> +
> + /* In order feature batch begin here */
We need to tweak the comment; it's not easy for me to understand the
meaning here.
> + u16 next_desc_begin;
> } split;
>
> /* Available for packed ring */
> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
> }
>
> vring_unmap_one_split(vq, i);
> - vq->split.desc_extra[i].next = vq->free_head;
> - vq->free_head = head;
> + /* In order feature use desc in order,
> + * that means, the next desc will always be free
> + */
Maybe we should add something like "The descriptors are prepared in order".
> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
> + vq->split.desc_extra[i].next = vq->free_head;
> + vq->free_head = head;
> + }
>
> /* Plus final descriptor */
> vq->vq.num_free++;
> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
> {
> struct vring_virtqueue *vq = to_vvq(_vq);
> void *ret;
> - unsigned int i;
> + unsigned int i, j;
> u16 last_used;
>
> START_USE(vq);
> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
> /* Only get used array entries after they have been exposed by host. */
> virtio_rmb(vq->weak_barriers);
>
> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> - i = virtio32_to_cpu(_vq->vdev,
> - vq->split.vring.used->ring[last_used].id);
> - *len = virtio32_to_cpu(_vq->vdev,
> - vq->split.vring.used->ring[last_used].len);
> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
> + /* Skip used ring and get used desc in order*/
> + i = vq->split.next_desc_begin;
> + j = i;
> + /* Indirect only takes one descriptor in descriptor table */
> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
> + j = (j + 1) % vq->split.vring.num;
Let's move the expensive mod outside the loop. Or, since this is the split ring,
can we actually use a bitwise AND here, given that the size is guaranteed to be
a power of two? Another question: is it better to store the next_desc in e.g.
desc_extra?
And this seems very expensive if the device doesn't do the batching
(which is not mandatory).
> + /* move to next */
> + j = (j + 1) % vq->split.vring.num;
> + /* Next buffer will use this descriptor in order */
> + vq->split.next_desc_begin = j;
> + if (!vq->indirect) {
> + *len = vq->split.desc_extra[i].len;
> + } else {
> + struct vring_desc *indir_desc =
> + vq->split.desc_state[i].indir_desc;
> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
> +
> + if (indir_desc) {
> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
> + buffer_len += indir_desc[j].len;
So I think we need to finalize this, then we can have much more stress
on the cache:
https://lkml.org/lkml/2021/10/26/1300
It was reverted since it's too aggressive, we should instead:
1) do the validation only for modern devices
2) fail only when validation is enabled (e.g. via a module parameter).
Thanks
> + }
> +
> + *len = buffer_len;
> + }
> + } else {
> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> + i = virtio32_to_cpu(_vq->vdev,
> + vq->split.vring.used->ring[last_used].id);
> + *len = virtio32_to_cpu(_vq->vdev,
> + vq->split.vring.used->ring[last_used].len);
> + }
>
> if (unlikely(i >= vq->split.vring.num)) {
> BAD_RING(vq, "id %u out of range\n", i);
> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
> vq->split.avail_flags_shadow = 0;
> vq->split.avail_idx_shadow = 0;
>
> + vq->split.next_desc_begin = 0;
> +
> /* No callback? Tell other side not to bother us. */
> if (!callback) {
> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
----- Original Message -----
> From: "jasowang" <[email protected]>
> To: "Guo Zhi" <[email protected]>, "eperezma" <[email protected]>, "sgarzare" <[email protected]>, "Michael
> Tsirkin" <[email protected]>
> Cc: "netdev" <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>
> Sent: Thursday, August 25, 2022 3:08:58 PM
> Subject: Re: [RFC v2 3/7] vsock: batch buffers in tx
> 在 2022/8/17 21:57, Guo Zhi 写道:
>> Vsock uses buffers in order, and for tx driver doesn't have to
>> know the length of the buffer. So we can do a batch for vsock if
>> in order negotiated, only write one used ring for a batch of buffers
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/vhost/vsock.c | 9 ++++++++-
>> 1 file changed, 8 insertions(+), 1 deletion(-)
>>
>> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
>> index 368330417bde..b0108009c39a 100644
>> --- a/drivers/vhost/vsock.c
>> +++ b/drivers/vhost/vsock.c
>> @@ -500,6 +500,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work
>> *work)
>> int head, pkts = 0, total_len = 0;
>> unsigned int out, in;
>> bool added = false;
>> + int last_head = -1;
>>
>> mutex_lock(&vq->mutex);
>>
>> @@ -551,10 +552,16 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work
>> *work)
>> else
>> virtio_transport_free_pkt(pkt);
>>
>> - vhost_add_used(vq, head, 0);
>> + if (!vhost_has_feature(vq, VIRTIO_F_IN_ORDER))
>> + vhost_add_used(vq, head, 0);
>> + else
>> + last_head = head;
>> added = true;
>> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>>
>> + /* If in order feature negotiaged, we can do a batch to increase performance
>> */
>> + if (vhost_has_feature(vq, VIRTIO_F_IN_ORDER) && last_head != -1)
>> + vhost_add_used(vq, last_head, 0);
>
>
> I may be missing something, but the spec says "The device then skips forward
> in the ring according to the size of the batch.".
>
> I don't see how it is done here.
>
> Thanks
>
The device can skip them in __vhost_add_used_n if _F_IN_ORDER is negotiated:
last_used_idx will be advanced by the size of the batch.
>
>> no_more_replies:
>> if (added)
>> vhost_signal(&vsock->dev, vq);
----- Original Message -----
> From: "jasowang" <[email protected]>
> To: "Guo Zhi" <[email protected]>, "eperezma" <[email protected]>, "sgarzare" <[email protected]>, "Michael
> Tsirkin" <[email protected]>
> Cc: "netdev" <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>
> Sent: Thursday, August 25, 2022 3:44:41 PM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring
> 在 2022/8/17 21:57, Guo Zhi 写道:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>> 1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>> /* DMA address and size information */
>> dma_addr_t queue_dma_addr;
>> size_t queue_size_in_bytes;
>> +
>> + /* In order feature batch begin here */
>
>
> We need to tweak the comment; it's not easy for me to understand the
> meaning here.
>
>
>> + u16 next_desc_begin;
>> } split;
>>
>> /* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> }
>>
>> vring_unmap_one_split(vq, i);
>> - vq->split.desc_extra[i].next = vq->free_head;
>> - vq->free_head = head;
>> + /* In order feature use desc in order,
>> + * that means, the next desc will always be free
>> + */
>
>
> Maybe we should add something like "The descriptors are prepared in order".
>
>
>> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>> + vq->split.desc_extra[i].next = vq->free_head;
>> + vq->free_head = head;
>> + }
>>
>> /* Plus final descriptor */
>> vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> {
>> struct vring_virtqueue *vq = to_vvq(_vq);
>> void *ret;
>> - unsigned int i;
>> + unsigned int i, j;
>> u16 last_used;
>>
>> START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> /* Only get used array entries after they have been exposed by host. */
>> virtio_rmb(vq->weak_barriers);
>>
>> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> - i = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].id);
>> - *len = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].len);
>> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> + /* Skip used ring and get used desc in order*/
>> + i = vq->split.next_desc_begin;
>> + j = i;
>> + /* Indirect only takes one descriptor in descriptor table */
>> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> + j = (j + 1) % vq->split.vring.num;
>
>
> Let's move the expensive mod outside the loop. Or, since this is the split
> ring, can we actually use a bitwise AND here, given that the size is
> guaranteed to be a power of two? Another question: is it better to store
> the next_desc in e.g. desc_extra?
>
> And this seems very expensive if the device doesn't do the batching
> (which is not mandatory).
>
>
>> + /* move to next */
>> + j = (j + 1) % vq->split.vring.num;
>> + /* Next buffer will use this descriptor in order */
>> + vq->split.next_desc_begin = j;
>> + if (!vq->indirect) {
>> + *len = vq->split.desc_extra[i].len;
>> + } else {
>> + struct vring_desc *indir_desc =
>> + vq->split.desc_state[i].indir_desc;
>> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> + if (indir_desc) {
>> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> + buffer_len += indir_desc[j].len;
>
>
> So I think we need to finalize this, then we can have much more stress
> on the cache:
>
> https://lkml.org/lkml/2021/10/26/1300
>
> It was reverted since it's too aggressive, we should instead:
>
> 1) do the validation only for modern devices
>
> 2) fail only when validation is enabled (e.g. via a module parameter).
>
> Thanks
>
Sorry for this obsolete implementation; we will not get the buffer's length like this (in a loop).
Actually, for buffers that are not skipped, we can get the length from the used ring directly. For skipped buffers,
I think we don't have to get the length, because the driver is not interested in the length of the skipped (tx) buffers.
>
>> + }
>> +
>> + *len = buffer_len;
>> + }
>> + } else {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> + i = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + }
>>
>> if (unlikely(i >= vq->split.vring.num)) {
>> BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>> vq->split.avail_flags_shadow = 0;
>> vq->split.avail_idx_shadow = 0;
>>
>> + vq->split.next_desc_begin = 0;
>> +
>> /* No callback? Tell other side not to bother us. */
>> if (!callback) {
>> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
----- Original Message -----
> From: "jasowang" <[email protected]>
> To: "Guo Zhi" <[email protected]>, "eperezma" <[email protected]>, "sgarzare" <[email protected]>, "Michael
> Tsirkin" <[email protected]>
> Cc: "netdev" <[email protected]>, "linux-kernel" <[email protected]>, "kvm list" <[email protected]>,
> "virtualization" <[email protected]>
> Sent: Thursday, August 25, 2022 3:44:41 PM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring
> 在 2022/8/17 21:57, Guo Zhi 写道:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <[email protected]>
>> ---
>> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>> 1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>> /* DMA address and size information */
>> dma_addr_t queue_dma_addr;
>> size_t queue_size_in_bytes;
>> +
>> + /* In order feature batch begin here */
>
>
> We need to tweak the comment; it's not easy for me to understand the
> meaning here.
>
How about this: "If the in_order feature is negotiated, this is the next head to consume."
>
>> + u16 next_desc_begin;
>> } split;
>>
>> /* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> }
>>
>> vring_unmap_one_split(vq, i);
>> - vq->split.desc_extra[i].next = vq->free_head;
>> - vq->free_head = head;
>> + /* In order feature use desc in order,
>> + * that means, the next desc will always be free
>> + */
>
>
> Maybe we should add something like "The descriptors are prepared in order".
>
I will change it to this: "The descriptors are made available in order if the in-order feature is used. Since the free_head list is already circular, it must be consumed sequentially."
>
>> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>> + vq->split.desc_extra[i].next = vq->free_head;
>> + vq->free_head = head;
>> + }
>>
>> /* Plus final descriptor */
>> vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> {
>> struct vring_virtqueue *vq = to_vvq(_vq);
>> void *ret;
>> - unsigned int i;
>> + unsigned int i, j;
>> u16 last_used;
>>
>> START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> /* Only get used array entries after they have been exposed by host. */
>> virtio_rmb(vq->weak_barriers);
>>
>> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> - i = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].id);
>> - *len = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].len);
>> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> + /* Skip used ring and get used desc in order*/
>> + i = vq->split.next_desc_begin;
>> + j = i;
>> + /* Indirect only takes one descriptor in descriptor table */
>> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> + j = (j + 1) % vq->split.vring.num;
>
>
> Let's move the expensive mod outside the loop. Or, since this is the split
> ring, can we actually use a bitwise AND here, given that the size is
> guaranteed to be a power of two? Another question: is it better to store
> the next_desc in e.g. desc_extra?
Thanks, I will use a bitwise operation instead of mod.
We only use one next_desc_begin at a time, so there is no need to store it in desc_extra.
>
> And this seems very expensive if the device doesn't do the batching
> (which is not mandatory).
>
>
We will check whether the device batched the buffers or not, and will only use this approach for batched buffers.
Also, the non-in-order path is more expensive, because it follows a linked list.
>> + /* move to next */
>> + j = (j + 1) % vq->split.vring.num;
>> + /* Next buffer will use this descriptor in order */
>> + vq->split.next_desc_begin = j;
>> + if (!vq->indirect) {
>> + *len = vq->split.desc_extra[i].len;
>> + } else {
>> + struct vring_desc *indir_desc =
>> + vq->split.desc_state[i].indir_desc;
>> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> + if (indir_desc) {
>> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> + buffer_len += indir_desc[j].len;
>
>
> So I think we need to finalize this, then we can have much more stress
> on the cache:
>
> https://lkml.org/lkml/2021/10/26/1300
>
> It was reverted since it's too aggressive, we should instead:
>
> 1) do the validation only for modern devices
>
> 2) fail only when validation is enabled (e.g. via a module parameter).
>
> Thanks
>
>
>> + }
>> +
>> + *len = buffer_len;
>> + }
>> + } else {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> + i = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + }
>>
>> if (unlikely(i >= vq->split.vring.num)) {
>> BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>> vq->split.avail_flags_shadow = 0;
>> vq->split.avail_idx_shadow = 0;
>>
>> + vq->split.next_desc_begin = 0;
>> +
>> /* No callback? Tell other side not to bother us. */
>> if (!callback) {
>> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;