// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at the mozilla.org home page

#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_

namespace Eigen {

// EventCount allows waiting for arbitrary predicates in non-blocking
// algorithms. Think of a condition variable, but the wait predicate does not
// need to be protected by a mutex. Usage:
// Waiting thread does:
//
//   if (predicate)
//     return act();
//   EventCount::Waiter& w = waiters[my_index];
//   ec.Prewait(&w);
//   if (predicate) {
//     ec.CancelWait(&w);
//     return act();
//   }
//   ec.CommitWait(&w);
//
// Notifying thread does:
//
//   predicate = true;
//   ec.Notify(true);
//
// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
// cheap, but they are executed only if the preceding predicate check has
// failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by the user) and state_.
// Operation closely resembles Dekker's mutual exclusion algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// The waiting thread sets state_ and then checks the predicate; the notifying
// thread sets the predicate and then checks state_. Due to the seq_cst fences
// between these operations it is guaranteed that either the waiter sees the
// predicate change and does not block, or the notifying thread sees the
// state_ change and unblocks the waiter, or both. But it cannot happen that
// both threads miss each other's changes, which would lead to deadlock.
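//
// For illustration only (not part of this interface): a thread-pool worker
// loop built on top of EventCount might look roughly like this, with a
// hypothetical lock-free `queue`, a per-thread `waiters` vector, and a shared
// EventCount `ec`:
//
//   for (;;) {
//     if (Task* t = queue.Pop()) { t->Run(); continue; }  // predicate check
//     EventCount::Waiter& w = waiters[thread_id];
//     ec.Prewait(&w);
//     if (Task* t = queue.Pop()) {                        // re-check
//       ec.CancelWait(&w);
//       t->Run();
//       continue;
//     }
//     ec.CommitWait(&w);  // blocks until a producer calls ec.Notify()
//   }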
class EventCount {
 public:
  class Waiter;

  EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) {
    eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
    // Initialize epoch to something close to overflow to test overflow.
    state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
  }

  ~EventCount() {
    // Ensure there are no waiters.
    eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
  }

  // Prewait prepares for waiting.
  // After calling this function the thread must re-check the wait predicate
  // and call either CancelWait or CommitWait passing the same Waiter object.
  void Prewait(Waiter* w) {
    w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
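    // The full fence pairs with the seq_cst fence in Notify(): either this
    // waiter sees the updated predicate when it re-checks it, or the notifier
    // sees the incremented waiter count in state_ (or both), so the two can
    // never miss each other.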
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  // CommitWait commits waiting.
  void CommitWait(Waiter* w) {
    w->state = Waiter::kNotSignaled;
    // Modification epoch of this waiter.
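    // w->epoch holds the state_ value observed in Prewait. The target epoch
    // is the one state_ will reach once every thread that was already
    // prewaiting at that moment has resolved its wait (each resolution adds
    // kEpochInc).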
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_seq_cst);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_seq_cst);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter and add it to the waiter list.
      eigen_assert((state & kWaiterMask) != 0);
      uint64_t newstate = state - kWaiterInc + kEpochInc;
      newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
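      // The low kStackBits of newstate now hold this waiter's index; the
      // previous stack top (or the empty sentinel kStackMask) goes into
      // w->next below, and the release CAS publishes that pointer to the
      // notifier that will eventually pop us.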
      if ((state & kStackMask) == kStackMask)
        w->next.store(nullptr, std::memory_order_relaxed);
      else
        w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_release))
        break;
    }
    Park(w);
  }

  // CancelWait cancels effects of the previous Prewait call.
  void CancelWait(Waiter* w) {
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_relaxed);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_relaxed);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter.
      eigen_assert((state & kWaiterMask) != 0);
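      // The epoch is still advanced so that any waiter queued behind this one
      // observes progress and exits its spin loop above.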
      if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
                                       std::memory_order_relaxed))
        return;
    }
  }

  // Notify wakes one or all waiting threads.
  // Must be called after changing the associated wait predicate.
  void Notify(bool all) {
    std::atomic_thread_fence(std::memory_order_seq_cst);
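    // The fence pairs with the one in Prewait(): the caller's predicate
    // update is ordered before the state_ read below (Dekker-style).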
    uint64_t state = state_.load(std::memory_order_acquire);
    for (;;) {
      // Easy case: no waiters.
      if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
        return;
      uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
      uint64_t newstate;
      if (all) {
        // Reset prewait counter and empty wait list.
        newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
      } else if (waiters) {
        // There is a thread in pre-wait state, unblock it.
        newstate = state + kEpochInc - kWaiterInc;
      } else {
        // Pop a waiter from list and unpark it.
        Waiter* w = &waiters_[state & kStackMask];
        Waiter* wnext = w->next.load(std::memory_order_relaxed);
        uint64_t next = kStackMask;
        if (wnext != nullptr) next = wnext - &waiters_[0];
        // Note: we don't add kEpochInc here. ABA problem on the lock-free
        // stack can't happen because a waiter is re-pushed onto the stack
        // only after it was in the pre-wait state, which inevitably leads to
        // an epoch increment.
        newstate = (state & kEpochMask) + next;
      }
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_acquire)) {
        if (!all && waiters) return;  // unblocked pre-wait thread
        if ((state & kStackMask) == kStackMask) return;
        Waiter* w = &waiters_[state & kStackMask];
        if (!all) w->next.store(nullptr, std::memory_order_relaxed);
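        // For all == true, w is the old stack top and Unpark() walks the
        // entire detached list through the next pointers; for a single
        // notification the list was cut to just w above.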
        Unpark(w);
        return;
      }
    }
  }

  class Waiter {
    friend class EventCount;
    // Align to 128 byte boundary to prevent false sharing with other Waiter
    // objects in the same vector.
    EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
    std::mutex mu;
    std::condition_variable cv;
    uint64_t epoch;
    unsigned state;
    enum {
      kNotSignaled,
      kWaiting,
      kSignaled,
    };
  };

 private:
  // State_ layout:
  // - the low kStackBits hold the index of the top of the stack of waiters
  //   that committed to waiting (kStackMask encodes an empty stack);
  // - the next kWaiterBits hold the count of waiters in prewait state;
  // - the next kEpochBits hold the modification counter.
  static const uint64_t kStackBits = 16;
  static const uint64_t kStackMask = (1ull << kStackBits) - 1;
  static const uint64_t kWaiterBits = 16;
  static const uint64_t kWaiterShift = 16;
  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
                                      << kWaiterShift;
  static const uint64_t kWaiterInc = 1ull << kWaiterBits;
  static const uint64_t kEpochBits = 32;
  static const uint64_t kEpochShift = 32;
  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
  static const uint64_t kEpochInc = 1ull << kEpochShift;
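  // Example decode (illustrative, not from the source): the state value
  // 0x000000030002ffffull means epoch 3, two prewaiting threads, and an
  // empty committed-waiter stack (the low 16 bits equal kStackMask).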
  std::atomic<uint64_t> state_;
  MaxSizeVector<Waiter>& waiters_;

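  // Park blocks the calling thread until a notifier marks w as kSignaled.
  // The kWaiting flag lets Unpark skip the condvar signal for a waiter that
  // never actually blocked.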
  void Park(Waiter* w) {
    std::unique_lock<std::mutex> lock(w->mu);
    while (w->state != Waiter::kSignaled) {
      w->state = Waiter::kWaiting;
      w->cv.wait(lock);
    }
  }

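  // Unpark signals every waiter on the (already detached) list. Taking the
  // per-waiter mutex makes the state hand-off race-free with a concurrent
  // Park.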
  void Unpark(Waiter* waiters) {
    Waiter* next = nullptr;
    for (Waiter* w = waiters; w; w = next) {
      next = w->next.load(std::memory_order_relaxed);
      unsigned state;
      {
        std::unique_lock<std::mutex> lock(w->mu);
        state = w->state;
        w->state = Waiter::kSignaled;
      }
      // Avoid notifying if it wasn't waiting.
      if (state == Waiter::kWaiting) w->cv.notify_one();
    }
  }

  EventCount(const EventCount&) = delete;
  void operator=(const EventCount&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_