258 lines
7.3 KiB
C++
258 lines
7.3 KiB
C++
// Copyright 2019 The Marl Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// https://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
#ifndef marl_ticket_h
|
|
#define marl_ticket_h
|
|
|
|
#include "conditionvariable.h"
|
|
#include "pool.h"
|
|
#include "scheduler.h"
|
|
|
|
namespace marl {
|
|
|
|
// Ticket is a synchronization primitive used to serially order execution.
|
|
//
|
|
// Tickets exist in 3 mutually exclusive states: Waiting, Called and Finished.
|
|
//
|
|
// Tickets are obtained from a Ticket::Queue, using the Ticket::Queue::take()
|
|
// methods. The order in which tickets are taken from the queue dictates the
|
|
// order in which they are called.
|
|
//
|
|
// The first ticket to be taken from a queue will be in the 'called' state,
|
|
// subsequent tickets will be in the 'waiting' state.
|
|
//
|
|
// Ticket::wait() will block until the ticket is called.
|
|
//
|
|
// Ticket::done() moves the ticket into the 'finished' state. If all preceding
|
|
// tickets are finished, done() will call the next unfinished ticket.
|
|
//
|
|
// If the last remaining reference to an unfinished ticket is dropped then
|
|
// done() will be automatically called on that ticket.
|
|
//
|
|
// Example:
|
|
//
|
|
// void runTasksConcurrentThenSerially(int numConcurrentTasks)
|
|
// {
|
|
// marl::Ticket::Queue queue;
|
|
// for (int i = 0; i < numConcurrentTasks; i++)
|
|
// {
|
|
// auto ticket = queue.take();
|
|
// marl::schedule([=] {
|
|
// doConcurrentWork(); // <- function may be called concurrently
|
|
// ticket.wait(); // <- serialize tasks
|
|
// doSerialWork(); // <- function will not be called concurrently
|
|
// ticket.done(); // <- optional, as done() is called implicitly on
|
|
// // dropping of last reference
|
|
// });
|
|
// }
|
|
// }
|
|
class Ticket {
|
|
struct Shared;
|
|
struct Record;
|
|
|
|
public:
|
|
using OnCall = std::function<void()>;
|
|
|
|
// Queue hands out Tickets.
|
|
class Queue {
|
|
public:
|
|
// take() returns a single ticket from the queue.
|
|
MARL_NO_EXPORT inline Ticket take();
|
|
|
|
// take() retrieves count tickets from the queue, calling f() with each
|
|
// retrieved ticket.
|
|
// F must be a function of the signature: void(Ticket&&)
|
|
template <typename F>
|
|
MARL_NO_EXPORT inline void take(size_t count, const F& f);
|
|
|
|
private:
|
|
std::shared_ptr<Shared> shared = std::make_shared<Shared>();
|
|
UnboundedPool<Record> pool;
|
|
};
|
|
|
|
MARL_NO_EXPORT inline Ticket() = default;
|
|
MARL_NO_EXPORT inline Ticket(const Ticket& other) = default;
|
|
MARL_NO_EXPORT inline Ticket(Ticket&& other) = default;
|
|
MARL_NO_EXPORT inline Ticket& operator=(const Ticket& other) = default;
|
|
|
|
// wait() blocks until the ticket is called.
|
|
MARL_NO_EXPORT inline void wait() const;
|
|
|
|
// done() marks the ticket as finished and calls the next ticket.
|
|
MARL_NO_EXPORT inline void done() const;
|
|
|
|
// onCall() registers the function f to be invoked when this ticket is
|
|
// called. If the ticket is already called prior to calling onCall(), then
|
|
// f() will be executed immediately.
|
|
// F must be a function of the OnCall signature.
|
|
template <typename F>
|
|
MARL_NO_EXPORT inline void onCall(F&& f) const;
|
|
|
|
private:
|
|
// Internal doubly-linked-list data structure. One per ticket instance.
|
|
struct Record {
|
|
MARL_NO_EXPORT inline ~Record();
|
|
|
|
MARL_NO_EXPORT inline void done();
|
|
MARL_NO_EXPORT inline void callAndUnlock(marl::lock& lock);
|
|
MARL_NO_EXPORT inline void unlink(); // guarded by shared->mutex
|
|
|
|
ConditionVariable isCalledCondVar;
|
|
|
|
std::shared_ptr<Shared> shared;
|
|
Record* next = nullptr; // guarded by shared->mutex
|
|
Record* prev = nullptr; // guarded by shared->mutex
|
|
OnCall onCall; // guarded by shared->mutex
|
|
bool isCalled = false; // guarded by shared->mutex
|
|
std::atomic<bool> isDone = {false};
|
|
};
|
|
|
|
// Data shared between all tickets and the queue.
|
|
struct Shared {
|
|
marl::mutex mutex;
|
|
Record tail;
|
|
};
|
|
|
|
MARL_NO_EXPORT inline Ticket(Loan<Record>&& record);
|
|
|
|
Loan<Record> record;
|
|
};
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
// Ticket
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Builds a Ticket that takes over the given pool-borrowed record.
Ticket::Ticket(Loan<Record>&& loan)
    : record(std::move(loan)) {}
|
|
|
|
void Ticket::wait() const {
|
|
marl::lock lock(record->shared->mutex);
|
|
record->isCalledCondVar.wait(lock, [this] { return record->isCalled; });
|
|
}
|
|
|
|
void Ticket::done() const {
|
|
record->done();
|
|
}
|
|
|
|
template <typename Function>
|
|
void Ticket::onCall(Function&& f) const {
|
|
marl::lock lock(record->shared->mutex);
|
|
if (record->isCalled) {
|
|
marl::schedule(std::forward<Function>(f));
|
|
return;
|
|
}
|
|
if (record->onCall) {
|
|
struct Joined {
|
|
void operator()() const {
|
|
a();
|
|
b();
|
|
}
|
|
OnCall a, b;
|
|
};
|
|
record->onCall =
|
|
std::move(Joined{std::move(record->onCall), std::forward<Function>(f)});
|
|
} else {
|
|
record->onCall = std::forward<Function>(f);
|
|
}
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
// Ticket::Queue
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Takes exactly one ticket by delegating to the counted overload and
// capturing the single ticket it produces.
Ticket Ticket::Queue::take() {
  Ticket result;
  const auto capture = [&result](Ticket&& taken) { result = std::move(taken); };
  take(1, capture);
  return result;
}
|
|
|
|
template <typename F>
|
|
void Ticket::Queue::take(size_t n, const F& f) {
|
|
Loan<Record> first, last;
|
|
pool.borrow(n, [&](Loan<Record>&& record) {
|
|
Loan<Record> rec = std::move(record);
|
|
rec->shared = shared;
|
|
if (first.get() == nullptr) {
|
|
first = rec;
|
|
}
|
|
if (last.get() != nullptr) {
|
|
last->next = rec.get();
|
|
rec->prev = last.get();
|
|
}
|
|
last = rec;
|
|
f(std::move(Ticket(std::move(rec))));
|
|
});
|
|
last->next = &shared->tail;
|
|
marl::lock lock(shared->mutex);
|
|
first->prev = shared->tail.prev;
|
|
shared->tail.prev = last.get();
|
|
if (first->prev == nullptr) {
|
|
first->callAndUnlock(lock);
|
|
} else {
|
|
first->prev->next = first.get();
|
|
}
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
// Ticket::Record
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Finishes the record on destruction so that a ticket whose last reference
// is dropped still lets the queue advance. Records that were never attached
// to a queue (no shared state) have nothing to finish.
Ticket::Record::~Record() {
  if (shared == nullptr) {
    return;
  }
  done();
}
|
|
|
|
void Ticket::Record::done() {
|
|
if (isDone.exchange(true)) {
|
|
return;
|
|
}
|
|
marl::lock lock(shared->mutex);
|
|
auto callNext = (prev == nullptr && next != nullptr) ? next : nullptr;
|
|
unlink();
|
|
if (callNext != nullptr) {
|
|
// lock needs to be held otherwise callNext might be destructed.
|
|
callNext->callAndUnlock(lock);
|
|
}
|
|
}
|
|
|
|
// Moves this record into the called state. Must be entered with lock held.
// On the first call it wakes every waiter, releases lock, and then schedules
// the registered onCall function (if any) outside the critical section.
// If the record was already called it returns immediately, leaving lock held.
void Ticket::Record::callAndUnlock(marl::lock& lock) {
  if (!isCalled) {
    isCalled = true;

    // Take the pending callback out of the record while still locked.
    OnCall pending;
    pending.swap(onCall);

    isCalledCondVar.notify_all();
    lock.unlock_no_tsa();

    if (pending) {
      marl::schedule(std::move(pending));
    }
  }
}
|
|
|
|
void Ticket::Record::unlink() {
|
|
if (prev != nullptr) {
|
|
prev->next = next;
|
|
}
|
|
if (next != nullptr) {
|
|
next->prev = prev;
|
|
}
|
|
prev = nullptr;
|
|
next = nullptr;
|
|
}
|
|
|
|
} // namespace marl
|
|
|
|
#endif // marl_ticket_h
|