You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
3.4KB

  1. // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
  2. // Distributed under the MIT License (http://opensource.org/licenses/MIT)
  3. #pragma once
// Multi-producer, multi-consumer blocking queue.
// enqueue(..) - blocks until there is room for the new message.
// enqueue_nowait(..) - returns immediately; if the queue has no room left, the
// oldest message is overrun.
// dequeue_for(..) - blocks until the queue is not empty or the timeout has
// passed.
  10. #include "spdlog/details/circular_q.h"
  11. #include <condition_variable>
  12. #include <mutex>
  13. namespace spdlog {
  14. namespace details {
  15. template<typename T>
  16. class mpmc_blocking_queue
  17. {
  18. public:
  19. using item_type = T;
  20. explicit mpmc_blocking_queue(size_t max_items)
  21. : q_(max_items)
  22. {}
  23. #ifndef __MINGW32__
  24. // try to enqueue and block if no room left
  25. void enqueue(T &&item)
  26. {
  27. {
  28. std::unique_lock<std::mutex> lock(queue_mutex_);
  29. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  30. q_.push_back(std::move(item));
  31. }
  32. push_cv_.notify_one();
  33. }
  34. // enqueue immediately. overrun oldest message in the queue if no room left.
  35. void enqueue_nowait(T &&item)
  36. {
  37. {
  38. std::unique_lock<std::mutex> lock(queue_mutex_);
  39. q_.push_back(std::move(item));
  40. }
  41. push_cv_.notify_one();
  42. }
  43. // try to dequeue item. if no item found. wait upto timeout and try again
  44. // Return true, if succeeded dequeue item, false otherwise
  45. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
  46. {
  47. {
  48. std::unique_lock<std::mutex> lock(queue_mutex_);
  49. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
  50. {
  51. return false;
  52. }
  53. popped_item = std::move(q_.front());
  54. q_.pop_front();
  55. }
  56. pop_cv_.notify_one();
  57. return true;
  58. }
  59. #else
  60. // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
  61. // so release the mutex at the very end each function.
  62. // try to enqueue and block if no room left
  63. void enqueue(T &&item)
  64. {
  65. std::unique_lock<std::mutex> lock(queue_mutex_);
  66. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  67. q_.push_back(std::move(item));
  68. push_cv_.notify_one();
  69. }
  70. // enqueue immediately. overrun oldest message in the queue if no room left.
  71. void enqueue_nowait(T &&item)
  72. {
  73. std::unique_lock<std::mutex> lock(queue_mutex_);
  74. q_.push_back(std::move(item));
  75. push_cv_.notify_one();
  76. }
  77. // try to dequeue item. if no item found. wait upto timeout and try again
  78. // Return true, if succeeded dequeue item, false otherwise
  79. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
  80. {
  81. std::unique_lock<std::mutex> lock(queue_mutex_);
  82. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
  83. {
  84. return false;
  85. }
  86. popped_item = std::move(q_.front());
  87. q_.pop_front();
  88. pop_cv_.notify_one();
  89. return true;
  90. }
  91. #endif
  92. size_t overrun_counter()
  93. {
  94. std::unique_lock<std::mutex> lock(queue_mutex_);
  95. return q_.overrun_counter();
  96. }
  97. private:
  98. std::mutex queue_mutex_;
  99. std::condition_variable push_cv_;
  100. std::condition_variable pop_cv_;
  101. spdlog::details::circular_q<T> q_;
  102. };
  103. } // namespace details
  104. } // namespace spdlog