On Thu, 18 Aug 2016, Waiman Long wrote:
>The osq_lock() and osq_unlock() functions may not provide the necessary
>acquire and release barriers in some cases. This patch makes sure
>that the proper barriers are provided when osq_lock() is successful
>or when osq_unlock() is called.
But why do we need these guarantees given that osq is only used internally
for lock owner spinning situations? Leaking out of the critical region will
obviously be bad if using it as a full lock, but, as is, this can only hurt
performance of two of the most popular locks in the kernel -- although yes,
using smp_acquire__after_ctrl_dep is nicer for polling.
If you need tighter osq for rwsems, could it be refactored such that mutexes
do not take a hit?
>
>Suggested-by: Peter Zijlstra (Intel) <[email protected]>
>Signed-off-by: Waiman Long <[email protected]>
>---
> kernel/locking/osq_lock.c | 24 ++++++++++++++++++------
> 1 files changed, 18 insertions(+), 6 deletions(-)
>
>diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
>index 05a3785..3da0b97 100644
>--- a/kernel/locking/osq_lock.c
>+++ b/kernel/locking/osq_lock.c
>@@ -124,6 +124,11 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>
> cpu_relax_lowlatency();
> }
>+ /*
>+ * Add an acquire memory barrier for pairing with the release barrier
>+ * in unlock.
>+ */
>+ smp_acquire__after_ctrl_dep();
> return true;
>
> unqueue:
>@@ -198,13 +203,20 @@ void osq_unlock(struct optimistic_spin_queue *lock)
> * Second most likely case.
> */
> node = this_cpu_ptr(&osq_node);
>- next = xchg(&node->next, NULL);
>- if (next) {
>- WRITE_ONCE(next->locked, 1);
>+ next = xchg_relaxed(&node->next, NULL);
>+ if (next)
>+ goto unlock;
>+
>+ next = osq_wait_next(lock, node, NULL);
>+ if (unlikely(!next)) {
>+ /*
>+ * In the unlikely event that the OSQ is empty, we need to
>+ * provide a proper release barrier.
>+ */
>+ smp_mb();
> return;
> }
>
>- next = osq_wait_next(lock, node, NULL);
>- if (next)
>- WRITE_ONCE(next->locked, 1);
>+unlock:
>+ smp_store_release(&next->locked, 1);
> }
As with the smp_acquire__after_ctrl_dep comment you have above, this also
obviously pairs with osq_lock's smp_load_acquire while backing out (unqueueing,
step A). Given the above, for this case we might also just rely on READ_ONCE(node->locked):
if we get the conditional wrong and miss the node becoming locked, all we do is another
iteration, and while there is a cmpxchg() there, it is mitigated by the ccas thingy.
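For readers less familiar with the pattern under discussion, here is a minimal
userspace sketch of "poll with plain loads, then upgrade to ACQUIRE once the
loop exits". It uses C11 atomics rather than the kernel primitives, so
atomic_thread_fence(memory_order_acquire) only approximates what
smp_acquire__after_ctrl_dep() does; struct toy_node, toy_grant() and toy_wait()
are hypothetical names made up for the illustration.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-in for a queue node: a flag plus the data it guards. */
struct toy_node {
	atomic_int locked;
	int payload;
};

/* Unlocker side: write the payload, then publish it with a RELEASE store. */
static void toy_grant(struct toy_node *n, int value)
{
	/* The node must still be waiting; granting twice would be a bug. */
	assert(atomic_load_explicit(&n->locked, memory_order_relaxed) == 0);
	n->payload = value;
	atomic_store_explicit(&n->locked, 1, memory_order_release);
}

/* Waiter side: relaxed polling, one ACQUIRE fence after the loop exits. */
static int toy_wait(struct toy_node *n)
{
	while (!atomic_load_explicit(&n->locked, memory_order_relaxed))
		; /* spin; the kernel loop calls cpu_relax_lowlatency() here */

	/* Orders the payload read below after the load that saw locked == 1. */
	atomic_thread_fence(memory_order_acquire);
	return n->payload;
}
```

The point of the shape is that the fence cost is paid once on success rather
than on every polling iteration.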
Thanks,
Davidlohr
On Tue, Oct 4, 2016 at 12:06 PM, Davidlohr Bueso <[email protected]> wrote:
> On Thu, 18 Aug 2016, Waiman Long wrote:
>
>> The osq_lock() and osq_unlock() functions may not provide the necessary
>> acquire and release barriers in some cases. This patch makes sure
>> that the proper barriers are provided when osq_lock() is successful
>> or when osq_unlock() is called.
>
>
> But why do we need these guarantees given that osq is only used internally
> for lock owner spinning situations? Leaking out of the critical region will
> obviously be bad if using it as a full lock, but, as is, this can only hurt
> performance of two of the most popular locks in the kernel -- although yes,
> using smp_acquire__after_ctrl_dep is nicer for polling.
>
> If you need tighter osq for rwsems, could it be refactored such that mutexes
> do not take a hit?
I agree with David, the OSQ is meant to be used internally as a
queuing mechanism rather than as something for protecting critical regions,
and so these guarantees of preventing critical section leaks don't
seem to be necessary for the OSQ. If that is the case, then it would
be better to avoid the performance hit to mutexes and rwsems.
Jason
On 10/04/2016 03:06 PM, Davidlohr Bueso wrote:
> On Thu, 18 Aug 2016, Waiman Long wrote:
>
>> The osq_lock() and osq_unlock() functions may not provide the necessary
>> acquire and release barriers in some cases. This patch makes sure
>> that the proper barriers are provided when osq_lock() is successful
>> or when osq_unlock() is called.
>
> But why do we need these guarantees given that osq is only used internally
> for lock owner spinning situations? Leaking out of the critical region will
> obviously be bad if using it as a full lock, but, as is, this can only hurt
> performance of two of the most popular locks in the kernel -- although yes,
> using smp_acquire__after_ctrl_dep is nicer for polling.
First of all, it is not obvious from the name osq_lock() that it is not
an acquire barrier in some cases. We either need to clearly document it
or have a variant name that indicates that, e.g. osq_lock_relaxed.
Secondly, if we look at the use cases of osq_lock(), the additional
latency (for non-x86 archs) only matters if the master lock is
immediately available for acquisition after osq_lock() returns.
Otherwise, it will be hidden in the spinning loop for that master lock.
So yes, there may be a slight performance hit in some cases, but
certainly not always.
> If you need tighter osq for rwsems, could it be refactored such that mutexes
> do not take a hit?
>
Yes, we can certainly do that by splitting it into 2 variants, one with an
acquire barrier guarantee and one without.
>>
>> Suggested-by: Peter Zijlstra (Intel) <[email protected]>
>> Signed-off-by: Waiman Long <[email protected]>
>> ---
>> kernel/locking/osq_lock.c | 24 ++++++++++++++++++------
>> 1 files changed, 18 insertions(+), 6 deletions(-)
>>
>> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
>> index 05a3785..3da0b97 100644
>> --- a/kernel/locking/osq_lock.c
>> +++ b/kernel/locking/osq_lock.c
>> @@ -124,6 +124,11 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>>
>> cpu_relax_lowlatency();
>> }
>> + /*
>> + * Add an acquire memory barrier for pairing with the release barrier
>> + * in unlock.
>> + */
>> + smp_acquire__after_ctrl_dep();
>> return true;
>>
>> unqueue:
>> @@ -198,13 +203,20 @@ void osq_unlock(struct optimistic_spin_queue *lock)
>> * Second most likely case.
>> */
>> node = this_cpu_ptr(&osq_node);
>> - next = xchg(&node->next, NULL);
>> - if (next) {
>> - WRITE_ONCE(next->locked, 1);
>> + next = xchg_relaxed(&node->next, NULL);
>> + if (next)
>> + goto unlock;
>> +
>> + next = osq_wait_next(lock, node, NULL);
>> + if (unlikely(!next)) {
>> + /*
>> + * In the unlikely event that the OSQ is empty, we need to
>> + * provide a proper release barrier.
>> + */
>> + smp_mb();
>> return;
>> }
>>
>> - next = osq_wait_next(lock, node, NULL);
>> - if (next)
>> - WRITE_ONCE(next->locked, 1);
>> +unlock:
>> + smp_store_release(&next->locked, 1);
>> }
>
> As well as for the smp_acquire__after_ctrl_dep comment you have above, this also
> obviously pairs with the osq_lock's smp_load_acquire while backing out (unqueueing,
> step A). Given the above, for this case we might also just rely on READ_ONCE(node->locked),
> if we get the conditional wrong and miss the node becoming locked, all we do is another
> iteration, and while there is a cmpxchg() there, it is mitigated with the ccas thingy.
Similar to osq_lock(), the current osq_unlock() does not have a release
barrier guarantee. I think splitting into 2 variants - osq_unlock,
osq_unlock_relaxed will help.
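As a rough illustration of the "two variants" idea, the sketch below shows an
ordered and a relaxed flavor sharing one inline helper that takes the memory
order as a parameter. It is plain userspace C11, not the kernel API: the
toyq_* names and the single-word "tail" lock are assumptions for the example,
and the real OSQ is a queue, not a test-and-set word.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Shared helper: claim the tail word for @me with the requested ordering. */
static inline bool toyq_trylock_order(atomic_int *tail, int me,
				      memory_order success)
{
	int expected = 0;	/* 0 plays the role of "unlocked" */

	assert(me != 0);	/* 0 is reserved, so owners must be non-zero */
	return atomic_compare_exchange_strong_explicit(tail, &expected, me,
						       success,
						       memory_order_relaxed);
}

/* Shared helper: clear the tail word with the requested ordering. */
static inline void toyq_unlock_order(atomic_int *tail, memory_order mo)
{
	atomic_store_explicit(tail, 0, mo);
}

/* Ordered flavor: ACQUIRE on successful lock, RELEASE on unlock. */
static bool toyq_trylock(atomic_int *tail, int me)
{
	return toyq_trylock_order(tail, me, memory_order_acquire);
}

static void toyq_unlock(atomic_int *tail)
{
	toyq_unlock_order(tail, memory_order_release);
}

/* Relaxed flavor: atomicity only, no ordering guarantee for the caller. */
static bool toyq_trylock_relaxed(atomic_int *tail, int me)
{
	return toyq_trylock_order(tail, me, memory_order_relaxed);
}

static void toyq_unlock_relaxed(atomic_int *tail)
{
	toyq_unlock_order(tail, memory_order_relaxed);
}
```

With this split, a caller that only needs queuing can pick the _relaxed pair,
while the plain names keep the semantics their names suggest.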
Cheers,
Longman
On 10/05/2016 08:19 AM, Waiman Long wrote:
> On 10/04/2016 03:06 PM, Davidlohr Bueso wrote:
>> On Thu, 18 Aug 2016, Waiman Long wrote:
>>
>>> The osq_lock() and osq_unlock() functions may not provide the necessary
>>> acquire and release barriers in some cases. This patch makes sure
>>> that the proper barriers are provided when osq_lock() is successful
>>> or when osq_unlock() is called.
>>
>> But why do we need these guarantees given that osq is only used internally
>> for lock owner spinning situations? Leaking out of the critical region will
>> obviously be bad if using it as a full lock, but, as is, this can only hurt
>> performance of two of the most popular locks in the kernel -- although yes,
>> using smp_acquire__after_ctrl_dep is nicer for polling.
>
> First of all, it is not obvious from the name osq_lock() that it is
> not an acquire barrier in some cases. We either need to clearly
> document it or have a variant name that indicates that, e.g.
> osq_lock_relaxed.
>
> Secondly, if we look at the use cases of osq_lock(), the additional
> latency (for non-x86 archs) only matters if the master lock is
> immediately available for acquisition after osq_lock() returns.
> Otherwise, it will be hidden in the spinning loop for that master
> lock. So yes, there may be a slight performance hit in some cases, but
> certainly not always.
>
>> If you need tighter osq for rwsems, could it be refactored such that mutexes
>> do not take a hit?
>>
>
> Yes, we can certainly do that by splitting it into 2 variants, one with an
> acquire barrier guarantee and one without.
>
How about the following patch? Does that address your concern?
Cheers,
Longman
----------------------------------[ Cut Here ]--------------------------------------
[PATCH] locking/osq: Provide 2 variants of lock/unlock functions
Despite the lock/unlock names, the osq_lock() and osq_unlock()
functions were not proper acquire and release barriers. To clarify
the situation, two different variants are now provided:
1) osq_lock/osq_unlock - they are proper acquire (if successful)
and release barriers respectively.
2) osq_lock_relaxed/osq_unlock_relaxed - they do not provide the
acquire/release barrier guarantee.
Both the mutex and rwsem optimistic spinning code paths are modified to
use the relaxed variants, which will be faster on some architectures as
the proper memory barrier semantics are not needed for queuing purposes.
Signed-off-by: Waiman Long <[email protected]>
---
include/linux/osq_lock.h | 2 +
kernel/locking/mutex.c | 6 +-
kernel/locking/osq_lock.c | 89 +++++++++++++++++++++++++++++++++++-------
kernel/locking/rwsem-xadd.c | 4 +-
4 files changed, 81 insertions(+), 20 deletions(-)
diff --git a/include/linux/osq_lock.h b/include/linux/osq_lock.h
index 703ea5c..72357d0 100644
--- a/include/linux/osq_lock.h
+++ b/include/linux/osq_lock.h
@@ -30,7 +30,9 @@ static inline void osq_lock_init(struct optimistic_spin_queue *lock)
}
extern bool osq_lock(struct optimistic_spin_queue *lock);
+extern bool osq_lock_relaxed(struct optimistic_spin_queue *lock);
extern void osq_unlock(struct optimistic_spin_queue *lock);
+extern void osq_unlock_relaxed(struct optimistic_spin_queue *lock);
static inline bool osq_is_locked(struct optimistic_spin_queue *lock)
{
diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index a70b90d..b1bf1e0 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -316,7 +316,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
* acquire the mutex all at once, the spinners need to take a
* MCS (queued) lock first before spinning on the owner field.
*/
- if (!osq_lock(&lock->osq))
+ if (!osq_lock_relaxed(&lock->osq))
goto done;
while (true) {
@@ -358,7 +358,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
}
mutex_set_owner(lock);
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
return true;
}
@@ -380,7 +380,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
cpu_relax_lowlatency();
}
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
done:
/*
* If we fell out of the spin path because of need_resched(),
diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
index 05a3785..1e6823a 100644
--- a/kernel/locking/osq_lock.c
+++ b/kernel/locking/osq_lock.c
@@ -12,6 +12,23 @@
*/
static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
+enum mbtype {
+ acquire,
+ release,
+ relaxed,
+};
+
+static __always_inline int
+_atomic_cmpxchg_(const enum mbtype barrier, atomic_t *v, int old, int new)
+{
+ if (barrier == acquire)
+ return atomic_cmpxchg_acquire(v, old, new);
+ else if (barrier == release)
+ return atomic_cmpxchg_release(v, old, new);
+ else
+ return atomic_cmpxchg_relaxed(v, old, new);
+}
+
/*
* We use the value 0 to represent "no CPU", thus the encoded value
* will be the CPU number incremented by 1.
@@ -35,7 +52,8 @@ static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
struct optimistic_spin_node *node,
- struct optimistic_spin_node *prev)
+ struct optimistic_spin_node *prev,
+ const enum mbtype barrier)
{
struct optimistic_spin_node *next = NULL;
int curr = encode_cpu(smp_processor_id());
@@ -50,7 +68,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
for (;;) {
if (atomic_read(&lock->tail) == curr &&
- atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
+ _atomic_cmpxchg_(barrier, &lock->tail, curr, old) == curr) {
/*
* We were the last queued, we moved @lock back. @prev
* will now observe @lock and will complete its
@@ -70,7 +88,13 @@ osq_wait_next(struct optimistic_spin_queue *lock,
* wait for a new @node->next from its Step-C.
*/
if (node->next) {
- next = xchg(&node->next, NULL);
+ if (barrier == acquire)
+ next = xchg_acquire(&node->next, NULL);
+ else if (barrier == release)
+ next = xchg_release(&node->next, NULL);
+ else
+ next = xchg_relaxed(&node->next, NULL);
+
if (next)
break;
}
@@ -81,7 +105,11 @@ osq_wait_next(struct optimistic_spin_queue *lock,
return next;
}
-bool osq_lock(struct optimistic_spin_queue *lock)
+/*
+ * We don't need to provide any memory barrier guarantee if the locking fails.
+ */
+static inline bool
+__osq_lock(struct optimistic_spin_queue *lock, const enum mbtype barrier)
{
struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
struct optimistic_spin_node *prev, *next;
@@ -124,6 +152,12 @@ bool osq_lock(struct optimistic_spin_queue *lock)
cpu_relax_lowlatency();
}
+ /*
+ * Add an acquire memory barrier, if necessary, for pairing with the
+ * release barrier in unlock.
+ */
+ if (barrier == acquire)
+ smp_acquire__after_ctrl_dep();
return true;
unqueue:
@@ -137,7 +171,7 @@ unqueue:
for (;;) {
if (prev->next == node &&
- cmpxchg(&prev->next, node, NULL) == node)
+ cmpxchg_relaxed(&prev->next, node, NULL) == node)
break;
/*
@@ -164,7 +198,7 @@ unqueue:
* back to @prev.
*/
- next = osq_wait_next(lock, node, prev);
+ next = osq_wait_next(lock, node, prev, relaxed);
if (!next)
return false;
@@ -182,7 +216,8 @@ unqueue:
return false;
}
-void osq_unlock(struct optimistic_spin_queue *lock)
+static inline void
+__osq_unlock(struct optimistic_spin_queue *lock, const enum mbtype barrier)
{
struct optimistic_spin_node *node, *next;
int curr = encode_cpu(smp_processor_id());
@@ -190,21 +225,45 @@ void osq_unlock(struct optimistic_spin_queue *lock)
/*
* Fast path for the uncontended case.
*/
- if (likely(atomic_cmpxchg_release(&lock->tail, curr,
- OSQ_UNLOCKED_VAL) == curr))
+ if (likely(_atomic_cmpxchg_(barrier, &lock->tail, curr,
+ OSQ_UNLOCKED_VAL) == curr))
return;
/*
* Second most likely case.
*/
node = this_cpu_ptr(&osq_node);
- next = xchg(&node->next, NULL);
- if (next) {
- WRITE_ONCE(next->locked, 1);
+ next = xchg_relaxed(&node->next, NULL);
+ if (next)
+ goto unlock;
+
+ next = osq_wait_next(lock, node, NULL, barrier);
+ if (unlikely(!next))
return;
- }
- next = osq_wait_next(lock, node, NULL);
- if (next)
+unlock:
+ if (barrier == release)
+ smp_store_release(&next->locked, 1);
+ else
WRITE_ONCE(next->locked, 1);
}
+
+bool osq_lock(struct optimistic_spin_queue *lock)
+{
+ return __osq_lock(lock, acquire);
+}
+
+bool osq_lock_relaxed(struct optimistic_spin_queue *lock)
+{
+ return __osq_lock(lock, relaxed);
+}
+
+void osq_unlock(struct optimistic_spin_queue *lock)
+{
+ __osq_unlock(lock, release);
+}
+
+void osq_unlock_relaxed(struct optimistic_spin_queue *lock)
+{
+ __osq_unlock(lock, relaxed);
+}
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 2337b4b..88e95b1 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -389,7 +389,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
if (!rwsem_can_spin_on_owner(sem))
goto done;
- if (!osq_lock(&sem->osq))
+ if (!osq_lock_relaxed(&sem->osq))
goto done;
/*
@@ -425,7 +425,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
*/
cpu_relax_lowlatency();
}
- osq_unlock(&sem->osq);
+ osq_unlock_relaxed(&sem->osq);
done:
preempt_enable();
return taken;
--
1.7.1
On Wed, 05 Oct 2016, Waiman Long wrote:
>diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
>index 05a3785..1e6823a 100644
>--- a/kernel/locking/osq_lock.c
>+++ b/kernel/locking/osq_lock.c
>@@ -12,6 +12,23 @@
> */
> static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
>
>+enum mbtype {
>+ acquire,
>+ release,
>+ relaxed,
>+};
No, please.
>+
>+static __always_inline int
>+_atomic_cmpxchg_(const enum mbtype barrier, atomic_t *v, int old, int new)
>+{
>+ if (barrier == acquire)
>+ return atomic_cmpxchg_acquire(v, old, new);
>+ else if (barrier == release)
>+ return atomic_cmpxchg_release(v, old, new);
>+ else
>+ return atomic_cmpxchg_relaxed(v, old, new);
>+}
Things like the above are icky. How about something like below? I'm not
crazy about it, but there are other similar macros, e.g. lockref. We still
provide osq_lock/unlock to imply acquire/release, plus the new _relaxed
flavor, as I agree that is the correct naming.
While I have not touched osq_wait_next(), the following are impacted:
- node->locked is now completely without ordering for _relaxed() (currently
it's under smp_load_acquire, which does not match, and the race is harmless
to begin with as we just iterate again). For the acquire flavor, it is always
formed with ctrl dep + smp_rmb().
- If osq_lock() fails we never guarantee any ordering.
What do you think?
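To make the macro approach concrete outside the kernel tree, here is a small
userspace sketch of the same trick: one function body stamped out twice, with
the name suffix and the fence supplied as macro arguments. It uses C11 atomics;
DEFINE_FLAG_WAIT and flag_wait*() are hypothetical names for the illustration,
and the bounded spin count exists only so the example terminates.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Stamp out flag_wait<EXT>(): poll @flag with relaxed loads for at most
 * @max_spins iterations and run FENCE_STMT only on the success path.
 */
#define DEFINE_FLAG_WAIT(EXT, FENCE_STMT)				\
static bool flag_wait##EXT(atomic_int *flag, int max_spins)		\
{									\
	assert(max_spins >= 0);						\
	for (int i = 0; i < max_spins; i++) {				\
		if (atomic_load_explicit(flag, memory_order_relaxed)) {\
			FENCE_STMT;					\
			return true;					\
		}							\
	}								\
	/* Failure path: no ordering is promised to the caller. */	\
	return false;							\
}

/* Ordered flavor: ACQUIRE fence after the successful poll. */
DEFINE_FLAG_WAIT(, atomic_thread_fence(memory_order_acquire))
/* Relaxed flavor: same body, no fence at all. */
DEFINE_FLAG_WAIT(_relaxed, (void)0)
```

The trade-off is the usual one for function-generating macros: no runtime
branching on a barrier type, at the cost of a body that is harder to read and
to step through in a debugger.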
Thanks,
Davidlohr
diff --git a/include/linux/osq_lock.h b/include/linux/osq_lock.h
index 703ea5c30a33..183ee51e6e54 100644
--- a/include/linux/osq_lock.h
+++ b/include/linux/osq_lock.h
@@ -29,9 +29,20 @@ static inline void osq_lock_init(struct optimistic_spin_queue *lock)
atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
}
+/*
+ * Versions of osq_lock/unlock that do not imply or guarantee (load)-ACQUIRE
+ * (store)-RELEASE barrier semantics.
+ *
+ * Note that a failed call to either osq_lock() or osq_lock_relaxed() does
+ * not imply barriers... we are next to block.
+ */
+extern bool osq_lock_relaxed(struct optimistic_spin_queue *lock);
+extern void osq_unlock_relaxed(struct optimistic_spin_queue *lock);
+
extern bool osq_lock(struct optimistic_spin_queue *lock);
extern void osq_unlock(struct optimistic_spin_queue *lock);
+
static inline bool osq_is_locked(struct optimistic_spin_queue *lock)
{
return atomic_read(&lock->tail) != OSQ_UNLOCKED_VAL;
diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index a70b90db3909..b1bf1e057565 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -316,7 +316,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
* acquire the mutex all at once, the spinners need to take a
* MCS (queued) lock first before spinning on the owner field.
*/
- if (!osq_lock(&lock->osq))
+ if (!osq_lock_relaxed(&lock->osq))
goto done;
while (true) {
@@ -358,7 +358,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
}
mutex_set_owner(lock);
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
return true;
}
@@ -380,7 +380,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
cpu_relax_lowlatency();
}
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
done:
/*
* If we fell out of the spin path because of need_resched(),
diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
index 05a37857ab55..8c3d57698702 100644
--- a/kernel/locking/osq_lock.c
+++ b/kernel/locking/osq_lock.c
@@ -28,6 +28,17 @@ static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
return per_cpu_ptr(&osq_node, cpu_nr);
}
+static inline void set_node_locked_release(struct optimistic_spin_node *node)
+{
+ smp_store_release(&node->locked, 1);
+}
+
+static inline void set_node_locked_relaxed(struct optimistic_spin_node *node)
+{
+ WRITE_ONCE(node->locked, 1);
+
+}
+
/*
* Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
* Can return NULL in case we were the last queued and we updated @lock instead.
@@ -81,130 +92,140 @@ osq_wait_next(struct optimistic_spin_queue *lock,
return next;
}
-bool osq_lock(struct optimistic_spin_queue *lock)
-{
- struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
- struct optimistic_spin_node *prev, *next;
- int curr = encode_cpu(smp_processor_id());
- int old;
-
- node->locked = 0;
- node->next = NULL;
- node->cpu = curr;
-
- /*
- * We need both ACQUIRE (pairs with corresponding RELEASE in
- * unlock() uncontended, or fastpath) and RELEASE (to publish
- * the node fields we just initialised) semantics when updating
- * the lock tail.
- */
- old = atomic_xchg(&lock->tail, curr);
- if (old == OSQ_UNLOCKED_VAL)
- return true;
-
- prev = decode_cpu(old);
- node->prev = prev;
- WRITE_ONCE(prev->next, node);
-
- /*
- * Normally @prev is untouchable after the above store; because at that
- * moment unlock can proceed and wipe the node element from stack.
- *
- * However, since our nodes are static per-cpu storage, we're
- * guaranteed their existence -- this allows us to apply
- * cmpxchg in an attempt to undo our queueing.
- */
-
- while (!READ_ONCE(node->locked)) {
- /*
- * If we need to reschedule bail... so we can block.
- */
- if (need_resched())
- goto unqueue;
-
- cpu_relax_lowlatency();
- }
- return true;
-
-unqueue:
- /*
- * Step - A -- stabilize @prev
- *
- * Undo our @prev->next assignment; this will make @prev's
- * unlock()/unqueue() wait for a next pointer since @lock points to us
- * (or later).
- */
-
- for (;;) {
- if (prev->next == node &&
- cmpxchg(&prev->next, node, NULL) == node)
- break;
-
- /*
- * We can only fail the cmpxchg() racing against an unlock(),
- * in which case we should observe @node->locked becomming
- * true.
- */
- if (smp_load_acquire(&node->locked))
- return true;
-
- cpu_relax_lowlatency();
-
- /*
- * Or we race against a concurrent unqueue()'s step-B, in which
- * case its step-C will write us a new @node->prev pointer.
- */
- prev = READ_ONCE(node->prev);
- }
-
- /*
- * Step - B -- stabilize @next
- *
- * Similar to unlock(), wait for @node->next or move @lock from @node
- * back to @prev.
- */
-
- next = osq_wait_next(lock, node, prev);
- if (!next)
- return false;
-
- /*
- * Step - C -- unlink
- *
- * @prev is stable because its still waiting for a new @prev->next
- * pointer, @next is stable because our @node->next pointer is NULL and
- * it will wait in Step-A.
- */
-
- WRITE_ONCE(next->prev, prev);
- WRITE_ONCE(prev->next, next);
-
- return false;
+#define OSQ_LOCK(EXT, FENCECB) \
+bool osq_lock##EXT(struct optimistic_spin_queue *lock) \
+{ \
+ struct optimistic_spin_node *node = this_cpu_ptr(&osq_node); \
+ struct optimistic_spin_node *prev, *next; \
+ int old, curr = encode_cpu(smp_processor_id()); \
+ \
+ node->locked = 0; \
+ node->next = NULL; \
+ node->cpu = curr; \
+ \
+ /* \
+ * We need both ACQUIRE (pairs with corresponding RELEASE in \
+ * unlock() uncontended, or fastpath) and RELEASE (to publish \
+ * the node fields we just initialised) semantics when updating \
+ * the lock tail. \
+ */ \
+ old = atomic_xchg(&lock->tail, curr); \
+ if (old == OSQ_UNLOCKED_VAL) \
+ return true; \
+ \
+ prev = decode_cpu(old); \
+ node->prev = prev; \
+ WRITE_ONCE(prev->next, node); \
+ \
+ /* \
+ * Normally @prev is untouchable after the above store; because \
+ * at that moment unlock can proceed and wipe the node element \
+ * from stack. \
+ * \
+ * However, since our nodes are static per-cpu storage, we're \
+ * guaranteed their existence -- this allows us to apply \
+ * cmpxchg in an attempt to undo our queueing. \
+ */ \
+ while (!READ_ONCE(node->locked)) { \
+ /* \
+ * If we need to reschedule bail... so we can block. \
+ */ \
+ if (need_resched()) \
+ goto unqueue; \
+ \
+ cpu_relax_lowlatency(); \
+ } \
+ FENCECB; \
+ return true; \
+ \
+unqueue: \
+ /* \
+ * Step - A -- stabilize @prev \
+ * \
+ * Undo our @prev->next assignment; this will make @prev's \
+ * unlock()/unqueue() wait for a next pointer since @lock \
+ * points to us (or later). \
+ */ \
+ for (;;) { \
+ /* \
+ * Failed calls to osq_lock() do not guarantee \
+ * barriers, thus always rely on RELAXED semantics. \
+ */ \
+ if (prev->next == node && \
+ cmpxchg_relaxed(&prev->next, node, NULL) == node) \
+ break; \
+ \
+ /* \
+ * We can only fail the cmpxchg() racing against an \
+ * unlock(), in which case we should observe \
+ * @node->locked becoming true. \
+ */ \
+ if (READ_ONCE(node->locked)) { \
+ FENCECB; \
+ return true; \
+ } \
+ \
+ cpu_relax_lowlatency(); \
+ \
+ /* \
+ * Or we race against a concurrent unqueue()'s step-B, \
+ * in which case its step-C will write us a new \
+ * @node->prev pointer. \
+ */ \
+ prev = READ_ONCE(node->prev); \
+ } \
+ \
+ /* \
+ * Step - B -- stabilize @next \
+ * \
+ * Similar to unlock(), wait for @node->next or move @lock \
+ * from @node back to @prev. \
+ */ \
+ \
+ next = osq_wait_next(lock, node, prev); \
+ if (!next) \
+ return false; \
+ \
+ /* \
+ * Step - C -- unlink \
+ * \
+ * @prev is stable because its still waiting for a new \
+ * @prev->next pointer, @next is stable because our \
+ * @node->next pointer is NULL and it will wait in Step-A. \
+ */ \
+ \
+ WRITE_ONCE(next->prev, prev); \
+ WRITE_ONCE(prev->next, next); \
+ \
+ return false; \
}
-void osq_unlock(struct optimistic_spin_queue *lock)
-{
- struct optimistic_spin_node *node, *next;
- int curr = encode_cpu(smp_processor_id());
-
- /*
- * Fast path for the uncontended case.
- */
- if (likely(atomic_cmpxchg_release(&lock->tail, curr,
- OSQ_UNLOCKED_VAL) == curr))
- return;
-
- /*
- * Second most likely case.
- */
- node = this_cpu_ptr(&osq_node);
- next = xchg(&node->next, NULL);
- if (next) {
- WRITE_ONCE(next->locked, 1);
- return;
- }
-
- next = osq_wait_next(lock, node, NULL);
- if (next)
- WRITE_ONCE(next->locked, 1);
+OSQ_LOCK(, smp_acquire__after_ctrl_dep())
+OSQ_LOCK(_relaxed, )
+
+#define OSQ_UNLOCK(EXT, FENCE) \
+void osq_unlock##EXT(struct optimistic_spin_queue *lock) \
+{ \
+ struct optimistic_spin_node *node, *next; \
+ int curr = encode_cpu(smp_processor_id()); \
+ \
+ /* Fast path for the uncontended case. */ \
+ if (likely(atomic_cmpxchg_##FENCE(&lock->tail, curr, \
+ OSQ_UNLOCKED_VAL) == curr)) \
+ return; \
+ \
+ /* Second most likely case. */ \
+ node = this_cpu_ptr(&osq_node); \
+ next = xchg(&node->next, NULL); \
+ if (next) \
+ goto done_setlocked; \
+ \
+ next = osq_wait_next(lock, node, NULL); \
+ if (!next) \
+ return; \
+done_setlocked: \
+ set_node_locked_##FENCE(next); \
}
+
+OSQ_UNLOCK(, release)
+OSQ_UNLOCK(_relaxed, relaxed)
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 2337b4bb2366..88e95b114392 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -389,7 +389,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
if (!rwsem_can_spin_on_owner(sem))
goto done;
- if (!osq_lock(&sem->osq))
+ if (!osq_lock_relaxed(&sem->osq))
goto done;
/*
@@ -425,7 +425,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
*/
cpu_relax_lowlatency();
}
- osq_unlock(&sem->osq);
+ osq_unlock_relaxed(&sem->osq);
done:
preempt_enable();
return taken;
On 10/06/2016 01:47 AM, Davidlohr Bueso wrote:
> On Wed, 05 Oct 2016, Waiman Long wrote:
>
>> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
>> index 05a3785..1e6823a 100644
>> --- a/kernel/locking/osq_lock.c
>> +++ b/kernel/locking/osq_lock.c
>> @@ -12,6 +12,23 @@
>> */
>> static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
>>
>> +enum mbtype {
>> + acquire,
>> + release,
>> + relaxed,
>> +};
>
> No, please.
>
>> +
>> +static __always_inline int
>> +_atomic_cmpxchg_(const enum mbtype barrier, atomic_t *v, int old, int new)
>> +{
>> + if (barrier == acquire)
>> + return atomic_cmpxchg_acquire(v, old, new);
>> + else if (barrier == release)
>> + return atomic_cmpxchg_release(v, old, new);
>> + else
>> + return atomic_cmpxchg_relaxed(v, old, new);
>> +}
>
> Things like the above are icky. How about something like below? I'm not
> crazy about it, but there are other similar macros, e.g. lockref. We still
> provide osq_lock/unlock to imply acquire/release, plus the new _relaxed
> flavor, as I agree that is the correct naming.
>
> While I have not touched osq_wait_next(), the following are impacted:
>
> - node->locked is now completely without ordering for _relaxed() (currently
> it's under smp_load_acquire, which does not match, and the race is harmless
> to begin with as we just iterate again). For the acquire flavor, it is always
> formed with ctrl dep + smp_rmb().
>
> - If osq_lock() fails we never guarantee any ordering.
>
> What do you think?
>
> Thanks,
> Davidlohr
Yes, I am OK with your change. However, I need some additional changes
in osq_wait_next() as well. Either it is changed to use the release
variants of atomic_cmpxchg and xchg, or it uses a macro like what you did
with osq_lock and osq_unlock. The release variant is needed in
osq_unlock(). As osq_wait_next() is otherwise only invoked in the failure
path of osq_lock(), the barrier type doesn't really matter there.
Cheers,
Longman
On Wed, Oct 5, 2016 at 10:47 PM, Davidlohr Bueso <[email protected]> wrote:
> On Wed, 05 Oct 2016, Waiman Long wrote:
>
>> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
>> index 05a3785..1e6823a 100644
>> --- a/kernel/locking/osq_lock.c
>> +++ b/kernel/locking/osq_lock.c
>> @@ -12,6 +12,23 @@
>> */
>> static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
>>
>> +enum mbtype {
>> + acquire,
>> + release,
>> + relaxed,
>> +};
>
>
> No, please.
>
>> +
>> +static __always_inline int
>> +_atomic_cmpxchg_(const enum mbtype barrier, atomic_t *v, int old, int new)
>> +{
>> + if (barrier == acquire)
>> + return atomic_cmpxchg_acquire(v, old, new);
>> + else if (barrier == release)
>> + return atomic_cmpxchg_release(v, old, new);
>> + else
>> + return atomic_cmpxchg_relaxed(v, old, new);
>> +}
>
>
> Things like the above are icky. How about something like below? I'm not
> crazy about it, but there are other similar macros, e.g. lockref. We still
> provide osq_lock/unlock to imply acquire/release, plus the new _relaxed
> flavor, as I agree that is the correct naming.
>
> While I have not touched osq_wait_next(), the following are impacted:
>
> - node->locked is now completely without ordering for _relaxed() (currently
> it's under smp_load_acquire, which does not match, and the race is harmless
> to begin with as we just iterate again). For the acquire flavor, it is always
> formed with ctrl dep + smp_rmb().
>
> - If osq_lock() fails we never guarantee any ordering.
>
> What do you think?
I think that this method looks cleaner than the version with the
conditionals and enum.
My first preference though would be to document that the current code
doesn't provide the acquire semantics. The thinking is that if OSQ is
just used internally as a queuing mechanism and isn't used as a
traditional lock guarding critical sections, then adding the variants may
just be misleading and add more complexity.
If we do want separate acquire and relaxed variants, then I think
David's version using macros is a bit more in line with other locking
code.
Jason
Because osq has only been used for mutex/rwsem spinning logic,
we have gotten away with being rather flexible in any of the
traditional lock/unlock ACQUIRE/RELEASE minimal guarantees.
However, if wanted to be used as a _real_ lock, then it would
be in trouble. To this end, this patch provides the two
alternatives, where osq_lock/unlock() calls have the required
semantics, and a _relaxed() call, for no ordering guarantees
at all.
- node->locked is now completely without ordering for _relaxed()
(currently it's under smp_load_acquire, which does not match, and
the race is harmless to begin with as we just iterate again). For
the ACQUIRE flavor, it is always formed with ctrl dep + smp_rmb().
- In order to avoid more code duplication via macros, the common
osq_wait_next() call is completely unordered, but the caller
can provide the necessary barriers, if required - i.e. the case for
osq_unlock(): similar to the node->locked case, this also relies
on ctrl dep + smp_wmb() to form RELEASE.
- If osq_lock() fails we never guarantee any ordering (obviously
same goes for _relaxed).
Both mutexes and rwsems have been updated to continue using the
relaxed versions, but this will obviously change for the latter.
Signed-off-by: Davidlohr Bueso <[email protected]>
---
XXX: This obviously needs a lot of testing.
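Not part of the patch: a rough userspace C11 analogue of the ACQUIRE flavor
described in the changelog, where the waiter polls node->locked with relaxed
loads and only upgrades to ACQUIRE once the loop exits. The names
struct handoff, handoff_grant() and handoff_wait() are hypothetical, and
atomic_thread_fence(memory_order_acquire) is only a stand-in for
smp_acquire__after_ctrl_dep(), which the kernel defines as smp_rmb().

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-in for an osq node plus the data it protects. */
struct handoff {
	atomic_int locked;
	int payload;	/* plain data published before the grant */
};

/* Unlocker side: publish the payload, then grant with RELEASE. */
static inline void handoff_grant(struct handoff *h, int value)
{
	h->payload = value;
	atomic_store_explicit(&h->locked, 1, memory_order_release);
}

/*
 * Waiter side: poll with relaxed loads, then fence once on exit.
 * The acquire fence after the load that observed locked == 1 pairs
 * with the release store in handoff_grant().
 */
static inline int handoff_wait(struct handoff *h)
{
	while (!atomic_load_explicit(&h->locked, memory_order_relaxed))
		;	/* the kernel would relax the CPU here */

	atomic_thread_fence(memory_order_acquire);
	return h->payload;
}

/*
 * Relaxed flavor: same polling loop, no fence. A caller on another
 * thread must not assume the payload is visible afterwards.
 */
static inline void handoff_wait_relaxed(struct handoff *h)
{
	while (!atomic_load_explicit(&h->locked, memory_order_relaxed))
		;
}

/* Single-threaded plumbing check: grant first, so the waits return. */
static inline void handoff_selftest(void)
{
	struct handoff h = { 0, 0 };

	handoff_grant(&h, 7);
	assert(handoff_wait(&h) == 7);
	handoff_wait_relaxed(&h);
}
```

In a real handoff the grant and the wait run on different CPUs; the
single-threaded self-test only checks that the helpers fit together and says
nothing about ordering on weakly ordered hardware.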
include/asm-generic/barrier.h | 9 ++
include/linux/osq_lock.h | 10 ++
kernel/locking/mutex.c | 6 +-
kernel/locking/osq_lock.c | 279 +++++++++++++++++++++++-------------------
kernel/locking/rwsem-xadd.c | 4 +-
5 files changed, 177 insertions(+), 131 deletions(-)
diff --git a/include/asm-generic/barrier.h b/include/asm-generic/barrier.h
index fe297b599b0a..0036b08151c3 100644
--- a/include/asm-generic/barrier.h
+++ b/include/asm-generic/barrier.h
@@ -221,6 +221,15 @@ do { \
#endif
/**
+ * smp_release__after_ctrl_dep() - Provide RELEASE ordering after a control dependency
+ *
+ * A control dependency provides a LOAD->STORE order, the additional WMB
+ * provides STORE->STORE order, together they provide {LOAD,STORE}->STORE order,
+ * aka. (store)-RELEASE.
+ */
+#define smp_release__after_ctrl_dep() smp_wmb()
+
+/**
* smp_cond_load_acquire() - (Spin) wait for cond with ACQUIRE ordering
* @ptr: pointer to the variable to wait on
* @cond: boolean expression to wait for
diff --git a/include/linux/osq_lock.h b/include/linux/osq_lock.h
index 703ea5c30a33..a63ffa95aa70 100644
--- a/include/linux/osq_lock.h
+++ b/include/linux/osq_lock.h
@@ -29,6 +29,16 @@ static inline void osq_lock_init(struct optimistic_spin_queue *lock)
atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
}
+/*
+ * Versions of osq_lock/unlock that do not imply or guarantee (load)-ACQUIRE
+ * (store)-RELEASE barrier semantics.
+ *
+ * Note that a failed call to either osq_lock() or osq_lock_relaxed() does
+ * not imply barriers... we are next to block.
+ */
+extern bool osq_lock_relaxed(struct optimistic_spin_queue *lock);
+extern void osq_unlock_relaxed(struct optimistic_spin_queue *lock);
+
extern bool osq_lock(struct optimistic_spin_queue *lock);
extern void osq_unlock(struct optimistic_spin_queue *lock);
diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index a70b90db3909..b1bf1e057565 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -316,7 +316,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
* acquire the mutex all at once, the spinners need to take a
* MCS (queued) lock first before spinning on the owner field.
*/
- if (!osq_lock(&lock->osq))
+ if (!osq_lock_relaxed(&lock->osq))
goto done;
while (true) {
@@ -358,7 +358,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
}
mutex_set_owner(lock);
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
return true;
}
@@ -380,7 +380,7 @@ static bool mutex_optimistic_spin(struct mutex *lock,
cpu_relax_lowlatency();
}
- osq_unlock(&lock->osq);
+ osq_unlock_relaxed(&lock->osq);
done:
/*
* If we fell out of the spin path because of need_resched(),
diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
index 05a37857ab55..d3d1042a509c 100644
--- a/kernel/locking/osq_lock.c
+++ b/kernel/locking/osq_lock.c
@@ -28,6 +28,17 @@ static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
return per_cpu_ptr(&osq_node, cpu_nr);
}
+static inline void set_node_locked_release(struct optimistic_spin_node *node)
+{
+ smp_store_release(&node->locked, 1);
+}
+
+static inline void set_node_locked_relaxed(struct optimistic_spin_node *node)
+{
+ WRITE_ONCE(node->locked, 1);
+
+}
+
/*
* Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
* Can return NULL in case we were the last queued and we updated @lock instead.
@@ -50,7 +61,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
for (;;) {
if (atomic_read(&lock->tail) == curr &&
- atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
+ atomic_cmpxchg_relaxed(&lock->tail, curr, old) == curr) {
/*
* We were the last queued, we moved @lock back. @prev
* will now observe @lock and will complete its
@@ -70,7 +81,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
* wait for a new @node->next from its Step-C.
*/
if (node->next) {
- next = xchg(&node->next, NULL);
+ next = xchg_relaxed(&node->next, NULL);
if (next)
break;
}
@@ -81,130 +92,146 @@ osq_wait_next(struct optimistic_spin_queue *lock,
return next;
}
-bool osq_lock(struct optimistic_spin_queue *lock)
-{
- struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
- struct optimistic_spin_node *prev, *next;
- int curr = encode_cpu(smp_processor_id());
- int old;
-
- node->locked = 0;
- node->next = NULL;
- node->cpu = curr;
-
- /*
- * We need both ACQUIRE (pairs with corresponding RELEASE in
- * unlock() uncontended, or fastpath) and RELEASE (to publish
- * the node fields we just initialised) semantics when updating
- * the lock tail.
- */
- old = atomic_xchg(&lock->tail, curr);
- if (old == OSQ_UNLOCKED_VAL)
- return true;
-
- prev = decode_cpu(old);
- node->prev = prev;
- WRITE_ONCE(prev->next, node);
-
- /*
- * Normally @prev is untouchable after the above store; because at that
- * moment unlock can proceed and wipe the node element from stack.
- *
- * However, since our nodes are static per-cpu storage, we're
- * guaranteed their existence -- this allows us to apply
- * cmpxchg in an attempt to undo our queueing.
- */
-
- while (!READ_ONCE(node->locked)) {
- /*
- * If we need to reschedule bail... so we can block.
- */
- if (need_resched())
- goto unqueue;
-
- cpu_relax_lowlatency();
- }
- return true;
-
-unqueue:
- /*
- * Step - A -- stabilize @prev
- *
- * Undo our @prev->next assignment; this will make @prev's
- * unlock()/unqueue() wait for a next pointer since @lock points to us
- * (or later).
- */
-
- for (;;) {
- if (prev->next == node &&
- cmpxchg(&prev->next, node, NULL) == node)
- break;
-
- /*
- * We can only fail the cmpxchg() racing against an unlock(),
- * in which case we should observe @node->locked becomming
- * true.
- */
- if (smp_load_acquire(&node->locked))
- return true;
-
- cpu_relax_lowlatency();
-
- /*
- * Or we race against a concurrent unqueue()'s step-B, in which
- * case its step-C will write us a new @node->prev pointer.
- */
- prev = READ_ONCE(node->prev);
- }
-
- /*
- * Step - B -- stabilize @next
- *
- * Similar to unlock(), wait for @node->next or move @lock from @node
- * back to @prev.
- */
-
- next = osq_wait_next(lock, node, prev);
- if (!next)
- return false;
-
- /*
- * Step - C -- unlink
- *
- * @prev is stable because its still waiting for a new @prev->next
- * pointer, @next is stable because our @node->next pointer is NULL and
- * it will wait in Step-A.
- */
-
- WRITE_ONCE(next->prev, prev);
- WRITE_ONCE(prev->next, next);
-
- return false;
+#define OSQ_LOCK(EXT, FENCECB) \
+bool osq_lock##EXT(struct optimistic_spin_queue *lock) \
+{ \
+ struct optimistic_spin_node *node = this_cpu_ptr(&osq_node); \
+ struct optimistic_spin_node *prev, *next; \
+ int old, curr = encode_cpu(smp_processor_id()); \
+ \
+ node->locked = 0; \
+ node->next = NULL; \
+ node->cpu = curr; \
+ \
+ /* \
+ * At the very least we need RELEASE semantics to initialize \
+ * the node fields _before_ publishing it to the lock tail. \
+ */ \
+ old = atomic_xchg_release(&lock->tail, curr); \
+ if (old == OSQ_UNLOCKED_VAL) { \
+ FENCECB; \
+ return true; \
+ } \
+ \
+ prev = decode_cpu(old); \
+ node->prev = prev; \
+ WRITE_ONCE(prev->next, node); \
+ \
+ /* \
+ * Normally @prev is untouchable after the above store; because \
+ * at that moment unlock can proceed and wipe the node element \
+ * from stack. \
+ * \
+ * However, since our nodes are static per-cpu storage, we're \
+ * guaranteed their existence -- this allows us to apply \
+ * cmpxchg in an attempt to undo our queueing. \
+ */ \
+ while (!READ_ONCE(node->locked)) { \
+ /* \
+ * If we need to reschedule bail... so we can block. \
+ */ \
+ if (need_resched()) \
+ goto unqueue; \
+ \
+ cpu_relax_lowlatency(); \
+ } \
+ FENCECB; \
+ return true; \
+ \
+unqueue: \
+ /* \
+ * Step - A -- stabilize @prev \
+ * \
+ * Undo our @prev->next assignment; this will make @prev's \
+ * unlock()/unqueue() wait for a next pointer since @lock \
+ * points to us (or later). \
+ */ \
+ for (;;) { \
+ /* \
+ * Failed calls to osq_lock() do not guarantee any \
+ * ordering, thus always rely on RELAXED semantics. \
+ * This also applies below, in Step - B. \
+ */ \
+ if (prev->next == node && \
+ cmpxchg_relaxed(&prev->next, node, NULL) == node) \
+ break; \
+ \
+ /* \
+ * We can only fail the cmpxchg() racing against an \
+ * unlock(), in which case we should observe \
+ * @node->locked becoming true. \
+ */ \
+ if (READ_ONCE(node->locked)) { \
+ FENCECB; \
+ return true; \
+ } \
+ \
+ cpu_relax_lowlatency(); \
+ \
+ /* \
+ * Or we race against a concurrent unqueue()'s step-B, \
+ * in which case its step-C will write us a new \
+ * @node->prev pointer. \
+ */ \
+ prev = READ_ONCE(node->prev); \
+ } \
+ \
+ /* \
+ * Step - B -- stabilize @next \
+ * \
+ * Similar to unlock(), wait for @node->next or move @lock \
+ * from @node back to @prev. \
+ */ \
+ next = osq_wait_next(lock, node, prev); \
+ if (!next) \
+ return false; \
+ \
+ /* \
+ * Step - C -- unlink \
+ * \
+ * @prev is stable because it's still waiting for a new \
+ * @prev->next pointer, @next is stable because our \
+ * @node->next pointer is NULL and it will wait in Step-A. \
+ */ \
+ WRITE_ONCE(next->prev, prev); \
+ WRITE_ONCE(prev->next, next); \
+ \
+ return false; \
}
-void osq_unlock(struct optimistic_spin_queue *lock)
-{
- struct optimistic_spin_node *node, *next;
- int curr = encode_cpu(smp_processor_id());
-
- /*
- * Fast path for the uncontended case.
- */
- if (likely(atomic_cmpxchg_release(&lock->tail, curr,
- OSQ_UNLOCKED_VAL) == curr))
- return;
-
- /*
- * Second most likely case.
- */
- node = this_cpu_ptr(&osq_node);
- next = xchg(&node->next, NULL);
- if (next) {
- WRITE_ONCE(next->locked, 1);
- return;
- }
-
- next = osq_wait_next(lock, node, NULL);
- if (next)
- WRITE_ONCE(next->locked, 1);
+OSQ_LOCK(, smp_acquire__after_ctrl_dep())
+OSQ_LOCK(_relaxed, )
+
+#define OSQ_UNLOCK(EXT, FENCE, FENCECB) \
+void osq_unlock##EXT(struct optimistic_spin_queue *lock) \
+{ \
+ struct optimistic_spin_node *node, *next; \
+ int curr = encode_cpu(smp_processor_id()); \
+ \
+ /* \
+ * Fast path for the uncontended case. \
+ */ \
+ if (likely(atomic_cmpxchg_##FENCE(&lock->tail, curr, \
+ OSQ_UNLOCKED_VAL) == curr)) \
+ return; \
+ \
+ /* \
+ * Second most likely case. \
+ */ \
+ node = this_cpu_ptr(&osq_node); \
+ next = xchg(&node->next, NULL); \
+ if (next) \
+ goto done_setlocked; \
+ \
+ next = osq_wait_next(lock, node, NULL); \
+ if (!next) { \
+ FENCECB; \
+ return; \
+ } \
+ \
+done_setlocked: \
+ set_node_locked_##FENCE(next); \
}
+
+OSQ_UNLOCK(, release, smp_release__after_ctrl_dep())
+OSQ_UNLOCK(_relaxed, relaxed, )
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 2337b4bb2366..88e95b114392 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -389,7 +389,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
if (!rwsem_can_spin_on_owner(sem))
goto done;
- if (!osq_lock(&sem->osq))
+ if (!osq_lock_relaxed(&sem->osq))
goto done;
/*
@@ -425,7 +425,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
*/
cpu_relax_lowlatency();
}
- osq_unlock(&sem->osq);
+ osq_unlock_relaxed(&sem->osq);
done:
preempt_enable();
return taken;
--
2.6.6