Sunshine latest
Self-hosted game stream host for Moonlight.
task_pool.h
Go to the documentation of this file.
1
5#pragma once
6
7// standard includes
8#include <chrono>
9#include <deque>
10#include <functional>
11#include <future>
12#include <mutex>
13#include <optional>
14#include <type_traits>
15#include <utility>
16#include <vector>
17
18// local includes
19#include "move_by_copy.h"
20#include "utility.h"
21
22namespace task_pool_util {
23
24 class _ImplBase {
25 public:
26 // _unique_base_type _this_ptr;
27
28 inline virtual ~_ImplBase() = default;
29
30 virtual void run() = 0;
31 };
32
33 template<class Function>
34 class _Impl: public _ImplBase {
35 Function _func;
36
37 public:
38 _Impl(Function &&f):
39 _func(std::forward<Function>(f)) {
40 }
41
42 void run() override {
43 _func();
44 }
45 };
46
47 class TaskPool {
48 public:
49 typedef std::unique_ptr<_ImplBase> __task;
50 typedef _ImplBase *task_id_t;
51
52 typedef std::chrono::steady_clock::time_point __time_point;
53
54 template<class R>
56 public:
57 task_id_t task_id;
58 std::future<R> future;
59
60 timer_task_t(task_id_t task_id, std::future<R> &future):
61 task_id {task_id},
62 future {std::move(future)} {
63 }
64 };
65
66 protected:
67 std::deque<__task> _tasks;
68 std::vector<std::pair<__time_point, __task>> _timer_tasks;
69 std::mutex _task_mutex;
70
71 public:
72 TaskPool() = default;
73
74 TaskPool(TaskPool &&other) noexcept:
75 _tasks {std::move(other._tasks)},
76 _timer_tasks {std::move(other._timer_tasks)} {
77 }
78
79 TaskPool &operator=(TaskPool &&other) noexcept {
80 std::swap(_tasks, other._tasks);
81 std::swap(_timer_tasks, other._timer_tasks);
82
83 return *this;
84 }
85
86 template<class Function, class... Args>
87 auto push(Function &&newTask, Args &&...args) {
88 static_assert(std::is_invocable_v<Function, Args &&...>, "arguments don't match the function");
89
90 using __return = std::invoke_result_t<Function, Args &&...>;
91 using task_t = std::packaged_task<__return()>;
92
93 auto bind = [task = std::forward<Function>(newTask), tuple_args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
94 return std::apply(task, std::move(tuple_args));
95 };
96
97 task_t task(std::move(bind));
98
99 auto future = task.get_future();
100
101 std::lock_guard<std::mutex> lg(_task_mutex);
102 _tasks.emplace_back(toRunnable(std::move(task)));
103
104 return future;
105 }
106
107 void pushDelayed(std::pair<__time_point, __task> &&task) {
108 std::lock_guard lg(_task_mutex);
109
110 auto it = _timer_tasks.cbegin();
111 for (; it < _timer_tasks.cend(); ++it) {
112 if (std::get<0>(*it) < task.first) {
113 break;
114 }
115 }
116
117 _timer_tasks.emplace(it, task.first, std::move(task.second));
118 }
119
123 template<class Function, class X, class Y, class... Args>
124 auto pushDelayed(Function &&newTask, std::chrono::duration<X, Y> duration, Args &&...args) {
125 static_assert(std::is_invocable_v<Function, Args &&...>, "arguments don't match the function");
126
127 using __return = std::invoke_result_t<Function, Args &&...>;
128 using task_t = std::packaged_task<__return()>;
129
130 __time_point time_point;
131 if constexpr (std::is_floating_point_v<X>) {
132 time_point = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::nanoseconds>(duration);
133 } else {
134 time_point = std::chrono::steady_clock::now() + duration;
135 }
136
137 auto bind = [task = std::forward<Function>(newTask), tuple_args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
138 return std::apply(task, std::move(tuple_args));
139 };
140
141 task_t task(std::move(bind));
142
143 auto future = task.get_future();
144 auto runnable = toRunnable(std::move(task));
145
146 task_id_t task_id = &*runnable;
147
148 pushDelayed(std::pair {time_point, std::move(runnable)});
149
150 return timer_task_t<__return> {task_id, future};
151 }
152
157 template<class X, class Y>
158 void delay(task_id_t task_id, std::chrono::duration<X, Y> duration) {
159 std::lock_guard<std::mutex> lg(_task_mutex);
160
161 auto it = _timer_tasks.begin();
162 for (; it < _timer_tasks.cend(); ++it) {
163 const __task &task = std::get<1>(*it);
164
165 if (&*task == task_id) {
166 std::get<0>(*it) = std::chrono::steady_clock::now() + duration;
167
168 break;
169 }
170 }
171
172 if (it == _timer_tasks.cend()) {
173 return;
174 }
175
176 // smaller time goes to the back
177 auto prev = it - 1;
178 while (it > _timer_tasks.cbegin()) {
179 if (std::get<0>(*it) > std::get<0>(*prev)) {
180 std::swap(*it, *prev);
181 }
182
183 --prev;
184 --it;
185 }
186 }
187
188 bool cancel(task_id_t task_id) {
189 std::lock_guard lg(_task_mutex);
190
191 auto it = _timer_tasks.begin();
192 for (; it < _timer_tasks.cend(); ++it) {
193 const __task &task = std::get<1>(*it);
194
195 if (&*task == task_id) {
196 _timer_tasks.erase(it);
197
198 return true;
199 }
200 }
201
202 return false;
203 }
204
205 std::optional<std::pair<__time_point, __task>> pop(task_id_t task_id) {
206 std::lock_guard lg(_task_mutex);
207
208 auto pos = std::find_if(std::begin(_timer_tasks), std::end(_timer_tasks), [&task_id](const auto &t) {
209 return t.second.get() == task_id;
210 });
211
212 if (pos == std::end(_timer_tasks)) {
213 return std::nullopt;
214 }
215
216 return std::move(*pos);
217 }
218
219 std::optional<__task> pop() {
220 std::lock_guard lg(_task_mutex);
221
222 if (!_tasks.empty()) {
223 __task task = std::move(_tasks.front());
224 _tasks.pop_front();
225 return task;
226 }
227
228 if (!_timer_tasks.empty() && std::get<0>(_timer_tasks.back()) <= std::chrono::steady_clock::now()) {
229 __task task = std::move(std::get<1>(_timer_tasks.back()));
230 _timer_tasks.pop_back();
231 return task;
232 }
233
234 return std::nullopt;
235 }
236
237 bool ready() {
238 std::lock_guard<std::mutex> lg(_task_mutex);
239
240 return !_tasks.empty() || (!_timer_tasks.empty() && std::get<0>(_timer_tasks.back()) <= std::chrono::steady_clock::now());
241 }
242
243 std::optional<__time_point> next() {
244 std::lock_guard<std::mutex> lg(_task_mutex);
245
246 if (_timer_tasks.empty()) {
247 return std::nullopt;
248 }
249
250 return std::get<0>(_timer_tasks.back());
251 }
252
253 private:
254 template<class Function>
255 std::unique_ptr<_ImplBase> toRunnable(Function &&f) {
256 return std::make_unique<_Impl<Function>>(std::forward<Function &&>(f));
257 }
258 };
259} // namespace task_pool_util
Definition task_pool.h:47
void delay(task_id_t task_id, std::chrono::duration< X, Y > duration)
Definition task_pool.h:158
auto pushDelayed(Function &&newTask, std::chrono::duration< X, Y > duration, Args &&...args)
Definition task_pool.h:124
Definition task_pool.h:24
Definition task_pool.h:34
Declarations for the MoveByCopy utility class.
Functions for handling command line arguments.
Definition entry_handler.cpp:39
Declarations for utility functions.