]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/adaptive-compression/adapt.c
import zstd 1.3.8
[FreeBSD/FreeBSD.git] / contrib / adaptive-compression / adapt.c
1 /*
2  * Copyright (c) 2017-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9
10 #include <stdio.h>      /* fprintf */
11 #include <stdlib.h>     /* malloc, free */
12 #include <pthread.h>    /* pthread functions */
13 #include <string.h>     /* memset */
14 #include "zstd_internal.h"
15 #include "util.h"
16
17 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
18 #define PRINT(...) fprintf(stdout, __VA_ARGS__)
19 #define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
20 #define FILE_CHUNK_SIZE 4 << 20
21 #define MAX_NUM_JOBS 2
22 #define stdinmark  "/*stdin*\\"
23 #define stdoutmark "/*stdout*\\"
24 #define MAX_PATH 256
25 #define DEFAULT_DISPLAY_LEVEL 1
26 #define DEFAULT_COMPRESSION_LEVEL 6
27 #define MAX_COMPRESSION_LEVEL_CHANGE 2
28 #define CONVERGENCE_LOWER_BOUND 5
29 #define CLEVEL_DECREASE_COOLDOWN 5
30 #define CHANGE_BY_TWO_THRESHOLD 0.1
31 #define CHANGE_BY_ONE_THRESHOLD 0.65
32
33 #ifndef DEBUG_MODE
34 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
35 #else
36 static int g_displayLevel = DEBUG_MODE;
37 #endif
38
39 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
40 static UTIL_time_t g_startTime;
41 static size_t g_streamedSize = 0;
42 static unsigned g_useProgressBar = 1;
43 static unsigned g_forceCompressionLevel = 0;
44 static unsigned g_minCLevel = 1;
45 static unsigned g_maxCLevel;
46
47 typedef struct {
48     void* start;
49     size_t size;
50     size_t capacity;
51 } buffer_t;
52
53 typedef struct {
54     size_t filled;
55     buffer_t buffer;
56 } inBuff_t;
57
58 typedef struct {
59     buffer_t src;
60     buffer_t dst;
61     unsigned jobID;
62     unsigned lastJobPlusOne;
63     size_t compressedSize;
64     size_t dictSize;
65 } jobDescription;
66
67 typedef struct {
68     pthread_mutex_t pMutex;
69     int noError;
70 } mutex_t;
71
72 typedef struct {
73     pthread_cond_t pCond;
74     int noError;
75 } cond_t;
76
77 typedef struct {
78     unsigned compressionLevel;
79     unsigned numJobs;
80     unsigned nextJobID;
81     unsigned threadError;
82
83     /*
84      * JobIDs for the next jobs to be created, compressed, and written
85      */
86     unsigned jobReadyID;
87     unsigned jobCompressedID;
88     unsigned jobWriteID;
89     unsigned allJobsCompleted;
90
91     /*
92      * counter for how many jobs in a row the compression level has not changed
93      * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
94      * compression level tries to change (by non-zero amount) resets the counter
95      * to 1 and does not apply the change
96      */
97     unsigned convergenceCounter;
98
99     /*
100      * cooldown counter in order to prevent rapid successive decreases in compression level
101      * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
102      * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
103      * as long as cooldown != 0, the compression level cannot be decreased
104      */
105     unsigned cooldown;
106
107     /*
108      * XWaitYCompletion
109      * Range from 0.0 to 1.0
110      * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
111      * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5
112      * implies that the compression thread waited on the write thread and it was only 50% finished writing a job)
113      */
114     double createWaitCompressionCompletion;
115     double compressWaitCreateCompletion;
116     double compressWaitWriteCompletion;
117     double writeWaitCompressionCompletion;
118
119     /*
120      * Completion values
121      * Range from 0.0 to 1.0
122      * Jobs are divided into mini-chunks in order to measure completion
123      * these values are updated each time a thread finishes its operation on the
124      * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
125      */
126     double compressionCompletion;
127     double writeCompletion;
128     double createCompletion;
129
130     mutex_t jobCompressed_mutex;
131     cond_t jobCompressed_cond;
132     mutex_t jobReady_mutex;
133     cond_t jobReady_cond;
134     mutex_t allJobsCompleted_mutex;
135     cond_t allJobsCompleted_cond;
136     mutex_t jobWrite_mutex;
137     cond_t jobWrite_cond;
138     mutex_t compressionCompletion_mutex;
139     mutex_t createCompletion_mutex;
140     mutex_t writeCompletion_mutex;
141     mutex_t compressionLevel_mutex;
142     size_t lastDictSize;
143     inBuff_t input;
144     jobDescription* jobs;
145     ZSTD_CCtx* cctx;
146 } adaptCCtx;
147
148 typedef struct {
149     adaptCCtx* ctx;
150     FILE* dstFile;
151 } outputThreadArg;
152
153 typedef struct {
154     FILE* srcFile;
155     adaptCCtx* ctx;
156     outputThreadArg* otArg;
157 } fcResources;
158
159 static void freeCompressionJobs(adaptCCtx* ctx)
160 {
161     unsigned u;
162     for (u=0; u<ctx->numJobs; u++) {
163         jobDescription job = ctx->jobs[u];
164         free(job.dst.start);
165         free(job.src.start);
166     }
167 }
168
169 static int destroyMutex(mutex_t* mutex)
170 {
171     if (mutex->noError) {
172         int const ret = pthread_mutex_destroy(&mutex->pMutex);
173         return ret;
174     }
175     return 0;
176 }
177
178 static int destroyCond(cond_t* cond)
179 {
180     if (cond->noError) {
181         int const ret = pthread_cond_destroy(&cond->pCond);
182         return ret;
183     }
184     return 0;
185 }
186
187 static int freeCCtx(adaptCCtx* ctx)
188 {
189     if (!ctx) return 0;
190     {
191         int error = 0;
192         error |= destroyMutex(&ctx->jobCompressed_mutex);
193         error |= destroyCond(&ctx->jobCompressed_cond);
194         error |= destroyMutex(&ctx->jobReady_mutex);
195         error |= destroyCond(&ctx->jobReady_cond);
196         error |= destroyMutex(&ctx->allJobsCompleted_mutex);
197         error |= destroyCond(&ctx->allJobsCompleted_cond);
198         error |= destroyMutex(&ctx->jobWrite_mutex);
199         error |= destroyCond(&ctx->jobWrite_cond);
200         error |= destroyMutex(&ctx->compressionCompletion_mutex);
201         error |= destroyMutex(&ctx->createCompletion_mutex);
202         error |= destroyMutex(&ctx->writeCompletion_mutex);
203         error |= destroyMutex(&ctx->compressionLevel_mutex);
204         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
205         free(ctx->input.buffer.start);
206         if (ctx->jobs){
207             freeCompressionJobs(ctx);
208             free(ctx->jobs);
209         }
210         free(ctx);
211         return error;
212     }
213 }
214
215 static int initMutex(mutex_t* mutex)
216 {
217     int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
218     mutex->noError = !ret;
219     return ret;
220 }
221
222 static int initCond(cond_t* cond)
223 {
224     int const ret = pthread_cond_init(&cond->pCond, NULL);
225     cond->noError = !ret;
226     return ret;
227 }
228
229 static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
230 {
231     ctx->compressionLevel = g_compressionLevel;
232     {
233         int pthreadError = 0;
234         pthreadError |= initMutex(&ctx->jobCompressed_mutex);
235         pthreadError |= initCond(&ctx->jobCompressed_cond);
236         pthreadError |= initMutex(&ctx->jobReady_mutex);
237         pthreadError |= initCond(&ctx->jobReady_cond);
238         pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
239         pthreadError |= initCond(&ctx->allJobsCompleted_cond);
240         pthreadError |= initMutex(&ctx->jobWrite_mutex);
241         pthreadError |= initCond(&ctx->jobWrite_cond);
242         pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
243         pthreadError |= initMutex(&ctx->createCompletion_mutex);
244         pthreadError |= initMutex(&ctx->writeCompletion_mutex);
245         pthreadError |= initMutex(&ctx->compressionLevel_mutex);
246         if (pthreadError) return pthreadError;
247     }
248     ctx->numJobs = numJobs;
249     ctx->jobReadyID = 0;
250     ctx->jobCompressedID = 0;
251     ctx->jobWriteID = 0;
252     ctx->lastDictSize = 0;
253
254
255     ctx->createWaitCompressionCompletion = 1;
256     ctx->compressWaitCreateCompletion = 1;
257     ctx->compressWaitWriteCompletion = 1;
258     ctx->writeWaitCompressionCompletion = 1;
259     ctx->createCompletion = 1;
260     ctx->writeCompletion = 1;
261     ctx->compressionCompletion = 1;
262     ctx->convergenceCounter = 0;
263     ctx->cooldown = 0;
264
265     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
266
267     if (!ctx->jobs) {
268         DISPLAY("Error: could not allocate space for jobs during context creation\n");
269         return 1;
270     }
271
272     /* initializing jobs */
273     {
274         unsigned jobNum;
275         for (jobNum=0; jobNum<numJobs; jobNum++) {
276             jobDescription* job = &ctx->jobs[jobNum];
277             job->src.start = malloc(2 * FILE_CHUNK_SIZE);
278             job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
279             job->lastJobPlusOne = 0;
280             if (!job->src.start || !job->dst.start) {
281                 DISPLAY("Could not allocate buffers for jobs\n");
282                 return 1;
283             }
284             job->src.capacity = FILE_CHUNK_SIZE;
285             job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE);
286         }
287     }
288
289     ctx->nextJobID = 0;
290     ctx->threadError = 0;
291     ctx->allJobsCompleted = 0;
292
293     ctx->cctx = ZSTD_createCCtx();
294     if (!ctx->cctx) {
295         DISPLAY("Error: could not allocate ZSTD_CCtx\n");
296         return 1;
297     }
298
299     ctx->input.filled = 0;
300     ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE;
301
302     ctx->input.buffer.start = malloc(ctx->input.buffer.capacity);
303     if (!ctx->input.buffer.start) {
304         DISPLAY("Error: could not allocate input buffer\n");
305         return 1;
306     }
307     return 0;
308 }
309
310 static adaptCCtx* createCCtx(unsigned numJobs)
311 {
312
313     adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
314     if (ctx == NULL) {
315         DISPLAY("Error: could not allocate space for context\n");
316         return NULL;
317     }
318     {
319         int const error = initCCtx(ctx, numJobs);
320         if (error) {
321             freeCCtx(ctx);
322             return NULL;
323         }
324         return ctx;
325     }
326 }
327
328 static void signalErrorToThreads(adaptCCtx* ctx)
329 {
330     ctx->threadError = 1;
331     pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
332     pthread_cond_signal(&ctx->jobReady_cond.pCond);
333     pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
334
335     pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
336     pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
337     pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
338
339     pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
340     pthread_cond_signal(&ctx->jobWrite_cond.pCond);
341     pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
342
343     pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
344     pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
345     pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
346 }
347
348 static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
349 {
350     if (!ctx) return;
351     pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
352     while (ctx->allJobsCompleted == 0 && !ctx->threadError) {
353         pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
354     }
355     pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
356 }
357
358 /* map completion percentages to values for changing compression level */
359 static unsigned convertCompletionToChange(double completion)
360 {
361     if (completion < CHANGE_BY_TWO_THRESHOLD) {
362         return 2;
363     }
364     else if (completion < CHANGE_BY_ONE_THRESHOLD) {
365         return 1;
366     }
367     else {
368         return 0;
369     }
370 }
371
372 /*
373  * Compression level is changed depending on which part of the compression process is lagging
374  * Currently, three theads exist for job creation, compression, and file writing respectively.
375  * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging
376  * job creation or file writing lag => increased compression level
377  * compression thread lag           => decreased compression level
378  * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
379  */
380 static void adaptCompressionLevel(adaptCCtx* ctx)
381 {
382     double createWaitCompressionCompletion;
383     double compressWaitCreateCompletion;
384     double compressWaitWriteCompletion;
385     double writeWaitCompressionCompletion;
386     double const threshold = 0.00001;
387     unsigned prevCompressionLevel;
388
389     pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
390     prevCompressionLevel = ctx->compressionLevel;
391     pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
392
393
394     if (g_forceCompressionLevel) {
395         pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
396         ctx->compressionLevel = g_compressionLevel;
397         pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
398         return;
399     }
400
401
402     DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
403
404     /* read and reset completion measurements */
405     pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
406     DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
407     DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
408     createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
409     writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
410     pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
411
412     pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
413     DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
414     compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
415     pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
416
417     pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
418     DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
419     compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
420     pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
421     DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
422
423     assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
424
425     /* adaptation logic */
426     if (ctx->cooldown) ctx->cooldown--;
427
428     if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) {
429         /* create or write waiting on compression */
430         /* use whichever one waited less because it was slower */
431         double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
432         unsigned const change = convertCompletionToChange(completion);
433         unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
434         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
435             /* reset convergence counter, might have been a spike */
436             ctx->convergenceCounter = 0;
437             DEBUG(2, "convergence counter reset, no change applied\n");
438         }
439         else if (boundChange != 0) {
440             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
441             ctx->compressionLevel -= boundChange;
442             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
443             ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
444             ctx->convergenceCounter = 1;
445
446             DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
447         }
448     }
449     else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
450         /* compress waiting on write */
451         double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
452         unsigned const change = convertCompletionToChange(completion);
453         unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
454         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
455             /* reset convergence counter, might have been a spike */
456             ctx->convergenceCounter = 0;
457             DEBUG(2, "convergence counter reset, no change applied\n");
458         }
459         else if (boundChange != 0) {
460             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
461             ctx->compressionLevel += boundChange;
462             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
463             ctx->cooldown = 0;
464             ctx->convergenceCounter = 1;
465
466             DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
467         }
468
469     }
470
471     pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
472     if (ctx->compressionLevel == prevCompressionLevel) {
473         ctx->convergenceCounter++;
474     }
475     pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
476 }
477
478 static size_t getUseableDictSize(unsigned compressionLevel)
479 {
480     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
481     unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
482     size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog);
483     return overlapSize;
484 }
485
486 static void* compressionThread(void* arg)
487 {
488     adaptCCtx* const ctx = (adaptCCtx*)arg;
489     unsigned currJob = 0;
490     for ( ; ; ) {
491         unsigned const currJobIndex = currJob % ctx->numJobs;
492         jobDescription* const job = &ctx->jobs[currJobIndex];
493         DEBUG(2, "starting compression for job %u\n", currJob);
494
495         {
496             /* check if compression thread will have to wait */
497             unsigned willWaitForCreate = 0;
498             unsigned willWaitForWrite = 0;
499
500             pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
501             if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
502             pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
503
504             pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
505             if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
506             pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
507
508
509             pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
510             if (willWaitForCreate) {
511                 DEBUG(2, "compression will wait for create on job %u\n", currJob);
512                 ctx->compressWaitCreateCompletion = ctx->createCompletion;
513                 DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
514
515             }
516             else {
517                 ctx->compressWaitCreateCompletion = 1;
518             }
519             pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
520
521             pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
522             if (willWaitForWrite) {
523                 DEBUG(2, "compression will wait for write on job %u\n", currJob);
524                 ctx->compressWaitWriteCompletion = ctx->writeCompletion;
525                 DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
526             }
527             else {
528                 ctx->compressWaitWriteCompletion = 1;
529             }
530             pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
531
532         }
533
534         /* wait until job is ready */
535         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
536         while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
537             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
538         }
539         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
540
541         /* wait until job previously in this space is written */
542         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
543         while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
544             pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
545         }
546         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
547         /* reset compression completion */
548         pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
549         ctx->compressionCompletion = 0;
550         pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
551
552         /* adapt compression level */
553         if (currJob) adaptCompressionLevel(ctx);
554
555         pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
556         DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
557         pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
558
559         /* compress the data */
560         {
561             size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
562             unsigned cLevel;
563             unsigned blockNum = 0;
564             size_t remaining = job->src.size;
565             size_t srcPos = 0;
566             size_t dstPos = 0;
567
568             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
569             cLevel = ctx->compressionLevel;
570             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
571
572             /* reset compressed size */
573             job->compressedSize = 0;
574             DEBUG(2, "calling ZSTD_compressBegin()\n");
575             /* begin compression */
576             {
577                 size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
578                 ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize);
579                 params.cParams.windowLog = 23;
580                 {
581                     size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0);
582                     size_t const windowSizeError = ZSTD_CCtx_setParameter(ctx->cctx, ZSTD_c_forceMaxWindow, 1);
583                     if (ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
584                         DISPLAY("Error: something went wrong while starting compression\n");
585                         signalErrorToThreads(ctx);
586                         return arg;
587                     }
588                 }
589             }
590             DEBUG(2, "finished with ZSTD_compressBegin()\n");
591
592             do {
593                 size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
594
595                 /* continue compression */
596                 if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
597                     size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
598                     if (ZSTD_isError(hSize)) {
599                         DISPLAY("Error: something went wrong while continuing compression\n");
600                         job->compressedSize = hSize;
601                         signalErrorToThreads(ctx);
602                         return arg;
603                     }
604                     ZSTD_invalidateRepCodes(ctx->cctx);
605                 }
606                 {
607                     size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
608                                             ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
609                                             ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
610                     if (ZSTD_isError(ret)) {
611                         DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
612                         signalErrorToThreads(ctx);
613                         return arg;
614                     }
615                     job->compressedSize += ret;
616                     remaining -= actualBlockSize;
617                     srcPos += actualBlockSize;
618                     dstPos += ret;
619                     blockNum++;
620
621                     /* update completion */
622                     pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
623                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
624                     pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
625                 }
626             } while (remaining != 0);
627             job->dst.size = job->compressedSize;
628         }
629         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
630         ctx->jobCompressedID++;
631         pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
632         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
633         if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
634             /* finished compressing all jobs */
635             break;
636         }
637         DEBUG(2, "finished compressing job %u\n", currJob);
638         currJob++;
639     }
640     return arg;
641 }
642
643 static void displayProgress(unsigned cLevel, unsigned last)
644 {
645     UTIL_time_t currTime = UTIL_getTime();
646     if (!g_useProgressBar) return;
647     {   double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_startTime, currTime) / 1000.0);
648         double const sizeMB = (double)g_streamedSize / (1 << 20);
649         double const avgCompRate = sizeMB * 1000 / timeElapsed;
650         fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate);
651         if (last) {
652             fprintf(stderr, "\n");
653         } else {
654             fflush(stderr);
655     }   }
656 }
657
658 static void* outputThread(void* arg)
659 {
660     outputThreadArg* const otArg = (outputThreadArg*)arg;
661     adaptCCtx* const ctx = otArg->ctx;
662     FILE* const dstFile = otArg->dstFile;
663
664     unsigned currJob = 0;
665     for ( ; ; ) {
666         unsigned const currJobIndex = currJob % ctx->numJobs;
667         jobDescription* const job = &ctx->jobs[currJobIndex];
668         unsigned willWaitForCompress = 0;
669         DEBUG(2, "starting write for job %u\n", currJob);
670
671         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
672         if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
673         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
674
675
676         pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
677         if (willWaitForCompress) {
678             /* write thread is waiting on compression thread */
679             ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
680             DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
681         }
682         else {
683             ctx->writeWaitCompressionCompletion = 1;
684         }
685         pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
686
687         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
688         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
689             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
690         }
691         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
692
693         /* reset write completion */
694         pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
695         ctx->writeCompletion = 0;
696         pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
697
698         {
699             size_t const compressedSize = job->compressedSize;
700             size_t remaining = compressedSize;
701             if (ZSTD_isError(compressedSize)) {
702                 DISPLAY("Error: an error occurred during compression\n");
703                 signalErrorToThreads(ctx);
704                 return arg;
705             }
706             {
707                 size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
708                 size_t pos = 0;
709                 for ( ; ; ) {
710                     size_t const writeSize = MIN(remaining, blockSize);
711                     size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
712                     if (ret != writeSize) break;
713                     pos += ret;
714                     remaining -= ret;
715
716                     /* update completion variable for writing */
717                     pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
718                     ctx->writeCompletion = 1 - (double)remaining/compressedSize;
719                     pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
720
721                     if (remaining == 0) break;
722                 }
723                 if (pos != compressedSize) {
724                     DISPLAY("Error: an error occurred during file write operation\n");
725                     signalErrorToThreads(ctx);
726                     return arg;
727                 }
728             }
729         }
730         {
731             unsigned cLevel;
732             pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
733             cLevel = ctx->compressionLevel;
734             pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
735             displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
736         }
737         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
738         ctx->jobWriteID++;
739         pthread_cond_signal(&ctx->jobWrite_cond.pCond);
740         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
741
742         if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
743             /* finished with all jobs */
744             pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
745             ctx->allJobsCompleted = 1;
746             pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
747             pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
748             break;
749         }
750         DEBUG(2, "finished writing job %u\n", currJob);
751         currJob++;
752
753     }
754     return arg;
755 }
756
757 static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
758 {
759     unsigned const nextJob = ctx->nextJobID;
760     unsigned const nextJobIndex = nextJob % ctx->numJobs;
761     jobDescription* const job = &ctx->jobs[nextJobIndex];
762
763
764     job->src.size = srcSize;
765     job->jobID = nextJob;
766     if (last) job->lastJobPlusOne = nextJob + 1;
767     {
768         /* swap buffer */
769         void* const copy = job->src.start;
770         job->src.start = ctx->input.buffer.start;
771         ctx->input.buffer.start = copy;
772     }
773     job->dictSize = ctx->lastDictSize;
774
775     ctx->nextJobID++;
776     /* if not on the last job, reuse data as dictionary in next job */
777     if (!last) {
778         size_t const oldDictSize = ctx->lastDictSize;
779         memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize);
780         ctx->lastDictSize = srcSize;
781         ctx->input.filled = srcSize;
782     }
783
784     /* signal job ready */
785     pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
786     ctx->jobReadyID++;
787     pthread_cond_signal(&ctx->jobReady_cond.pCond);
788     pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
789
790     return 0;
791 }
792
793 static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
794 {
795     /* early error check to exit */
796     if (!ctx || !srcFile || !otArg) {
797         return 1;
798     }
799
800     /* create output thread */
801     {
802         pthread_t out;
803         if (pthread_create(&out, NULL, &outputThread, otArg)) {
804             DISPLAY("Error: could not create output thread\n");
805             signalErrorToThreads(ctx);
806             return 1;
807         }
808         else if (pthread_detach(out)) {
809                 DISPLAY("Error: could not detach output thread\n");
810                 signalErrorToThreads(ctx);
811                 return 1;
812         }
813     }
814
815     /* create compression thread */
816     {
817         pthread_t compression;
818         if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
819             DISPLAY("Error: could not create compression thread\n");
820             signalErrorToThreads(ctx);
821             return 1;
822         }
823         else if (pthread_detach(compression)) {
824                 DISPLAY("Error: could not detach compression thread\n");
825                 signalErrorToThreads(ctx);
826                 return 1;
827         }
828     }
829     {
830         unsigned currJob = 0;
831         /* creating jobs */
832         for ( ; ; ) {
833             size_t pos = 0;
834             size_t const readBlockSize = 1 << 15;
835             size_t remaining = FILE_CHUNK_SIZE;
836             unsigned const nextJob = ctx->nextJobID;
837             unsigned willWaitForCompress = 0;
838             DEBUG(2, "starting creation of job %u\n", currJob);
839
840             pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
841             if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
842             pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
843
844             pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
845             if (willWaitForCompress) {
846                 /* creation thread is waiting, take measurement of completion */
847                 ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
848                 DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
849             }
850             else {
851                 ctx->createWaitCompressionCompletion = 1;
852             }
853             pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
854
855             /* wait until the job has been compressed */
856             pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
857             while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
858                 pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
859             }
860             pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
861
862             /* reset create completion */
863             pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
864             ctx->createCompletion = 0;
865             pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
866
867             while (remaining != 0 && !feof(srcFile)) {
868                 size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
869                 if (ret != readBlockSize && !feof(srcFile)) {
870                     /* error could not read correct number of bytes */
871                     DISPLAY("Error: problem occurred during read from src file\n");
872                     signalErrorToThreads(ctx);
873                     return 1;
874                 }
875                 pos += ret;
876                 remaining -= ret;
877                 pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
878                 ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
879                 pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
880             }
881             if (remaining != 0 && !feof(srcFile)) {
882                 DISPLAY("Error: problem occurred during read from src file\n");
883                 signalErrorToThreads(ctx);
884                 return 1;
885             }
886             g_streamedSize += pos;
887             /* reading was fine, now create the compression job */
888             {
889                 int const last = feof(srcFile);
890                 int const error = createCompressionJob(ctx, pos, last);
891                 if (error != 0) {
892                     signalErrorToThreads(ctx);
893                     return error;
894                 }
895             }
896             DEBUG(2, "finished creating job %u\n", currJob);
897             currJob++;
898             if (feof(srcFile)) {
899                 break;
900             }
901         }
902     }
903     /* success -- created all jobs */
904     return 0;
905 }
906
907 static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull)
908 {
909     fcResources fcr;
910     unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
911     FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
912     const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
913     const char* outFilename = outFilenameIntermediate;
914     char fileAndSuffix[MAX_PATH];
915     size_t const numJobs = MAX_NUM_JOBS;
916
917     memset(&fcr, 0, sizeof(fcr));
918
919     if (!outFilenameIntermediate) {
920         if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) {
921             DISPLAY("Error: output filename is too long\n");
922             return fcr;
923         }
924         outFilename = fileAndSuffix;
925     }
926
927     {
928         unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark);
929         FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb");
930         fcr.otArg = malloc(sizeof(outputThreadArg));
931         if (!fcr.otArg) {
932             DISPLAY("Error: could not allocate space for output thread argument\n");
933             return fcr;
934         }
935         fcr.otArg->dstFile = dstFile;
936     }
937     /* checking for errors */
938     if (!fcr.otArg->dstFile || !srcFile) {
939         DISPLAY("Error: some file(s) could not be opened\n");
940         return fcr;
941     }
942
943     /* creating context */
944     fcr.ctx = createCCtx(numJobs);
945     fcr.otArg->ctx = fcr.ctx;
946     fcr.srcFile = srcFile;
947     return fcr;
948 }
949
950 static int freeFileCompressionResources(fcResources* fcr)
951 {
952     int ret = 0;
953     waitUntilAllJobsCompleted(fcr->ctx);
954     ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
955     ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
956     if (fcr->otArg) {
957         ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0;
958         free(fcr->otArg);
959         /* no need to freeCCtx() on otArg->ctx because it should be the same context */
960     }
961     return ret;
962 }
963
964 static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
965 {
966     int ret = 0;
967     fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull);
968     g_streamedSize = 0;
969     ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg);
970     ret |= freeFileCompressionResources(&fcr);
971     return ret;
972 }
973
974 static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout)
975 {
976     int ret = 0;
977     unsigned fileNum;
978     for (fileNum=0; fileNum<numFiles; fileNum++) {
979         const char* filename = filenameTable[fileNum];
980         if (!forceStdout) {
981             ret |= compressFilename(filename, NULL);
982         }
983         else {
984             ret |= compressFilename(filename, stdoutmark);
985         }
986
987     }
988     return ret;
989 }
990
991 /*! readU32FromChar() :
992     @return : unsigned integer value read from input in `char` format
993     allows and interprets K, KB, KiB, M, MB and MiB suffix.
994     Will also modify `*stringPtr`, advancing it to position where it stopped reading.
995     Note : function result can overflow if digit string > MAX_UINT */
996 static unsigned readU32FromChar(const char** stringPtr)
997 {
998     unsigned result = 0;
999     while ((**stringPtr >='0') && (**stringPtr <='9'))
1000         result *= 10, result += **stringPtr - '0', (*stringPtr)++ ;
1001     if ((**stringPtr=='K') || (**stringPtr=='M')) {
1002         result <<= 10;
1003         if (**stringPtr=='M') result <<= 10;
1004         (*stringPtr)++ ;
1005         if (**stringPtr=='i') (*stringPtr)++;
1006         if (**stringPtr=='B') (*stringPtr)++;
1007     }
1008     return result;
1009 }
1010
1011 static void help(const char* progPath)
1012 {
1013     PRINT("Usage:\n");
1014     PRINT("  %s [options] [file(s)]\n", progPath);
1015     PRINT("\n");
1016     PRINT("Options:\n");
1017     PRINT("  -oFILE : specify the output file name\n");
1018     PRINT("  -i#    : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL);
1019     PRINT("  -h     : display help/information\n");
1020     PRINT("  -f     : force the compression level to stay constant\n");
1021     PRINT("  -c     : force write to stdout\n");
1022     PRINT("  -p     : hide progress bar\n");
1023     PRINT("  -q     : quiet mode -- do not show progress bar or other information\n");
1024     PRINT("  -l#    : provide lower bound for compression level -- default 1\n");
1025     PRINT("  -u#    : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
1026 }
1027 /* return 0 if successful, else return error */
1028 int main(int argCount, const char* argv[])
1029 {
1030     const char* outFilename = NULL;
1031     const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*));
1032     unsigned filenameIdx = 0;
1033     unsigned forceStdout = 0;
1034     unsigned providedInitialCLevel = 0;
1035     int ret = 0;
1036     int argNum;
1037     filenameTable[0] = stdinmark;
1038     g_maxCLevel = ZSTD_maxCLevel();
1039
1040     if (filenameTable == NULL) {
1041         DISPLAY("Error: could not allocate sapce for filename table.\n");
1042         return 1;
1043     }
1044
1045     for (argNum=1; argNum<argCount; argNum++) {
1046         const char* argument = argv[argNum];
1047
1048         /* output filename designated with "-o" */
1049         if (argument[0]=='-' && strlen(argument) > 1) {
1050             switch (argument[1]) {
1051                 case 'o':
1052                     argument += 2;
1053                     outFilename = argument;
1054                     break;
1055                 case 'i':
1056                     argument += 2;
1057                     g_compressionLevel = readU32FromChar(&argument);
1058                     providedInitialCLevel = 1;
1059                     break;
1060                 case 'h':
1061                     help(argv[0]);
1062                     goto _main_exit;
1063                 case 'p':
1064                     g_useProgressBar = 0;
1065                     break;
1066                 case 'c':
1067                     forceStdout = 1;
1068                     outFilename = stdoutmark;
1069                     break;
1070                 case 'f':
1071                     g_forceCompressionLevel = 1;
1072                     break;
1073                 case 'q':
1074                     g_useProgressBar = 0;
1075                     g_displayLevel = 0;
1076                     break;
1077                 case 'l':
1078                     argument += 2;
1079                     g_minCLevel = readU32FromChar(&argument);
1080                     break;
1081                 case 'u':
1082                     argument += 2;
1083                     g_maxCLevel = readU32FromChar(&argument);
1084                     break;
1085                 default:
1086                     DISPLAY("Error: invalid argument provided\n");
1087                     ret = 1;
1088                     goto _main_exit;
1089             }
1090             continue;
1091         }
1092
1093         /* regular files to be compressed */
1094         filenameTable[filenameIdx++] = argument;
1095     }
1096
1097     /* check initial, max, and min compression levels */
1098     {
1099         unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel;
1100         unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel;
1101         if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) {
1102             DISPLAY("Error: provided compression level parameters are invalid\n");
1103             ret = 1;
1104             goto _main_exit;
1105         }
1106         else if (initialNotInRange) {
1107             g_compressionLevel = g_minCLevel;
1108         }
1109     }
1110
1111     /* error checking with number of files */
1112     if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) {
1113         DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
1114         ret = 1;
1115         goto _main_exit;
1116     }
1117
1118     /* compress files */
1119     if (filenameIdx <= 1) {
1120         ret |= compressFilename(filenameTable[0], outFilename);
1121     }
1122     else {
1123         ret |= compressFilenames(filenameTable, filenameIdx, forceStdout);
1124     }
1125 _main_exit:
1126     free(filenameTable);
1127     return ret;
1128 }