You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

io_context 21KB

3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872
  1. // <experimental/io_service> -*- C++ -*-
  2. // Copyright (C) 2015-2020 Free Software Foundation, Inc.
  3. //
  4. // This file is part of the GNU ISO C++ Library. This library is free
  5. // software; you can redistribute it and/or modify it under the
  6. // terms of the GNU General Public License as published by the
  7. // Free Software Foundation; either version 3, or (at your option)
  8. // any later version.
  9. // This library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU General Public License for more details.
  13. // Under Section 7 of GPL version 3, you are granted additional
  14. // permissions described in the GCC Runtime Library Exception, version
  15. // 3.1, as published by the Free Software Foundation.
  16. // You should have received a copy of the GNU General Public License and
  17. // a copy of the GCC Runtime Library Exception along with this program;
  18. // see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
  19. // <http://www.gnu.org/licenses/>.
  20. /** @file experimental/io_context
  21. * This is a TS C++ Library header.
  22. * @ingroup networking-ts
  23. */
  24. #ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
  25. #define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1
  26. #pragma GCC system_header
  27. #if __cplusplus >= 201402L
  28. #include <atomic>
  29. #include <chrono>
  30. #include <forward_list>
  31. #include <functional>
  32. #include <system_error>
  33. #include <thread>
  34. #include <experimental/netfwd>
  35. #include <experimental/executor>
  36. #if _GLIBCXX_HAVE_UNISTD_H
  37. # include <unistd.h>
  38. #endif
  39. #ifdef _GLIBCXX_HAVE_POLL_H
  40. # include <poll.h>
  41. #endif
  42. #ifdef _GLIBCXX_HAVE_FCNTL_H
  43. # include <fcntl.h>
  44. #endif
  45. namespace std _GLIBCXX_VISIBILITY(default)
  46. {
  47. _GLIBCXX_BEGIN_NAMESPACE_VERSION
  48. namespace experimental
  49. {
  50. namespace net
  51. {
  52. inline namespace v1
  53. {
  54. /** @addtogroup networking-ts
  55. * @{
  56. */
  57. class __socket_impl;
  58. /// An ExecutionContext for I/O operations.
  59. class io_context : public execution_context
  60. {
  61. public:
  62. // types:
  63. /// An executor for an io_context.
  64. class executor_type
  65. {
  66. public:
  67. // construct / copy / destroy:
  68. executor_type(const executor_type& __other) noexcept = default;
  69. executor_type(executor_type&& __other) noexcept = default;
  70. executor_type& operator=(const executor_type& __other) noexcept = default;
  71. executor_type& operator=(executor_type&& __other) noexcept = default;
  72. // executor operations:
  73. bool running_in_this_thread() const noexcept
  74. {
  75. lock_guard<mutex> __lock(_M_ctx->_M_mtx);
  76. auto __end = _M_ctx->_M_call_stack.end();
  77. return std::find(_M_ctx->_M_call_stack.begin(), __end,
  78. this_thread::get_id()) != __end;
  79. }
  80. io_context& context() const noexcept { return *_M_ctx; }
  81. void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
  82. void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }
  83. template<typename _Func, typename _ProtoAllocator>
  84. void
  85. dispatch(_Func&& __f, const _ProtoAllocator& __a) const
  86. {
  87. if (running_in_this_thread())
  88. decay_t<_Func>{std::forward<_Func>(__f)}();
  89. else
  90. post(std::forward<_Func>(__f), __a);
  91. }
  92. template<typename _Func, typename _ProtoAllocator>
  93. void
  94. post(_Func&& __f, const _ProtoAllocator& __a) const
  95. {
  96. lock_guard<mutex> __lock(_M_ctx->_M_mtx);
  97. // TODO (re-use functionality in system_context)
  98. _M_ctx->_M_reactor._M_notify();
  99. }
  100. template<typename _Func, typename _ProtoAllocator>
  101. void
  102. defer(_Func&& __f, const _ProtoAllocator& __a) const
  103. { post(std::forward<_Func>(__f), __a); }
  104. private:
  105. friend io_context;
  106. explicit
  107. executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }
  108. io_context* _M_ctx;
  109. };
  110. using count_type = size_t;
  111. // construct / copy / destroy:
  112. io_context() : _M_work_count(0) { }
  113. explicit
  114. io_context(int __concurrency_hint) : _M_work_count(0) { }
  115. io_context(const io_context&) = delete;
  116. io_context& operator=(const io_context&) = delete;
  117. // io_context operations:
  118. executor_type get_executor() noexcept { return executor_type(*this); }
  119. count_type
  120. run()
  121. {
  122. count_type __n = 0;
  123. while (run_one())
  124. if (__n != numeric_limits<count_type>::max())
  125. ++__n;
  126. return __n;
  127. }
  128. template<typename _Rep, typename _Period>
  129. count_type
  130. run_for(const chrono::duration<_Rep, _Period>& __rel_time)
  131. { return run_until(chrono::steady_clock::now() + __rel_time); }
  132. template<typename _Clock, typename _Duration>
  133. count_type
  134. run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
  135. {
  136. count_type __n = 0;
  137. while (run_one_until(__abs_time))
  138. if (__n != numeric_limits<count_type>::max())
  139. ++__n;
  140. return __n;
  141. }
  142. count_type
  143. run_one()
  144. { return _M_do_one(chrono::milliseconds{-1}); }
  145. template<typename _Rep, typename _Period>
  146. count_type
  147. run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
  148. { return run_one_until(chrono::steady_clock::now() + __rel_time); }
  149. template<typename _Clock, typename _Duration>
  150. count_type
  151. run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
  152. {
  153. auto __now = _Clock::now();
  154. while (__now < __abs_time)
  155. {
  156. using namespace std::chrono;
  157. auto __ms = duration_cast<milliseconds>(__abs_time - __now);
  158. if (_M_do_one(__ms))
  159. return 1;
  160. __now = _Clock::now();
  161. }
  162. return 0;
  163. }
  164. count_type
  165. poll()
  166. {
  167. count_type __n = 0;
  168. while (poll_one())
  169. if (__n != numeric_limits<count_type>::max())
  170. ++__n;
  171. return __n;
  172. }
  173. count_type
  174. poll_one()
  175. { return _M_do_one(chrono::milliseconds{0}); }
  176. void stop()
  177. {
  178. lock_guard<mutex> __lock(_M_mtx);
  179. _M_stopped = true;
  180. _M_reactor._M_notify();
  181. }
  182. bool stopped() const noexcept
  183. {
  184. lock_guard<mutex> __lock(_M_mtx);
  185. return _M_stopped;
  186. }
  187. void restart()
  188. {
  189. _M_stopped = false;
  190. }
  191. private:
  192. template<typename _Clock, typename _WaitTraits>
  193. friend class basic_waitable_timer;
  194. friend __socket_impl;
  195. template<typename _Protocol>
  196. friend class __basic_socket_impl;
  197. template<typename _Protocol>
  198. friend class basic_socket;
  199. template<typename _Protocol>
  200. friend class basic_datagram_socket;
  201. template<typename _Protocol>
  202. friend class basic_stream_socket;
  203. template<typename _Protocol>
  204. friend class basic_socket_acceptor;
  205. count_type
  206. _M_outstanding_work() const
  207. { return _M_work_count + !_M_ops.empty(); }
  208. struct __timer_queue_base : execution_context::service
  209. {
  210. // return milliseconds until next timer expires, or milliseconds::max()
  211. virtual chrono::milliseconds _M_next() const = 0;
  212. virtual bool run_one() = 0;
  213. protected:
  214. explicit
  215. __timer_queue_base(execution_context& __ctx) : service(__ctx)
  216. {
  217. auto& __ioc = static_cast<io_context&>(__ctx);
  218. lock_guard<mutex> __lock(__ioc._M_mtx);
  219. __ioc._M_timers.push_back(this);
  220. }
  221. mutable mutex _M_qmtx;
  222. };
  223. template<typename _Timer, typename _Key = typename _Timer::_Key>
  224. struct __timer_queue : __timer_queue_base
  225. {
  226. using key_type = __timer_queue;
  227. explicit
  228. __timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
  229. { }
  230. void shutdown() noexcept { }
  231. io_context& context() noexcept
  232. { return static_cast<io_context&>(service::context()); }
  233. // Start an asynchronous wait.
  234. void
  235. push(const _Timer& __t, function<void(error_code)> __h)
  236. {
  237. context().get_executor().on_work_started();
  238. lock_guard<mutex> __lock(_M_qmtx);
  239. _M_queue.emplace(__t, _M_next_id++, std::move(__h));
  240. // no need to notify reactor unless this timer went to the front?
  241. }
  242. // Cancel all outstanding waits for __t
  243. size_t
  244. cancel(const _Timer& __t)
  245. {
  246. lock_guard<mutex> __lock(_M_qmtx);
  247. size_t __count = 0;
  248. auto __last = _M_queue.end();
  249. for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
  250. ++__it)
  251. {
  252. if (__it->_M_key == __t._M_key.get())
  253. {
  254. __it->cancel();
  255. __last = __it;
  256. ++__count;
  257. }
  258. }
  259. if (__count)
  260. _M_queue._M_sort_to(__last);
  261. return __count;
  262. }
  263. // Cancel oldest outstanding wait for __t
  264. bool
  265. cancel_one(const _Timer& __t)
  266. {
  267. lock_guard<mutex> __lock(_M_qmtx);
  268. const auto __end = _M_queue.end();
  269. auto __oldest = __end;
  270. for (auto __it = _M_queue.begin(); __it != __end; ++__it)
  271. if (__it->_M_key == __t._M_key.get())
  272. if (__oldest == __end || __it->_M_id < __oldest->_M_id)
  273. __oldest = __it;
  274. if (__oldest == __end)
  275. return false;
  276. __oldest->cancel();
  277. _M_queue._M_sort_to(__oldest);
  278. return true;
  279. }
  280. chrono::milliseconds
  281. _M_next() const override
  282. {
  283. typename _Timer::time_point __exp;
  284. {
  285. lock_guard<mutex> __lock(_M_qmtx);
  286. if (_M_queue.empty())
  287. return chrono::milliseconds::max(); // no pending timers
  288. if (_M_queue.top()._M_key == nullptr)
  289. return chrono::milliseconds::zero(); // cancelled, run now
  290. __exp = _M_queue.top()._M_expiry;
  291. }
  292. auto __dur = _Timer::traits_type::to_wait_duration(__exp);
  293. if (__dur < __dur.zero())
  294. __dur = __dur.zero();
  295. return chrono::duration_cast<chrono::milliseconds>(__dur);
  296. }
  297. private:
  298. bool run_one() override
  299. {
  300. auto __now = _Timer::clock_type::now();
  301. function<void(error_code)> __h;
  302. error_code __ec;
  303. {
  304. lock_guard<mutex> __lock(_M_qmtx);
  305. if (_M_queue.top()._M_key == nullptr) // cancelled
  306. {
  307. __h = std::move(_M_queue.top()._M_h);
  308. __ec = std::make_error_code(errc::operation_canceled);
  309. _M_queue.pop();
  310. }
  311. else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
  312. {
  313. __h = std::move(_M_queue.top()._M_h);
  314. _M_queue.pop();
  315. }
  316. }
  317. if (__h)
  318. {
  319. __h(__ec);
  320. context().get_executor().on_work_finished();
  321. return true;
  322. }
  323. return false;
  324. }
  325. using __timer_id_type = uint64_t;
  326. struct __pending_timer
  327. {
  328. __pending_timer(const _Timer& __t, uint64_t __id,
  329. function<void(error_code)> __h)
  330. : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
  331. _M_h(std::move(__h))
  332. { }
  333. typename _Timer::time_point _M_expiry;
  334. _Key* _M_key;
  335. __timer_id_type _M_id;
  336. function<void(error_code)> _M_h;
  337. void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }
  338. bool
  339. operator<(const __pending_timer& __rhs) const
  340. { return _M_expiry < __rhs._M_expiry; }
  341. };
  342. struct __queue : priority_queue<__pending_timer>
  343. {
  344. using iterator =
  345. typename priority_queue<__pending_timer>::container_type::iterator;
  346. // expose begin/end/erase for direct access to underlying container
  347. iterator begin() { return this->c.begin(); }
  348. iterator end() { return this->c.end(); }
  349. iterator erase(iterator __it) { return this->c.erase(__it); }
  350. void
  351. _M_sort_to(iterator __it)
  352. { std::stable_sort(this->c.begin(), ++__it); }
  353. };
  354. __queue _M_queue;
  355. __timer_id_type _M_next_id = 0;
  356. };
  357. template<typename _Timer, typename _CompletionHandler>
  358. void
  359. async_wait(const _Timer& __timer, _CompletionHandler&& __h)
  360. {
  361. auto& __queue = use_service<__timer_queue<_Timer>>(*this);
  362. __queue.push(__timer, std::move(__h));
  363. _M_reactor._M_notify();
  364. }
  365. // Cancel all wait operations initiated by __timer.
  366. template<typename _Timer>
  367. size_t
  368. cancel(const _Timer& __timer)
  369. {
  370. if (!has_service<__timer_queue<_Timer>>(*this))
  371. return 0;
  372. auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
  373. if (__c != 0)
  374. _M_reactor._M_notify();
  375. return __c;
  376. }
  377. // Cancel the oldest wait operation initiated by __timer.
  378. template<typename _Timer>
  379. size_t
  380. cancel_one(const _Timer& __timer)
  381. {
  382. if (!has_service<__timer_queue<_Timer>>(*this))
  383. return 0;
  384. if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
  385. {
  386. _M_reactor._M_notify();
  387. return 1;
  388. }
  389. return 0;
  390. }
  391. template<typename _Op>
  392. void
  393. async_wait(int __fd, int __w, _Op&& __op)
  394. {
  395. lock_guard<mutex> __lock(_M_mtx);
  396. // TODO need push_back, use std::list not std::forward_list
  397. auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
  398. while (__it != _M_ops.end())
  399. {
  400. ++__it;
  401. ++__tail;
  402. }
  403. using __type = __async_operation_impl<_Op>;
  404. _M_ops.emplace_after(__tail,
  405. make_unique<__type>(std::move(__op), __fd, __w));
  406. _M_reactor._M_fd_interest(__fd, __w);
  407. }
  408. void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
  409. void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }
  410. void cancel(int __fd, error_code&)
  411. {
  412. lock_guard<mutex> __lock(_M_mtx);
  413. const auto __end = _M_ops.end();
  414. auto __it = _M_ops.begin();
  415. auto __prev = _M_ops.before_begin();
  416. while (__it != __end && (*__it)->_M_is_cancelled())
  417. {
  418. ++__it;
  419. ++__prev;
  420. }
  421. auto __cancelled = __prev;
  422. while (__it != __end)
  423. {
  424. if ((*__it)->_M_fd == __fd)
  425. {
  426. (*__it)->cancel();
  427. ++__it;
  428. _M_ops.splice_after(__cancelled, _M_ops, __prev);
  429. ++__cancelled;
  430. }
  431. else
  432. {
  433. ++__it;
  434. ++__prev;
  435. }
  436. }
  437. _M_reactor._M_not_interested(__fd);
  438. }
  439. struct __async_operation
  440. {
  441. __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }
  442. virtual ~__async_operation() = default;
  443. int _M_fd;
  444. short _M_ev;
  445. void cancel() { _M_fd = -1; }
  446. bool _M_is_cancelled() const { return _M_fd == -1; }
  447. virtual void run(io_context&) = 0;
  448. };
  449. template<typename _Op>
  450. struct __async_operation_impl : __async_operation
  451. {
  452. __async_operation_impl(_Op&& __op, int __fd, int __ev)
  453. : __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }
  454. _Op _M_op;
  455. void run(io_context& __ctx)
  456. {
  457. if (_M_is_cancelled())
  458. _M_op(std::make_error_code(errc::operation_canceled));
  459. else
  460. _M_op(error_code{});
  461. }
  462. };
  463. atomic<count_type> _M_work_count;
  464. mutable mutex _M_mtx;
  465. queue<function<void()>> _M_op;
  466. bool _M_stopped = false;
  467. struct __monitor
  468. {
  469. __monitor(io_context& __c) : _M_ctx(__c)
  470. {
  471. lock_guard<mutex> __lock(_M_ctx._M_mtx);
  472. _M_ctx._M_call_stack.push_back(this_thread::get_id());
  473. }
  474. ~__monitor()
  475. {
  476. lock_guard<mutex> __lock(_M_ctx._M_mtx);
  477. _M_ctx._M_call_stack.pop_back();
  478. if (_M_ctx._M_outstanding_work() == 0)
  479. {
  480. _M_ctx._M_stopped = true;
  481. _M_ctx._M_reactor._M_notify();
  482. }
  483. }
  484. __monitor(__monitor&&) = delete;
  485. io_context& _M_ctx;
  486. };
  487. bool
  488. _M_do_one(chrono::milliseconds __timeout)
  489. {
  490. const bool __block = __timeout != chrono::milliseconds::zero();
  491. __reactor::__fdvec __fds;
  492. __monitor __mon{*this};
  493. __timer_queue_base* __timerq = nullptr;
  494. unique_ptr<__async_operation> __async_op;
  495. while (true)
  496. {
  497. if (__timerq)
  498. {
  499. if (__timerq->run_one())
  500. return true;
  501. else
  502. __timerq = nullptr;
  503. }
  504. if (__async_op)
  505. {
  506. __async_op->run(*this);
  507. // TODO need to unregister __async_op
  508. return true;
  509. }
  510. chrono::milliseconds __ms{0};
  511. {
  512. lock_guard<mutex> __lock(_M_mtx);
  513. if (_M_stopped)
  514. return false;
  515. // find first timer with something to do
  516. for (auto __q : _M_timers)
  517. {
  518. auto __next = __q->_M_next();
  519. if (__next == __next.zero()) // ready to run immediately
  520. {
  521. __timerq = __q;
  522. __ms = __next;
  523. break;
  524. }
  525. else if (__next != __next.max() && __block
  526. && (__next < __ms || __timerq == nullptr))
  527. {
  528. __timerq = __q;
  529. __ms = __next;
  530. }
  531. }
  532. if (__timerq && __ms == __ms.zero())
  533. continue; // restart loop to run a timer immediately
  534. if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
  535. {
  536. _M_ops.front().swap(__async_op);
  537. _M_ops.pop_front();
  538. continue;
  539. }
  540. // TODO run any posted items
  541. if (__block)
  542. {
  543. if (__timerq == nullptr)
  544. __ms = __timeout;
  545. else if (__ms.zero() <= __timeout && __timeout < __ms)
  546. __ms = __timeout;
  547. else if (__ms.count() > numeric_limits<int>::max())
  548. __ms = chrono::milliseconds{numeric_limits<int>::max()};
  549. }
  550. // else __ms == 0 and poll() will return immediately
  551. }
  552. auto __res = _M_reactor.wait(__fds, __ms);
  553. if (__res == __reactor::_S_retry)
  554. continue;
  555. if (__res == __reactor::_S_timeout)
  556. if (__timerq == nullptr)
  557. return false;
  558. else
  559. continue; // timed out, so restart loop and process the timer
  560. __timerq = nullptr;
  561. if (__fds.empty()) // nothing to do
  562. return false;
  563. lock_guard<mutex> __lock(_M_mtx);
  564. for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
  565. __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
  566. {
  567. auto& __op = **__it;
  568. auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
  569. __op._M_fd,
  570. [](const auto& __p, int __fd) { return __p.fd < __fd; });
  571. if (__pos != __fds.end() && __pos->fd == __op._M_fd
  572. && __pos->revents & __op._M_ev)
  573. {
  574. __it->swap(__async_op);
  575. _M_ops.erase_after(__prev);
  576. break; // restart loop and run op
  577. }
  578. }
  579. }
  580. }
  581. struct __reactor
  582. {
  583. __reactor() : _M_fds(1)
  584. {
  585. int __pipe[2];
  586. if (::pipe(__pipe) == -1)
  587. __throw_system_error(errno);
  588. if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
  589. || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
  590. {
  591. int __e = errno;
  592. ::close(__pipe[0]);
  593. ::close(__pipe[1]);
  594. __throw_system_error(__e);
  595. }
  596. _M_fds.back().events = POLLIN;
  597. _M_fds.back().fd = __pipe[0];
  598. _M_notify_wr = __pipe[1];
  599. }
  600. ~__reactor()
  601. {
  602. ::close(_M_fds.back().fd);
  603. ::close(_M_notify_wr);
  604. }
  605. // write a notification byte to the pipe (ignoring errors)
  606. void _M_notify()
  607. {
  608. int __n;
  609. do {
  610. __n = ::write(_M_notify_wr, "", 1);
  611. } while (__n == -1 && errno == EINTR);
  612. }
  613. // read all notification bytes from the pipe
  614. void _M_on_notify()
  615. {
  616. // Drain the pipe.
  617. char __buf[64];
  618. ssize_t __n;
  619. do {
  620. __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
  621. } while (__n != -1 || errno == EINTR);
  622. }
  623. void
  624. _M_add_fd(int __fd)
  625. {
  626. auto __pos = _M_lower_bound(__fd);
  627. if (__pos->fd == __fd)
  628. __throw_system_error((int)errc::invalid_argument);
  629. _M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
  630. _M_notify();
  631. }
  632. void
  633. _M_remove_fd(int __fd)
  634. {
  635. auto __pos = _M_lower_bound(__fd);
  636. if (__pos->fd == __fd)
  637. _M_fds.erase(__pos);
  638. // else bug!
  639. _M_notify();
  640. }
  641. void
  642. _M_fd_interest(int __fd, int __w)
  643. {
  644. auto __pos = _M_lower_bound(__fd);
  645. if (__pos->fd == __fd)
  646. __pos->events |= __w;
  647. // else bug!
  648. _M_notify();
  649. }
  650. void
  651. _M_not_interested(int __fd)
  652. {
  653. auto __pos = _M_lower_bound(__fd);
  654. if (__pos->fd == __fd)
  655. __pos->events = 0;
  656. _M_notify();
  657. }
  658. # ifdef _GLIBCXX_HAVE_POLL_H
  659. using __fdvec = vector<::pollfd>;
  660. // Find first element p such that !(p.fd < __fd)
  661. // N.B. always returns a dereferencable iterator.
  662. __fdvec::iterator
  663. _M_lower_bound(int __fd)
  664. {
  665. return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
  666. __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
  667. }
  668. enum __status { _S_retry, _S_timeout, _S_ok, _S_error };
  669. __status
  670. wait(__fdvec& __fds, chrono::milliseconds __timeout)
  671. {
  672. // XXX not thread-safe!
  673. __fds = _M_fds; // take snapshot to pass to poll()
  674. int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());
  675. if (__res == -1)
  676. {
  677. __fds.clear();
  678. if (errno == EINTR)
  679. return _S_retry;
  680. return _S_error; // XXX ???
  681. }
  682. else if (__res == 0)
  683. {
  684. __fds.clear();
  685. return _S_timeout;
  686. }
  687. else if (__fds.back().revents != 0) // something changed, restart
  688. {
  689. __fds.clear();
  690. _M_on_notify();
  691. return _S_retry;
  692. }
  693. auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
  694. [](const __fdvec::value_type& __p) { return __p.revents != 0; });
  695. __fds.erase(__part, __fds.end());
  696. return _S_ok;
  697. }
  698. __fdvec _M_fds; // _M_fds.back() is the read end of the self-pipe
  699. #endif
  700. int _M_notify_wr; // write end of the self-pipe
  701. };
  702. __reactor _M_reactor;
  703. vector<__timer_queue_base*> _M_timers;
  704. forward_list<unique_ptr<__async_operation>> _M_ops;
  705. vector<thread::id> _M_call_stack;
  706. };
  707. inline bool
  708. operator==(const io_context::executor_type& __a,
  709. const io_context::executor_type& __b) noexcept
  710. {
  711. // https://github.com/chriskohlhoff/asio-tr2/issues/201
  712. using executor_type = io_context::executor_type;
  713. return std::addressof(executor_type(__a).context())
  714. == std::addressof(executor_type(__b).context());
  715. }
  716. inline bool
  717. operator!=(const io_context::executor_type& __a,
  718. const io_context::executor_type& __b) noexcept
  719. { return !(__a == __b); }
  720. template<> struct is_executor<io_context::executor_type> : true_type {};
  721. /// @}
  722. } // namespace v1
  723. } // namespace net
  724. } // namespace experimental
  725. _GLIBCXX_END_NAMESPACE_VERSION
  726. } // namespace std
  727. #endif // C++14
  728. #endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE