mirror of
				https://git.suyu.dev/suyu/suyu.git
				synced 2025-10-26 04:17:12 +08:00 
			
		
		
		
	bounded_threadsafe_queue: Refactor Pop
Introduces PopModes to bring waiting logic into Pop, similar to Push.
This commit is contained in:
		
							parent
							
								
									8c56481249
								
							
						
					
					
						commit
						197d756560
					
				| @ -22,52 +22,38 @@ class SPSCQueue { | |||||||
|     static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); |     static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); | ||||||
| 
 | 
 | ||||||
| public: | public: | ||||||
|     bool TryPush(T&& t) { |  | ||||||
|         return Push<PushMode::Try>(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     bool TryEmplace(Args&&... args) { |     bool TryEmplace(Args&&... args) { | ||||||
|         return Emplace<PushMode::Try>(std::forward<Args>(args)...); |         return Emplace<PushMode::Try>(std::forward<Args>(args)...); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void PushWait(T&& t) { |  | ||||||
|         Push<PushMode::Wait>(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     void EmplaceWait(Args&&... args) { |     void EmplaceWait(Args&&... args) { | ||||||
|         Emplace<PushMode::Wait>(std::forward<Args>(args)...); |         Emplace<PushMode::Wait>(std::forward<Args>(args)...); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     bool TryPop(T& t) { |     bool TryPop(T& t) { | ||||||
|         return Pop(t); |         return Pop<PopMode::Try>(t); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     void PopWait(T& t) { | ||||||
|  |         Pop<PopMode::Wait>(t); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void PopWait(T& t, std::stop_token stop_token) { |     void PopWait(T& t, std::stop_token stop_token) { | ||||||
|         ConsumerWait(stop_token); |         Pop<PopMode::WaitWithStopToken>(t, stop_token); | ||||||
|         Pop(t); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     T PopWait(std::stop_token stop_token) { |     T PopWait() { | ||||||
|         ConsumerWait(stop_token); |  | ||||||
|         T t; |         T t; | ||||||
|         Pop(t); |         Pop<PopMode::Wait>(t); | ||||||
|         return t; |         return t; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void Clear() { |     T PopWait(std::stop_token stop_token) { | ||||||
|         while (!Empty()) { |         T t; | ||||||
|             Pop(); |         Pop<PopMode::WaitWithStopToken>(t, stop_token); | ||||||
|         } |         return t; | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     bool Empty() const { |  | ||||||
|         return m_read_index.load() == m_write_index.load(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     size_t Size() const { |  | ||||||
|         return m_write_index.load() - m_read_index.load(); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| private: | private: | ||||||
| @ -77,55 +63,27 @@ private: | |||||||
|         Count, |         Count, | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     template <PushMode Mode> |     enum class PopMode { | ||||||
|     bool Push(T&& t) { |         Try, | ||||||
|         const size_t write_index = m_write_index.load(); |         Wait, | ||||||
| 
 |         WaitWithStopToken, | ||||||
|         if constexpr (Mode == PushMode::Try) { |         Count, | ||||||
|             // Check if we have free slots to write to.
 |     }; | ||||||
|             if ((write_index - m_read_index.load()) == Capacity) { |  | ||||||
|                 return false; |  | ||||||
|             } |  | ||||||
|         } else if constexpr (Mode == PushMode::Wait) { |  | ||||||
|             // Wait until we have free slots to write to.
 |  | ||||||
|             std::unique_lock lock{producer_cv_mutex}; |  | ||||||
|             producer_cv.wait(lock, [this, write_index] { |  | ||||||
|                 return (write_index - m_read_index.load()) < Capacity; |  | ||||||
|             }); |  | ||||||
|         } else { |  | ||||||
|             static_assert(Mode < PushMode::Count, "Invalid PushMode."); |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         // Determine the position to write to.
 |  | ||||||
|         const size_t pos = write_index % Capacity; |  | ||||||
| 
 |  | ||||||
|         // Push into the queue.
 |  | ||||||
|         m_data[pos] = std::move(t); |  | ||||||
| 
 |  | ||||||
|         // Increment the write index.
 |  | ||||||
|         ++m_write_index; |  | ||||||
| 
 |  | ||||||
|         // Notify the consumer that we have pushed into the queue.
 |  | ||||||
|         std::scoped_lock lock{consumer_cv_mutex}; |  | ||||||
|         consumer_cv.notify_one(); |  | ||||||
| 
 |  | ||||||
|         return true; |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     template <PushMode Mode, typename... Args> |     template <PushMode Mode, typename... Args> | ||||||
|     bool Emplace(Args&&... args) { |     bool Emplace(Args&&... args) { | ||||||
|         const size_t write_index = m_write_index.load(); |         const size_t write_index = m_write_index.load(std::memory_order::relaxed); | ||||||
| 
 | 
 | ||||||
|         if constexpr (Mode == PushMode::Try) { |         if constexpr (Mode == PushMode::Try) { | ||||||
|             // Check if we have free slots to write to.
 |             // Check if we have free slots to write to.
 | ||||||
|             if ((write_index - m_read_index.load()) == Capacity) { |             if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) { | ||||||
|                 return false; |                 return false; | ||||||
|             } |             } | ||||||
|         } else if constexpr (Mode == PushMode::Wait) { |         } else if constexpr (Mode == PushMode::Wait) { | ||||||
|             // Wait until we have free slots to write to.
 |             // Wait until we have free slots to write to.
 | ||||||
|             std::unique_lock lock{producer_cv_mutex}; |             std::unique_lock lock{producer_cv_mutex}; | ||||||
|             producer_cv.wait(lock, [this, write_index] { |             producer_cv.wait(lock, [this, write_index] { | ||||||
|                 return (write_index - m_read_index.load()) < Capacity; |                 return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity; | ||||||
|             }); |             }); | ||||||
|         } else { |         } else { | ||||||
|             static_assert(Mode < PushMode::Count, "Invalid PushMode."); |             static_assert(Mode < PushMode::Count, "Invalid PushMode."); | ||||||
| @ -147,34 +105,32 @@ private: | |||||||
|         return true; |         return true; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void Pop() { |     template <PopMode Mode> | ||||||
|         const size_t read_index = m_read_index.load(); |     bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) { | ||||||
|  |         const size_t read_index = m_read_index.load(std::memory_order::relaxed); | ||||||
| 
 | 
 | ||||||
|         // Check if the queue is empty.
 |         if constexpr (Mode == PopMode::Try) { | ||||||
|         if (read_index == m_write_index.load()) { |             // Check if the queue is empty.
 | ||||||
|             return; |             if (read_index == m_write_index.load(std::memory_order::acquire)) { | ||||||
|         } |                 return false; | ||||||
| 
 |             } | ||||||
|         // Determine the position to read from.
 |         } else if constexpr (Mode == PopMode::Wait) { | ||||||
|         const size_t pos = read_index % Capacity; |             // Wait until the queue is not empty.
 | ||||||
| 
 |             std::unique_lock lock{consumer_cv_mutex}; | ||||||
|         // Pop the data off the queue, deleting it.
 |             consumer_cv.wait(lock, [this, read_index] { | ||||||
|         std::destroy_at(std::addressof(m_data[pos])); |                 return read_index != m_write_index.load(std::memory_order::acquire); | ||||||
| 
 |             }); | ||||||
|         // Increment the read index.
 |         } else if constexpr (Mode == PopMode::WaitWithStopToken) { | ||||||
|         ++m_read_index; |             // Wait until the queue is not empty.
 | ||||||
| 
 |             std::unique_lock lock{consumer_cv_mutex}; | ||||||
|         // Notify the producer that we have popped off the queue.
 |             Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] { | ||||||
|         std::unique_lock lock{producer_cv_mutex}; |                 return read_index != m_write_index.load(std::memory_order::acquire); | ||||||
|         producer_cv.notify_one(); |             }); | ||||||
|     } |             if (stop_token.stop_requested()) { | ||||||
| 
 |                 return false; | ||||||
|     bool Pop(T& t) { |             } | ||||||
|         const size_t read_index = m_read_index.load(); |         } else { | ||||||
| 
 |             static_assert(Mode < PopMode::Count, "Invalid PopMode."); | ||||||
|         // Check if the queue is empty.
 |  | ||||||
|         if (read_index == m_write_index.load()) { |  | ||||||
|             return false; |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         // Determine the position to read from.
 |         // Determine the position to read from.
 | ||||||
| @ -193,11 +149,6 @@ private: | |||||||
|         return true; |         return true; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void ConsumerWait(std::stop_token stop_token) { |  | ||||||
|         std::unique_lock lock{consumer_cv_mutex}; |  | ||||||
|         Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     alignas(128) std::atomic_size_t m_read_index{0}; |     alignas(128) std::atomic_size_t m_read_index{0}; | ||||||
|     alignas(128) std::atomic_size_t m_write_index{0}; |     alignas(128) std::atomic_size_t m_write_index{0}; | ||||||
| 
 | 
 | ||||||
| @ -212,22 +163,12 @@ private: | |||||||
| template <typename T, size_t Capacity = detail::DefaultCapacity> | template <typename T, size_t Capacity = detail::DefaultCapacity> | ||||||
| class MPSCQueue { | class MPSCQueue { | ||||||
| public: | public: | ||||||
|     bool TryPush(T&& t) { |  | ||||||
|         std::scoped_lock lock{write_mutex}; |  | ||||||
|         return spsc_queue.TryPush(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     bool TryEmplace(Args&&... args) { |     bool TryEmplace(Args&&... args) { | ||||||
|         std::scoped_lock lock{write_mutex}; |         std::scoped_lock lock{write_mutex}; | ||||||
|         return spsc_queue.TryEmplace(std::forward<Args>(args)...); |         return spsc_queue.TryEmplace(std::forward<Args>(args)...); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void PushWait(T&& t) { |  | ||||||
|         std::scoped_lock lock{write_mutex}; |  | ||||||
|         spsc_queue.PushWait(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     void EmplaceWait(Args&&... args) { |     void EmplaceWait(Args&&... args) { | ||||||
|         std::scoped_lock lock{write_mutex}; |         std::scoped_lock lock{write_mutex}; | ||||||
| @ -238,26 +179,22 @@ public: | |||||||
|         return spsc_queue.TryPop(t); |         return spsc_queue.TryPop(t); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     void PopWait(T& t) { | ||||||
|  |         spsc_queue.PopWait(t); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     void PopWait(T& t, std::stop_token stop_token) { |     void PopWait(T& t, std::stop_token stop_token) { | ||||||
|         spsc_queue.PopWait(t, stop_token); |         spsc_queue.PopWait(t, stop_token); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     T PopWait() { | ||||||
|  |         return spsc_queue.PopWait(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     T PopWait(std::stop_token stop_token) { |     T PopWait(std::stop_token stop_token) { | ||||||
|         return spsc_queue.PopWait(stop_token); |         return spsc_queue.PopWait(stop_token); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void Clear() { |  | ||||||
|         spsc_queue.Clear(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     bool Empty() { |  | ||||||
|         return spsc_queue.Empty(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     size_t Size() { |  | ||||||
|         return spsc_queue.Size(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
| private: | private: | ||||||
|     SPSCQueue<T, Capacity> spsc_queue; |     SPSCQueue<T, Capacity> spsc_queue; | ||||||
|     std::mutex write_mutex; |     std::mutex write_mutex; | ||||||
| @ -266,22 +203,12 @@ private: | |||||||
| template <typename T, size_t Capacity = detail::DefaultCapacity> | template <typename T, size_t Capacity = detail::DefaultCapacity> | ||||||
| class MPMCQueue { | class MPMCQueue { | ||||||
| public: | public: | ||||||
|     bool TryPush(T&& t) { |  | ||||||
|         std::scoped_lock lock{write_mutex}; |  | ||||||
|         return spsc_queue.TryPush(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     bool TryEmplace(Args&&... args) { |     bool TryEmplace(Args&&... args) { | ||||||
|         std::scoped_lock lock{write_mutex}; |         std::scoped_lock lock{write_mutex}; | ||||||
|         return spsc_queue.TryEmplace(std::forward<Args>(args)...); |         return spsc_queue.TryEmplace(std::forward<Args>(args)...); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void PushWait(T&& t) { |  | ||||||
|         std::scoped_lock lock{write_mutex}; |  | ||||||
|         spsc_queue.PushWait(std::move(t)); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     template <typename... Args> |     template <typename... Args> | ||||||
|     void EmplaceWait(Args&&... args) { |     void EmplaceWait(Args&&... args) { | ||||||
|         std::scoped_lock lock{write_mutex}; |         std::scoped_lock lock{write_mutex}; | ||||||
| @ -293,31 +220,26 @@ public: | |||||||
|         return spsc_queue.TryPop(t); |         return spsc_queue.TryPop(t); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     void PopWait(T& t) { | ||||||
|  |         std::scoped_lock lock{read_mutex}; | ||||||
|  |         spsc_queue.PopWait(t); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     void PopWait(T& t, std::stop_token stop_token) { |     void PopWait(T& t, std::stop_token stop_token) { | ||||||
|         std::scoped_lock lock{read_mutex}; |         std::scoped_lock lock{read_mutex}; | ||||||
|         spsc_queue.PopWait(t, stop_token); |         spsc_queue.PopWait(t, stop_token); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     T PopWait() { | ||||||
|  |         std::scoped_lock lock{read_mutex}; | ||||||
|  |         return spsc_queue.PopWait(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     T PopWait(std::stop_token stop_token) { |     T PopWait(std::stop_token stop_token) { | ||||||
|         std::scoped_lock lock{read_mutex}; |         std::scoped_lock lock{read_mutex}; | ||||||
|         return spsc_queue.PopWait(stop_token); |         return spsc_queue.PopWait(stop_token); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void Clear() { |  | ||||||
|         std::scoped_lock lock{read_mutex}; |  | ||||||
|         spsc_queue.Clear(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     bool Empty() { |  | ||||||
|         std::scoped_lock lock{read_mutex}; |  | ||||||
|         return spsc_queue.Empty(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     size_t Size() { |  | ||||||
|         std::scoped_lock lock{read_mutex}; |  | ||||||
|         return spsc_queue.Size(); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
| private: | private: | ||||||
|     SPSCQueue<T, Capacity> spsc_queue; |     SPSCQueue<T, Capacity> spsc_queue; | ||||||
|     std::mutex write_mutex; |     std::mutex write_mutex; | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user