2 * Copyright (c) 2016-present, Facebook, Inc.
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree. An additional grant
7 * of patent rights can be found in the PATENTS file in the same directory.
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
12 #include <gtest/gtest.h>
18 using namespace pzstd;
22 WorkQueue<int>* queue;
28 while (queue->pop(result)) {
29 std::lock_guard<std::mutex> lock(*mutex);
30 results[result] = result;
36 TEST(WorkQueue, SingleThreaded) {
41 EXPECT_TRUE(queue.pop(result));
46 EXPECT_TRUE(queue.pop(result));
48 EXPECT_TRUE(queue.pop(result));
54 EXPECT_TRUE(queue.pop(result));
56 EXPECT_TRUE(queue.pop(result));
58 EXPECT_FALSE(queue.pop(result));
60 queue.waitUntilFinished();
63 TEST(WorkQueue, SPSC) {
67 for (int i = 0; i < 10; ++i) {
71 std::thread thread([ &queue, max ] {
73 for (int i = 0;; ++i) {
74 if (!queue.pop(result)) {
82 std::this_thread::yield();
83 for (int i = 10; i < max; ++i) {
91 TEST(WorkQueue, SPMC) {
93 std::vector<int> results(50, -1);
95 std::vector<std::thread> threads;
96 for (int i = 0; i < 5; ++i) {
97 threads.emplace_back(Popper{&queue, results.data(), &mutex});
100 for (int i = 0; i < 50; ++i) {
105 for (auto& thread : threads) {
109 for (int i = 0; i < 50; ++i) {
110 EXPECT_EQ(i, results[i]);
114 TEST(WorkQueue, MPMC) {
115 WorkQueue<int> queue;
116 std::vector<int> results(100, -1);
118 std::vector<std::thread> popperThreads;
119 for (int i = 0; i < 4; ++i) {
120 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
123 std::vector<std::thread> pusherThreads;
124 for (int i = 0; i < 2; ++i) {
126 auto max = (i + 1) * 50;
127 pusherThreads.emplace_back(
128 [ &queue, min, max ] {
129 for (int i = min; i < max; ++i) {
135 for (auto& thread : pusherThreads) {
140 for (auto& thread : popperThreads) {
144 for (int i = 0; i < 100; ++i) {
145 EXPECT_EQ(i, results[i]);
149 TEST(WorkQueue, BoundedSizeWorks) {
150 WorkQueue<int> queue(1);
159 EXPECT_EQ(5, result);
162 TEST(WorkQueue, BoundedSizePushAfterFinish) {
163 WorkQueue<int> queue(1);
166 std::thread pusher([&queue] {
169 // Dirtily try and make sure that pusher has run.
170 std::this_thread::sleep_for(std::chrono::seconds(1));
172 EXPECT_TRUE(queue.pop(result));
173 EXPECT_EQ(5, result);
174 EXPECT_FALSE(queue.pop(result));
179 TEST(WorkQueue, SetMaxSize) {
180 WorkQueue<int> queue(2);
185 std::thread pusher([&queue] {
188 // Dirtily try and make sure that pusher has run.
189 std::this_thread::sleep_for(std::chrono::seconds(1));
191 EXPECT_TRUE(queue.pop(result));
192 EXPECT_EQ(5, result);
193 EXPECT_TRUE(queue.pop(result));
194 EXPECT_EQ(6, result);
195 EXPECT_FALSE(queue.pop(result));
200 TEST(WorkQueue, BoundedSizeMPMC) {
201 WorkQueue<int> queue(10);
202 std::vector<int> results(200, -1);
204 std::vector<std::thread> popperThreads;
205 for (int i = 0; i < 4; ++i) {
206 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
209 std::vector<std::thread> pusherThreads;
210 for (int i = 0; i < 2; ++i) {
212 auto max = (i + 1) * 100;
213 pusherThreads.emplace_back(
214 [ &queue, min, max ] {
215 for (int i = min; i < max; ++i) {
221 for (auto& thread : pusherThreads) {
226 for (auto& thread : popperThreads) {
230 for (int i = 0; i < 200; ++i) {
231 EXPECT_EQ(i, results[i]);
235 TEST(WorkQueue, FailedPush) {
236 WorkQueue<std::unique_ptr<int>> queue;
237 std::unique_ptr<int> x(new int{5});
238 EXPECT_TRUE(queue.push(std::move(x)));
239 EXPECT_EQ(nullptr, x);
242 EXPECT_FALSE(queue.push(std::move(x)));
243 EXPECT_NE(nullptr, x);
247 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
249 BufferWorkQueue queue;
251 EXPECT_EQ(0, queue.size());
254 BufferWorkQueue queue;
255 queue.push(Buffer(10));
257 EXPECT_EQ(10, queue.size());
260 BufferWorkQueue queue;
261 queue.push(Buffer(10));
262 queue.push(Buffer(5));
264 EXPECT_EQ(15, queue.size());
267 BufferWorkQueue queue;
268 queue.push(Buffer(10));
269 queue.push(Buffer(5));
273 EXPECT_EQ(5, queue.size());