Under heavy CPU load, the rx worker thread may not get a CPU slot in
time because it is bound to the kernel's global workqueue.

GLink client drivers return a timeout error if they do not receive a
response within the stipulated duration. Moving rx_work to an UNBOUND
workqueue ensures the rx worker runs as soon as possible.
Signed-off-by: Manikanta Mylavarapu <[email protected]>
---
drivers/rpmsg/qcom_glink_native.c | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/drivers/rpmsg/qcom_glink_native.c b/drivers/rpmsg/qcom_glink_native.c
index 1beb40a1d3df..6f9a439e5046 100644
--- a/drivers/rpmsg/qcom_glink_native.c
+++ b/drivers/rpmsg/qcom_glink_native.c
@@ -81,6 +81,7 @@ struct glink_core_rx_intent {
  * @rx_pipe: pipe object for receive FIFO
  * @tx_pipe: pipe object for transmit FIFO
  * @rx_work: worker for handling received control messages
+ * @rx_wq: work queue of rx_work
  * @rx_lock: protects the @rx_queue
  * @rx_queue: queue of received control messages to be processed in @rx_work
  * @tx_lock: synchronizes operations on the tx fifo
@@ -100,6 +101,7 @@ struct qcom_glink {
 	struct qcom_glink_pipe *tx_pipe;
 
 	struct work_struct rx_work;
+	struct workqueue_struct *rx_wq;
 	spinlock_t rx_lock;
 	struct list_head rx_queue;
 
@@ -822,7 +824,7 @@ static int qcom_glink_rx_defer(struct qcom_glink *glink, size_t extra)
 	list_add_tail(&dcmd->node, &glink->rx_queue);
 	spin_unlock(&glink->rx_lock);
 
-	schedule_work(&glink->rx_work);
+	queue_work(glink->rx_wq, &glink->rx_work);
 	qcom_glink_rx_advance(glink, sizeof(dcmd->msg) + extra);
 
 	return 0;
@@ -1665,6 +1667,8 @@ static void qcom_glink_cancel_rx_work(struct qcom_glink *glink)
 	/* cancel any pending deferred rx_work */
 	cancel_work_sync(&glink->rx_work);
 
+	destroy_workqueue(glink->rx_wq);
+
 	list_for_each_entry_safe(dcmd, tmp, &glink->rx_queue, node)
 		kfree(dcmd);
 }
@@ -1750,6 +1754,10 @@ struct qcom_glink *qcom_glink_native_probe(struct device *dev,
 	INIT_WORK(&glink->rx_work, qcom_glink_work);
 	init_waitqueue_head(&glink->tx_avail_notify);
 
+	glink->rx_wq = alloc_workqueue("glink_rx_wq", WQ_UNBOUND, 1);
+	if (!glink->rx_wq)
+		return ERR_PTR(-ENOMEM);
+
 	spin_lock_init(&glink->idr_lock);
 	idr_init(&glink->lcids);
 	idr_init(&glink->rcids);
--
2.17.1
On 6/7/2023 5:47 PM, Manikanta Mylavarapu wrote:
> Under heavy CPU load, the rx worker thread may not get a CPU slot in
> time because it is bound to the kernel's global workqueue.
> 
> GLink client drivers return a timeout error if they do not receive a
> response within the stipulated duration. Moving rx_work to an UNBOUND
> workqueue ensures the rx worker runs as soon as possible.
>
> Signed-off-by: Manikanta Mylavarapu <[email protected]>
> ---
> drivers/rpmsg/qcom_glink_native.c | 10 +++++++++-
> 1 file changed, 9 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/rpmsg/qcom_glink_native.c b/drivers/rpmsg/qcom_glink_native.c
> index 1beb40a1d3df..6f9a439e5046 100644
> --- a/drivers/rpmsg/qcom_glink_native.c
> +++ b/drivers/rpmsg/qcom_glink_native.c
> @@ -81,6 +81,7 @@ struct glink_core_rx_intent {
>   * @rx_pipe: pipe object for receive FIFO
>   * @tx_pipe: pipe object for transmit FIFO
>   * @rx_work: worker for handling received control messages
> + * @rx_wq: work queue of rx_work
>   * @rx_lock: protects the @rx_queue
>   * @rx_queue: queue of received control messages to be processed in @rx_work
>   * @tx_lock: synchronizes operations on the tx fifo
> @@ -100,6 +101,7 @@ struct qcom_glink {
>  	struct qcom_glink_pipe *tx_pipe;
> 
>  	struct work_struct rx_work;
> +	struct workqueue_struct *rx_wq;
>  	spinlock_t rx_lock;
>  	struct list_head rx_queue;
> 
> @@ -822,7 +824,7 @@ static int qcom_glink_rx_defer(struct qcom_glink *glink, size_t extra)
>  	list_add_tail(&dcmd->node, &glink->rx_queue);
>  	spin_unlock(&glink->rx_lock);
> 
> -	schedule_work(&glink->rx_work);
> +	queue_work(glink->rx_wq, &glink->rx_work);
>  	qcom_glink_rx_advance(glink, sizeof(dcmd->msg) + extra);
> 
>  	return 0;
> @@ -1665,6 +1667,8 @@ static void qcom_glink_cancel_rx_work(struct qcom_glink *glink)
>  	/* cancel any pending deferred rx_work */
>  	cancel_work_sync(&glink->rx_work);
> 
> +	destroy_workqueue(glink->rx_wq);
> +
>  	list_for_each_entry_safe(dcmd, tmp, &glink->rx_queue, node)
>  		kfree(dcmd);
>  }
> @@ -1750,6 +1754,10 @@ struct qcom_glink *qcom_glink_native_probe(struct device *dev,
>  	INIT_WORK(&glink->rx_work, qcom_glink_work);
>  	init_waitqueue_head(&glink->tx_avail_notify);
> 
> +	glink->rx_wq = alloc_workqueue("glink_rx_wq", WQ_UNBOUND, 1);
> +	if (!glink->rx_wq)
> +		return ERR_PTR(-ENOMEM);
> +
>  	spin_lock_init(&glink->idr_lock);
>  	idr_init(&glink->lcids);
>  	idr_init(&glink->rcids);
Gentle reminder for review!
Thanks,
Manikanta.
On Wed, Jun 07, 2023 at 05:47:30PM +0530, Manikanta Mylavarapu wrote:
> Under heavy CPU load, the rx worker thread may not get a CPU slot in
> time because it is bound to the kernel's global workqueue.
> 
> GLink client drivers return a timeout error if they do not receive a
> response within the stipulated duration. Moving rx_work to an UNBOUND
> workqueue ensures the rx worker runs as soon as possible.
>
I'm guessing that you're referring to the fact that incoming intent
requests (GLINK_CMD_RX_INTENT_REQ) are handled in the rx_worker. But I
shouldn't have to guess; please improve your commit message to be
slightly more specific.
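
For context, the deferral path in question looks roughly like this
(a paraphrased sketch of qcom_glink_native_rx(), not verbatim source):

	/* The IRQ path does not service an incoming intent request
	 * itself; it defers it onto rx_work, so a starved worker
	 * delays the intent grant past the client's timeout.
	 */
	case GLINK_CMD_RX_INTENT_REQ:
		ret = qcom_glink_rx_defer(glink, 0);
		break;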
Could we/should we handle the intent requests differently?
Please also use 75-character-long lines in your commit message.
> Signed-off-by: Manikanta Mylavarapu <[email protected]>
> ---
> drivers/rpmsg/qcom_glink_native.c | 10 +++++++++-
> 1 file changed, 9 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/rpmsg/qcom_glink_native.c b/drivers/rpmsg/qcom_glink_native.c
> index 1beb40a1d3df..6f9a439e5046 100644
> --- a/drivers/rpmsg/qcom_glink_native.c
> +++ b/drivers/rpmsg/qcom_glink_native.c
> @@ -81,6 +81,7 @@ struct glink_core_rx_intent {
>   * @rx_pipe: pipe object for receive FIFO
>   * @tx_pipe: pipe object for transmit FIFO
>   * @rx_work: worker for handling received control messages
> + * @rx_wq: work queue of rx_work
>   * @rx_lock: protects the @rx_queue
>   * @rx_queue: queue of received control messages to be processed in @rx_work
>   * @tx_lock: synchronizes operations on the tx fifo
> @@ -100,6 +101,7 @@ struct qcom_glink {
>  	struct qcom_glink_pipe *tx_pipe;
> 
>  	struct work_struct rx_work;
> +	struct workqueue_struct *rx_wq;
>  	spinlock_t rx_lock;
>  	struct list_head rx_queue;
> 
> @@ -822,7 +824,7 @@ static int qcom_glink_rx_defer(struct qcom_glink *glink, size_t extra)
>  	list_add_tail(&dcmd->node, &glink->rx_queue);
>  	spin_unlock(&glink->rx_lock);
> 
> -	schedule_work(&glink->rx_work);
> +	queue_work(glink->rx_wq, &glink->rx_work);
>  	qcom_glink_rx_advance(glink, sizeof(dcmd->msg) + extra);
> 
>  	return 0;
> @@ -1665,6 +1667,8 @@ static void qcom_glink_cancel_rx_work(struct qcom_glink *glink)
>  	/* cancel any pending deferred rx_work */
>  	cancel_work_sync(&glink->rx_work);
> 
> +	destroy_workqueue(glink->rx_wq);
> +
>  	list_for_each_entry_safe(dcmd, tmp, &glink->rx_queue, node)
>  		kfree(dcmd);
>  }
> @@ -1750,6 +1754,10 @@ struct qcom_glink *qcom_glink_native_probe(struct device *dev,
>  	INIT_WORK(&glink->rx_work, qcom_glink_work);
>  	init_waitqueue_head(&glink->tx_avail_notify);
> 
> +	glink->rx_wq = alloc_workqueue("glink_rx_wq", WQ_UNBOUND, 1);
Please see the description in e.g. 56310520308a ("soc: qcom: qmi: Use
alloc_ordered_workqueue() to create ordered workqueues")
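
Something along these lines (an untested sketch, keeping your error
handling) would follow that pattern:

	glink->rx_wq = alloc_ordered_workqueue("glink_rx_wq", 0);
	if (!glink->rx_wq)
		return ERR_PTR(-ENOMEM);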
Regards,
Bjorn
> +	if (!glink->rx_wq)
> +		return ERR_PTR(-ENOMEM);
> +
>  	spin_lock_init(&glink->idr_lock);
>  	idr_init(&glink->lcids);
>  	idr_init(&glink->rcids);
> --
> 2.17.1
>