]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp
Import zstandard 1.1.4 in base
[FreeBSD/FreeBSD.git] / contrib / zstd / contrib / pzstd / utils / test / WorkQueueTest.cpp
1 /**
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  */
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
11
12 #include <gtest/gtest.h>
13 #include <memory>
14 #include <mutex>
15 #include <thread>
16 #include <vector>
17
18 using namespace pzstd;
19
20 namespace {
21 struct Popper {
22   WorkQueue<int>* queue;
23   int* results;
24   std::mutex* mutex;
25
26   void operator()() {
27     int result;
28     while (queue->pop(result)) {
29       std::lock_guard<std::mutex> lock(*mutex);
30       results[result] = result;
31     }
32   }
33 };
34 }
35
36 TEST(WorkQueue, SingleThreaded) {
37   WorkQueue<int> queue;
38   int result;
39
40   queue.push(5);
41   EXPECT_TRUE(queue.pop(result));
42   EXPECT_EQ(5, result);
43
44   queue.push(1);
45   queue.push(2);
46   EXPECT_TRUE(queue.pop(result));
47   EXPECT_EQ(1, result);
48   EXPECT_TRUE(queue.pop(result));
49   EXPECT_EQ(2, result);
50
51   queue.push(1);
52   queue.push(2);
53   queue.finish();
54   EXPECT_TRUE(queue.pop(result));
55   EXPECT_EQ(1, result);
56   EXPECT_TRUE(queue.pop(result));
57   EXPECT_EQ(2, result);
58   EXPECT_FALSE(queue.pop(result));
59
60   queue.waitUntilFinished();
61 }
62
63 TEST(WorkQueue, SPSC) {
64   WorkQueue<int> queue;
65   const int max = 100;
66
67   for (int i = 0; i < 10; ++i) {
68     queue.push(int{i});
69   }
70
71   std::thread thread([ &queue, max ] {
72     int result;
73     for (int i = 0;; ++i) {
74       if (!queue.pop(result)) {
75         EXPECT_EQ(i, max);
76         break;
77       }
78       EXPECT_EQ(i, result);
79     }
80   });
81
82   std::this_thread::yield();
83   for (int i = 10; i < max; ++i) {
84     queue.push(int{i});
85   }
86   queue.finish();
87
88   thread.join();
89 }
90
91 TEST(WorkQueue, SPMC) {
92   WorkQueue<int> queue;
93   std::vector<int> results(50, -1);
94   std::mutex mutex;
95   std::vector<std::thread> threads;
96   for (int i = 0; i < 5; ++i) {
97     threads.emplace_back(Popper{&queue, results.data(), &mutex});
98   }
99
100   for (int i = 0; i < 50; ++i) {
101     queue.push(int{i});
102   }
103   queue.finish();
104
105   for (auto& thread : threads) {
106     thread.join();
107   }
108
109   for (int i = 0; i < 50; ++i) {
110     EXPECT_EQ(i, results[i]);
111   }
112 }
113
114 TEST(WorkQueue, MPMC) {
115   WorkQueue<int> queue;
116   std::vector<int> results(100, -1);
117   std::mutex mutex;
118   std::vector<std::thread> popperThreads;
119   for (int i = 0; i < 4; ++i) {
120     popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
121   }
122
123   std::vector<std::thread> pusherThreads;
124   for (int i = 0; i < 2; ++i) {
125     auto min = i * 50;
126     auto max = (i + 1) * 50;
127     pusherThreads.emplace_back(
128         [ &queue, min, max ] {
129           for (int i = min; i < max; ++i) {
130             queue.push(int{i});
131           }
132         });
133   }
134
135   for (auto& thread : pusherThreads) {
136     thread.join();
137   }
138   queue.finish();
139
140   for (auto& thread : popperThreads) {
141     thread.join();
142   }
143
144   for (int i = 0; i < 100; ++i) {
145     EXPECT_EQ(i, results[i]);
146   }
147 }
148
149 TEST(WorkQueue, BoundedSizeWorks) {
150   WorkQueue<int> queue(1);
151   int result;
152   queue.push(5);
153   queue.pop(result);
154   queue.push(5);
155   queue.pop(result);
156   queue.push(5);
157   queue.finish();
158   queue.pop(result);
159   EXPECT_EQ(5, result);
160 }
161
162 TEST(WorkQueue, BoundedSizePushAfterFinish) {
163   WorkQueue<int> queue(1);
164   int result;
165   queue.push(5);
166   std::thread pusher([&queue] {
167     queue.push(6);
168   });
169   // Dirtily try and make sure that pusher has run.
170   std::this_thread::sleep_for(std::chrono::seconds(1));
171   queue.finish();
172   EXPECT_TRUE(queue.pop(result));
173   EXPECT_EQ(5, result);
174   EXPECT_FALSE(queue.pop(result));
175
176   pusher.join();
177 }
178
179 TEST(WorkQueue, SetMaxSize) {
180   WorkQueue<int> queue(2);
181   int result;
182   queue.push(5);
183   queue.push(6);
184   queue.setMaxSize(1);
185   std::thread pusher([&queue] {
186     queue.push(7);
187   });
188   // Dirtily try and make sure that pusher has run.
189   std::this_thread::sleep_for(std::chrono::seconds(1));
190   queue.finish();
191   EXPECT_TRUE(queue.pop(result));
192   EXPECT_EQ(5, result);
193   EXPECT_TRUE(queue.pop(result));
194   EXPECT_EQ(6, result);
195   EXPECT_FALSE(queue.pop(result));
196
197   pusher.join();
198 }
199
200 TEST(WorkQueue, BoundedSizeMPMC) {
201   WorkQueue<int> queue(10);
202   std::vector<int> results(200, -1);
203   std::mutex mutex;
204   std::vector<std::thread> popperThreads;
205   for (int i = 0; i < 4; ++i) {
206     popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
207   }
208
209   std::vector<std::thread> pusherThreads;
210   for (int i = 0; i < 2; ++i) {
211     auto min = i * 100;
212     auto max = (i + 1) * 100;
213     pusherThreads.emplace_back(
214         [ &queue, min, max ] {
215           for (int i = min; i < max; ++i) {
216             queue.push(int{i});
217           }
218         });
219   }
220
221   for (auto& thread : pusherThreads) {
222     thread.join();
223   }
224   queue.finish();
225
226   for (auto& thread : popperThreads) {
227     thread.join();
228   }
229
230   for (int i = 0; i < 200; ++i) {
231     EXPECT_EQ(i, results[i]);
232   }
233 }
234
235 TEST(WorkQueue, FailedPush) {
236   WorkQueue<std::unique_ptr<int>> queue;
237   std::unique_ptr<int> x(new int{5});
238   EXPECT_TRUE(queue.push(std::move(x)));
239   EXPECT_EQ(nullptr, x);
240   queue.finish();
241   x.reset(new int{6});
242   EXPECT_FALSE(queue.push(std::move(x)));
243   EXPECT_NE(nullptr, x);
244   EXPECT_EQ(6, *x);
245 }
246
247 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
248   {
249     BufferWorkQueue queue;
250     queue.finish();
251     EXPECT_EQ(0, queue.size());
252   }
253   {
254     BufferWorkQueue queue;
255     queue.push(Buffer(10));
256     queue.finish();
257     EXPECT_EQ(10, queue.size());
258   }
259   {
260     BufferWorkQueue queue;
261     queue.push(Buffer(10));
262     queue.push(Buffer(5));
263     queue.finish();
264     EXPECT_EQ(15, queue.size());
265   }
266   {
267     BufferWorkQueue queue;
268     queue.push(Buffer(10));
269     queue.push(Buffer(5));
270     queue.finish();
271     Buffer buffer;
272     queue.pop(buffer);
273     EXPECT_EQ(5, queue.size());
274   }
275 }