2 * Copyright (c) 2017-present, Facebook, Inc.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
10 #include <stdio.h> /* fprintf */
11 #include <stdlib.h> /* malloc, free */
12 #include <pthread.h> /* pthread functions */
13 #include <string.h> /* memset */
14 #include "zstd_internal.h"
/* Console output helpers: DISPLAY writes to stderr, PRINT to stdout, and DEBUG
 * prints only when g_displayLevel is at least the requested level `l`.
 * DEBUG is wrapped in do { } while (0) so that `DEBUG(...);` is a single
 * statement and stays valid inside an unbraced if/else. */
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define PRINT(...) fprintf(stdout, __VA_ARGS__)
#define DEBUG(l, ...) do { if (g_displayLevel >= (l)) { DISPLAY(__VA_ARGS__); } } while (0)

/* Size of one input chunk (one compression job), 4 MiB.
 * Parenthesized so that expressions such as `x / FILE_CHUNK_SIZE` expand correctly. */
#define FILE_CHUNK_SIZE (4 << 20)
#define MAX_NUM_JOBS 2
#define stdinmark "/*stdin*\\"
#define stdoutmark "/*stdout*\\"

#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define MAX_COMPRESSION_LEVEL_CHANGE 2
#define CONVERGENCE_LOWER_BOUND 5
#define CLEVEL_DECREASE_COOLDOWN 5
/* completion-ratio thresholds compared against in convertCompletionToChange() */
#define CHANGE_BY_TWO_THRESHOLD 0.1
#define CHANGE_BY_ONE_THRESHOLD 0.65
34 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
36 static int g_displayLevel = DEBUG_MODE;
39 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
40 static UTIL_time_t g_startTime;
41 static size_t g_streamedSize = 0;
42 static unsigned g_useProgressBar = 1;
43 static unsigned g_forceCompressionLevel = 0;
44 static unsigned g_minCLevel = 1;
45 static unsigned g_maxCLevel;
/* NOTE(review): the lines below are fields from several typedefs: the job
 * description, the mutex_t / cond_t wrappers, the adaptive compression context,
 * and the file-compression resources. The enclosing typedef lines are not
 * visible in this excerpt. */
62 unsigned lastJobPlusOne; /* jobID + 1 on the final job, 0 otherwise */
63 size_t compressedSize; /* may carry a ZSTD error code, which the writer checks with ZSTD_isError() */
68 pthread_mutex_t pMutex;
78 unsigned compressionLevel;
84 * JobIDs for the next jobs to be created, compressed, and written
87 unsigned jobCompressedID;
89 unsigned allJobsCompleted;
92 * counter for how many jobs in a row the compression level has not changed
93 * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
94 * compression level tries to change (by non-zero amount) resets the counter
95 * to 0 and does not apply the change
97 unsigned convergenceCounter;
100 * cooldown counter in order to prevent rapid successive decreases in compression level
101 * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
102 * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
103 * as long as cooldown != 0, the compression level cannot be decreased
109 * Range from 0.0 to 1.0
110 * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
111 * and that thread Y was only XWaitYCompletion of the way through its current job at the time of the wait (i.e. compressWaitWriteCompletion=0.5
112 * implies that the compression thread waited on the write thread and it was only 50% finished writing a job)
114 double createWaitCompressionCompletion;
115 double compressWaitCreateCompletion;
116 double compressWaitWriteCompletion;
117 double writeWaitCompressionCompletion;
121 * Range from 0.0 to 1.0
122 * Jobs are divided into mini-chunks in order to measure completion
123 * these values are updated each time a thread finishes its operation on the
124 * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
126 double compressionCompletion;
127 double writeCompletion;
128 double createCompletion;
130 mutex_t jobCompressed_mutex;
131 cond_t jobCompressed_cond;
132 mutex_t jobReady_mutex;
133 cond_t jobReady_cond;
134 mutex_t allJobsCompleted_mutex;
135 cond_t allJobsCompleted_cond;
136 mutex_t jobWrite_mutex;
137 cond_t jobWrite_cond;
138 mutex_t compressionCompletion_mutex;
139 mutex_t createCompletion_mutex;
140 mutex_t writeCompletion_mutex;
141 mutex_t compressionLevel_mutex;
/* Lock pairing as used by the code:
 * - compressionCompletion_mutex guards compressionCompletion,
 *   createWaitCompressionCompletion and writeWaitCompressionCompletion;
 * - writeCompletion_mutex guards writeCompletion and compressWaitWriteCompletion;
 * - createCompletion_mutex guards createCompletion and compressWaitCreateCompletion;
 * - compressionLevel_mutex guards compressionLevel. */
144 jobDescription* jobs;
156 outputThreadArg* otArg;
/* freeCompressionJobs() :
 * Walks all ctx->numJobs job descriptors. Presumably it releases each job's
 * src/dst buffers; the rest of the loop body is not visible here.
 * NOTE(review): ctx->jobs is indexed without a NULL check. initCCtx() assigns
 * numJobs before allocating the jobs array, so if that allocation fails and the
 * context is then freed, this dereferences NULL. Consider `if (!ctx->jobs) return;`. */
159 static void freeCompressionJobs(adaptCCtx* ctx)
162 for (u=0; u<ctx->numJobs; u++) {
163 jobDescription job = ctx->jobs[u];
/* destroyMutex() :
 * Destroys mutex->pMutex only if its initialization succeeded
 * (noError is set from the pthread_mutex_init() result in initMutex()).
 * Presumably returns the pthread_mutex_destroy() result (return not visible). */
169 static int destroyMutex(mutex_t* mutex)
171 if (mutex->noError) {
172 int const ret = pthread_mutex_destroy(&mutex->pMutex);
/* destroyCond() :
 * Counterpart of destroyMutex() for cond_t: destroys cond->pCond.
 * NOTE(review): a `cond->noError` guard mirroring destroyMutex() is not
 * visible in this excerpt; confirm it exists. */
178 static int destroyCond(cond_t* cond)
181 int const ret = pthread_cond_destroy(&cond->pCond);
/* freeCCtx() :
 * Tears down an adaptCCtx: destroys every mutex/condition variable, frees the
 * ZSTD_CCtx, the input buffer and the per-job buffers.
 * Failures are OR-ed into `error` (return statement not visible here). */
187 static int freeCCtx(adaptCCtx* ctx)
192 error |= destroyMutex(&ctx->jobCompressed_mutex);
193 error |= destroyCond(&ctx->jobCompressed_cond);
194 error |= destroyMutex(&ctx->jobReady_mutex);
195 error |= destroyCond(&ctx->jobReady_cond);
196 error |= destroyMutex(&ctx->allJobsCompleted_mutex);
197 error |= destroyCond(&ctx->allJobsCompleted_cond);
198 error |= destroyMutex(&ctx->jobWrite_mutex);
199 error |= destroyCond(&ctx->jobWrite_cond);
200 error |= destroyMutex(&ctx->compressionCompletion_mutex);
201 error |= destroyMutex(&ctx->createCompletion_mutex);
202 error |= destroyMutex(&ctx->writeCompletion_mutex);
203 error |= destroyMutex(&ctx->compressionLevel_mutex);
204 error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
205 free(ctx->input.buffer.start);
207 freeCompressionJobs(ctx);
/* initMutex() :
 * Initializes mutex->pMutex with default attributes and records success in
 * mutex->noError, so that destroyMutex() only destroys initialized mutexes.
 * Presumably returns the pthread_mutex_init() result (return not visible). */
215 static int initMutex(mutex_t* mutex)
217 int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
218 mutex->noError = !ret;
/* initCond() :
 * Initializes cond->pCond with default attributes and records success in
 * cond->noError. Presumably returns the pthread_cond_init() result
 * (return not visible). */
222 static int initCond(cond_t* cond)
224 int const ret = pthread_cond_init(&cond->pCond, NULL);
225 cond->noError = !ret;
/* initCCtx() :
 * Initializes an adaptCCtx for `numJobs` in-flight jobs. In order, it:
 * - creates all mutexes and condition variables, returning the OR-ed pthread
 *   error immediately if any of them fails;
 * - resets the job IDs and completion measurements (ratios start at 1, meaning
 *   "no thread has waited yet");
 * - allocates the job array with per-job src/dst buffers;
 * - allocates the ZSTD_CCtx and the 2 * FILE_CHUNK_SIZE input buffer. That
 *   buffer holds the previous chunk (kept as dictionary) followed by the newly
 *   read chunk.
 * The error returns after each allocation failure are not visible in this excerpt.
 * NOTE(review): job->src.start is allocated with 2 * FILE_CHUNK_SIZE bytes but
 * job->src.capacity is recorded as FILE_CHUNK_SIZE; confirm which is intended.
 * NOTE(review): calloc(numJobs, sizeof(jobDescription)) would let calloc check
 * the size multiplication for overflow. */
229 static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
231 ctx->compressionLevel = g_compressionLevel;
233 int pthreadError = 0;
234 pthreadError |= initMutex(&ctx->jobCompressed_mutex);
235 pthreadError |= initCond(&ctx->jobCompressed_cond);
236 pthreadError |= initMutex(&ctx->jobReady_mutex);
237 pthreadError |= initCond(&ctx->jobReady_cond);
238 pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
239 pthreadError |= initCond(&ctx->allJobsCompleted_cond);
240 pthreadError |= initMutex(&ctx->jobWrite_mutex);
241 pthreadError |= initCond(&ctx->jobWrite_cond);
242 pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
243 pthreadError |= initMutex(&ctx->createCompletion_mutex);
244 pthreadError |= initMutex(&ctx->writeCompletion_mutex);
245 pthreadError |= initMutex(&ctx->compressionLevel_mutex);
246 if (pthreadError) return pthreadError;
248 ctx->numJobs = numJobs;
250 ctx->jobCompressedID = 0;
252 ctx->lastDictSize = 0;
255 ctx->createWaitCompressionCompletion = 1;
256 ctx->compressWaitCreateCompletion = 1;
257 ctx->compressWaitWriteCompletion = 1;
258 ctx->writeWaitCompressionCompletion = 1;
259 ctx->createCompletion = 1;
260 ctx->writeCompletion = 1;
261 ctx->compressionCompletion = 1;
262 ctx->convergenceCounter = 0;
265 ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
268 DISPLAY("Error: could not allocate space for jobs during context creation\n");
272 /* initializing jobs */
275 for (jobNum=0; jobNum<numJobs; jobNum++) {
276 jobDescription* job = &ctx->jobs[jobNum];
277 job->src.start = malloc(2 * FILE_CHUNK_SIZE);
278 job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
279 job->lastJobPlusOne = 0;
280 if (!job->src.start || !job->dst.start) {
281 DISPLAY("Could not allocate buffers for jobs\n");
284 job->src.capacity = FILE_CHUNK_SIZE;
285 job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE);
290 ctx->threadError = 0;
291 ctx->allJobsCompleted = 0;
293 ctx->cctx = ZSTD_createCCtx();
295 DISPLAY("Error: could not allocate ZSTD_CCtx\n");
299 ctx->input.filled = 0;
300 ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE;
302 ctx->input.buffer.start = malloc(ctx->input.buffer.capacity);
303 if (!ctx->input.buffer.start) {
304 DISPLAY("Error: could not allocate input buffer\n");
/* createCCtx() :
 * Allocates a zeroed adaptCCtx with calloc() and initializes it via initCCtx().
 * Presumably returns NULL on any failure (the failure handling after initCCtx()
 * is not visible here); callers treat a NULL context as an error. */
310 static adaptCCtx* createCCtx(unsigned numJobs)
313 adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
315 DISPLAY("Error: could not allocate space for context\n");
319 int const error = initCCtx(ctx, numJobs);
328 static void signalErrorToThreads(adaptCCtx* ctx)
330 ctx->threadError = 1;
331 pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
332 pthread_cond_signal(&ctx->jobReady_cond.pCond);
333 pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
335 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
336 pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
337 pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
339 pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
340 pthread_cond_signal(&ctx->jobWrite_cond.pCond);
341 pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
343 pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
344 pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
345 pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
348 static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
351 pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
352 while (ctx->allJobsCompleted == 0 && !ctx->threadError) {
353 pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
355 pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
358 /* map a completion ratio (0.0 to 1.0) to a compression level change amount; presumably 2 below CHANGE_BY_TWO_THRESHOLD, 1 below CHANGE_BY_ONE_THRESHOLD, otherwise 0 (the return statements are not visible here) */
359 static unsigned convertCompletionToChange(double completion)
361 if (completion < CHANGE_BY_TWO_THRESHOLD) {
364 else if (completion < CHANGE_BY_ONE_THRESHOLD) {
373 * Compression level is changed depending on which part of the compression process is lagging
374 * Currently, three threads exist for job creation, compression, and file writing respectively.
375 * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging
376 * job creation or file writing lag => increased compression level
377 * compression thread lag => decreased compression level
378 * detecting which thread is lagging is done by recording, whenever a thread is about to wait on another, how far (0.0 to 1.0) the other thread had progressed through its current job
380 static void adaptCompressionLevel(adaptCCtx* ctx)
382 double createWaitCompressionCompletion;
383 double compressWaitCreateCompletion;
384 double compressWaitWriteCompletion;
385 double writeWaitCompressionCompletion;
386 double const threshold = 0.00001;
387 unsigned prevCompressionLevel;
389 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
390 prevCompressionLevel = ctx->compressionLevel;
391 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
394 if (g_forceCompressionLevel) {
395 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
396 ctx->compressionLevel = g_compressionLevel;
397 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
402 DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
404 /* read completion measurements, each under the mutex that guards it */
405 pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
406 DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
407 DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
408 createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
409 writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
410 pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
412 pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
413 DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
414 compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
415 pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
417 pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
418 DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
419 compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
420 pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
421 DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
423 assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
425 /* adaptation logic */
426 if (ctx->cooldown) ctx->cooldown--;
428 if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) { /* NOTE(review): while cooldown != 0 a pending decrease falls through to the increase branch below */
429 /* create or write waiting on compression */
430 /* use whichever one waited less because it was slower */
431 double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
432 unsigned const change = convertCompletionToChange(completion);
433 unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
434 if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
435 /* reset convergence counter, might have been a spike */
436 ctx->convergenceCounter = 0;
437 DEBUG(2, "convergence counter reset, no change applied\n");
439 else if (boundChange != 0) {
440 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
441 ctx->compressionLevel -= boundChange;
442 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
443 ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
444 ctx->convergenceCounter = 1;
446 DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
449 else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
450 /* compress waiting on write or create */
451 double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
452 unsigned const change = convertCompletionToChange(completion);
453 unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
454 if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
455 /* reset convergence counter, might have been a spike */
456 ctx->convergenceCounter = 0;
457 DEBUG(2, "convergence counter reset, no change applied\n");
459 else if (boundChange != 0) {
460 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
461 ctx->compressionLevel += boundChange;
462 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
464 ctx->convergenceCounter = 1;
466 DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
471 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
472 if (ctx->compressionLevel == prevCompressionLevel) {
473 ctx->convergenceCounter++;
475 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
/* getUseableDictSize() :
 * Computes overlapSize = 2^(windowLog - overlapLog) for `compressionLevel`.
 * At or above ZSTD_maxCLevel() that is the whole window; below it, 1/8 of it.
 * Presumably returns overlapSize (return not visible); the caller clamps the
 * result to the dictionary actually available. */
478 static size_t getUseableDictSize(unsigned compressionLevel)
480 ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
481 unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
482 size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog);
/* compressionThread() :
 * Thread entry point (arg is the adaptCCtx). It processes jobs in order, using
 * job slot currJob % numJobs. For each job it:
 * 1. records how far the create/write threads had progressed if it is about to
 *    wait on them (input for adaptCompressionLevel());
 * 2. waits until the job is ready and the slot's previous job has been written;
 * 3. adapts the level (not for the first job);
 * 4. compresses the job in ZSTD_BLOCKSIZE_MAX pieces, using the tail of the
 *    previous chunk as dictionary.
 * Only the final block of the final job is finished with ZSTD_compressEnd();
 * every other block goes through ZSTD_compressContinue().
 * NOTE(review): windowLog is hard-coded to 23, i.e. 2 * FILE_CHUNK_SIZE bytes.
 * NOTE(review): ZSTD_c_forceMaxWindow is set after ZSTD_compressBegin_advanced();
 * confirm it still affects the already-begun compression.
 * NOTE(review): if job->src.size == 0, the completion update computes 0.0/0 (NaN). */
486 static void* compressionThread(void* arg)
488 adaptCCtx* const ctx = (adaptCCtx*)arg;
489 unsigned currJob = 0;
491 unsigned const currJobIndex = currJob % ctx->numJobs;
492 jobDescription* const job = &ctx->jobs[currJobIndex];
493 DEBUG(2, "starting compression for job %u\n", currJob);
496 /* check if compression thread will have to wait */
497 unsigned willWaitForCreate = 0;
498 unsigned willWaitForWrite = 0;
500 pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
501 if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
502 pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
504 pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
505 if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
506 pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
509 pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
510 if (willWaitForCreate) {
511 DEBUG(2, "compression will wait for create on job %u\n", currJob);
512 ctx->compressWaitCreateCompletion = ctx->createCompletion;
513 DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
517 ctx->compressWaitCreateCompletion = 1;
519 pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
521 pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
522 if (willWaitForWrite) {
523 DEBUG(2, "compression will wait for write on job %u\n", currJob);
524 ctx->compressWaitWriteCompletion = ctx->writeCompletion;
525 DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
528 ctx->compressWaitWriteCompletion = 1;
530 pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
534 /* wait until job is ready */
535 pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
536 while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
537 pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
539 pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
541 /* wait until job previously in this space is written */
542 pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
543 while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
544 pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
546 pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
547 /* reset compression completion */
548 pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
549 ctx->compressionCompletion = 0;
550 pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
552 /* adapt compression level */
553 if (currJob) adaptCompressionLevel(ctx);
555 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
556 DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
557 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
559 /* compress the data */
561 size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
563 unsigned blockNum = 0;
564 size_t remaining = job->src.size;
568 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
569 cLevel = ctx->compressionLevel;
570 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
572 /* reset compressed size */
573 job->compressedSize = 0;
574 DEBUG(2, "calling ZSTD_compressBegin()\n");
575 /* begin compression */
577 size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
578 ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize);
579 params.cParams.windowLog = 23;
581 size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0);
582 size_t const windowSizeError = ZSTD_CCtx_setParameter(ctx->cctx, ZSTD_c_forceMaxWindow, 1);
583 if (ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
584 DISPLAY("Error: something went wrong while starting compression\n");
585 signalErrorToThreads(ctx);
590 DEBUG(2, "finished with ZSTD_compressBegin()\n");
593 size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
595 /* continue compression */
596 if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
597 size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
598 if (ZSTD_isError(hSize)) {
599 DISPLAY("Error: something went wrong while continuing compression\n");
600 job->compressedSize = hSize; /* propagate the error code to the writer */
601 signalErrorToThreads(ctx);
604 ZSTD_invalidateRepCodes(ctx->cctx);
607 size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
608 ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
609 ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
610 if (ZSTD_isError(ret)) {
611 DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
612 signalErrorToThreads(ctx);
615 job->compressedSize += ret;
616 remaining -= actualBlockSize;
617 srcPos += actualBlockSize;
621 /* update completion */
622 pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
623 ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
624 pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
626 } while (remaining != 0);
627 job->dst.size = job->compressedSize;
629 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
630 ctx->jobCompressedID++;
631 pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
632 pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
633 if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
634 /* finished compressing all jobs */
637 DEBUG(2, "finished compressing job %u\n", currJob);
/* displayProgress() :
 * Redraws a one-line progress bar on stderr. It shows the current level, the
 * elapsed time, the input volume read so far (g_streamedSize) and the average
 * rate in MB/s. Does nothing when g_useProgressBar is 0.
 * timeElapsed is in milliseconds (microseconds / 1000); hence the *1000 and
 * /1000.0 conversions.
 * `last` presumably gates the trailing newline (the condition line is not visible).
 * NOTE(review): g_streamedSize is read here without synchronization with the
 * thread that increments it; with timeElapsed == 0 the rate is inf. */
643 static void displayProgress(unsigned cLevel, unsigned last)
645 UTIL_time_t currTime = UTIL_getTime();
646 if (!g_useProgressBar) return;
647 { double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_startTime, currTime) / 1000.0);
648 double const sizeMB = (double)g_streamedSize / (1 << 20);
649 double const avgCompRate = sizeMB * 1000 / timeElapsed;
650 fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate);
652 fprintf(stderr, "\n");
/* outputThread() :
 * Thread entry point (arg is an outputThreadArg holding the context and
 * destination FILE). It writes compressed jobs to dstFile in job order:
 * - if it is about to wait on the compression thread, it records how far the
 *   compression thread had progressed (writeWaitCompressionCompletion);
 * - it waits for the job, then writes it in pieces of
 *   MAX(compressedSize / 128, 1 KB), updating writeCompletion after each piece;
 * - it signals jobWrite_cond so the compression thread can reuse the slot.
 * After the final job, or on error, it sets allJobsCompleted and wakes the waiter.
 * A ZSTD error code stored in job->compressedSize is reported and propagated.
 * NOTE(review): ctx->threadError is read outside any mutex in the final check. */
658 static void* outputThread(void* arg)
660 outputThreadArg* const otArg = (outputThreadArg*)arg;
661 adaptCCtx* const ctx = otArg->ctx;
662 FILE* const dstFile = otArg->dstFile;
664 unsigned currJob = 0;
666 unsigned const currJobIndex = currJob % ctx->numJobs;
667 jobDescription* const job = &ctx->jobs[currJobIndex];
668 unsigned willWaitForCompress = 0;
669 DEBUG(2, "starting write for job %u\n", currJob);
671 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
672 if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
673 pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
676 pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
677 if (willWaitForCompress) {
678 /* write thread is waiting on compression thread */
679 ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
680 DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
683 ctx->writeWaitCompressionCompletion = 1;
685 pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
687 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
688 while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
689 pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
691 pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
693 /* reset write completion */
694 pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
695 ctx->writeCompletion = 0;
696 pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
699 size_t const compressedSize = job->compressedSize;
700 size_t remaining = compressedSize;
701 if (ZSTD_isError(compressedSize)) {
702 DISPLAY("Error: an error occurred during compression\n");
703 signalErrorToThreads(ctx);
707 size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
710 size_t const writeSize = MIN(remaining, blockSize);
711 size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
712 if (ret != writeSize) break;
716 /* update completion variable for writing */
717 pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
718 ctx->writeCompletion = 1 - (double)remaining/compressedSize;
719 pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
721 if (remaining == 0) break;
723 if (pos != compressedSize) {
724 DISPLAY("Error: an error occurred during file write operation\n");
725 signalErrorToThreads(ctx);
732 pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
733 cLevel = ctx->compressionLevel;
734 pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
735 displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
737 pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
739 pthread_cond_signal(&ctx->jobWrite_cond.pCond);
740 pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
742 if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
743 /* finished with all jobs */
744 pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
745 ctx->allJobsCompleted = 1;
746 pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
747 pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
750 DEBUG(2, "finished writing job %u\n", currJob);
/* createCompressionJob() :
 * Turns the filled input buffer into job `ctx->nextJobID`. It swaps buffer
 * pointers with the job slot instead of copying: the job receives
 * [dictionary | new data], and the input side receives the slot's old buffer.
 * For non-final jobs, the new data is then copied to the start of the input
 * buffer so it becomes the next job's dictionary. Finally it signals
 * jobReady_cond for the compression thread.
 * `last` marks the final job via lastJobPlusOne. */
757 static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
759 unsigned const nextJob = ctx->nextJobID;
760 unsigned const nextJobIndex = nextJob % ctx->numJobs;
761 jobDescription* const job = &ctx->jobs[nextJobIndex];
764 job->src.size = srcSize;
765 job->jobID = nextJob;
766 if (last) job->lastJobPlusOne = nextJob + 1;
769 void* const copy = job->src.start;
770 job->src.start = ctx->input.buffer.start;
771 ctx->input.buffer.start = copy;
773 job->dictSize = ctx->lastDictSize;
776 /* if not on the last job, reuse data as dictionary in next job */
778 size_t const oldDictSize = ctx->lastDictSize;
779 memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize);
780 ctx->lastDictSize = srcSize;
781 ctx->input.filled = srcSize;
784 /* signal job ready */
785 pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
787 pthread_cond_signal(&ctx->jobReady_cond.pCond);
788 pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
/* performCompression() :
 * Starts the detached output and compression threads, then acts as the job
 * creation thread itself. Repeatedly:
 * - wait for the slot's previous job to be compressed, recording
 *   createWaitCompressionCompletion if it has to wait;
 * - read up to FILE_CHUNK_SIZE bytes in 32 KB reads, updating createCompletion;
 * - hand the chunk off through createCompressionJob().
 * EOF after a chunk marks it as the last job. Errors are broadcast with
 * signalErrorToThreads().
 * NOTE(review): each fread asks for a full readBlockSize regardless of
 * `remaining`. This relies on FILE_CHUNK_SIZE being a multiple of readBlockSize.
 * NOTE(review): g_streamedSize is updated here and read by the output thread
 * without synchronization. */
793 static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
795 /* early error check to exit */
796 if (!ctx || !srcFile || !otArg) {
800 /* create output thread */
803 if (pthread_create(&out, NULL, &outputThread, otArg)) {
804 DISPLAY("Error: could not create output thread\n");
805 signalErrorToThreads(ctx);
808 else if (pthread_detach(out)) {
809 DISPLAY("Error: could not detach output thread\n");
810 signalErrorToThreads(ctx);
815 /* create compression thread */
817 pthread_t compression;
818 if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
819 DISPLAY("Error: could not create compression thread\n");
820 signalErrorToThreads(ctx);
823 else if (pthread_detach(compression)) {
824 DISPLAY("Error: could not detach compression thread\n");
825 signalErrorToThreads(ctx);
830 unsigned currJob = 0;
834 size_t const readBlockSize = 1 << 15;
835 size_t remaining = FILE_CHUNK_SIZE;
836 unsigned const nextJob = ctx->nextJobID;
837 unsigned willWaitForCompress = 0;
838 DEBUG(2, "starting creation of job %u\n", currJob);
840 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
841 if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
842 pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
844 pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
845 if (willWaitForCompress) {
846 /* creation thread is waiting, take measurement of completion */
847 ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
848 DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
851 ctx->createWaitCompressionCompletion = 1;
853 pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
855 /* wait until the job previously occupying this slot has been compressed */
856 pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
857 while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
858 pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
860 pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
862 /* reset create completion */
863 pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
864 ctx->createCompletion = 0;
865 pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
867 while (remaining != 0 && !feof(srcFile)) {
868 size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
869 if (ret != readBlockSize && !feof(srcFile)) {
870 /* error could not read correct number of bytes */
871 DISPLAY("Error: problem occurred during read from src file\n");
872 signalErrorToThreads(ctx);
877 pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
878 ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
879 pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
881 if (remaining != 0 && !feof(srcFile)) {
882 DISPLAY("Error: problem occurred during read from src file\n");
883 signalErrorToThreads(ctx);
886 g_streamedSize += pos;
887 /* reading was fine, now create the compression job */
889 int const last = feof(srcFile);
890 int const error = createCompressionJob(ctx, pos, last);
892 signalErrorToThreads(ctx);
896 DEBUG(2, "finished creating job %u\n", currJob);
903 /* success -- created all jobs */
/* createFileCompressionResources() :
 * Opens the source (stdin for stdinmark) and destination files and allocates
 * the output-thread argument and the compression context.
 * Destination rules:
 * - stdin input with no explicit destination goes to stdout;
 * - otherwise, with no destination, "<srcFilename>.zst" is used, bounded by MAX_PATH.
 * Fields that could not be set are left zeroed by the initial memset.
 * NOTE(review): srcFile is opened before the later failure checks. Confirm that
 * the early-return paths (not visible here) do not leak srcFile/dstFile. */
907 static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull)
910 unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
911 FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
912 const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
913 const char* outFilename = outFilenameIntermediate;
914 char fileAndSuffix[MAX_PATH];
915 size_t const numJobs = MAX_NUM_JOBS;
917 memset(&fcr, 0, sizeof(fcr));
919 if (!outFilenameIntermediate) {
920 if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) {
921 DISPLAY("Error: output filename is too long\n");
924 outFilename = fileAndSuffix;
928 unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark);
929 FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb");
930 fcr.otArg = malloc(sizeof(outputThreadArg));
932 DISPLAY("Error: could not allocate space for output thread argument\n");
935 fcr.otArg->dstFile = dstFile;
937 /* checking for errors */
938 if (!fcr.otArg->dstFile || !srcFile) {
939 DISPLAY("Error: some file(s) could not be opened\n");
943 /* creating context */
944 fcr.ctx = createCCtx(numJobs);
945 fcr.otArg->ctx = fcr.ctx;
946 fcr.srcFile = srcFile;
/* freeFileCompressionResources() :
 * Waits for the worker threads to finish, then closes the files and frees the
 * context. Failures are OR-ed into `ret`.
 * fcr->ctx may be NULL (see the NULL check before freeCCtx()), so
 * waitUntilAllJobsCompleted() must tolerate NULL.
 * NOTE(review): srcFile is closed even when it is stdin.
 * NOTE(review): fcr->otArg is dereferenced below without a visible NULL check,
 * and dstFile can be NULL when fopen failed. Confirm the lines not shown here
 * guard both cases. */
950 static int freeFileCompressionResources(fcResources* fcr)
953 waitUntilAllJobsCompleted(fcr->ctx);
954 ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
955 ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
957 ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0;
959 /* no need to freeCCtx() on otArg->ctx because it should be the same context */
/* compressFilename() :
 * Compresses one file end to end: acquire the resources, run the pipeline,
 * release the resources. The error flags of both steps are OR-ed together.
 * dstFilenameOrNull == NULL selects the default output name. */
964 static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
967 fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull);
969 ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg);
970 ret |= freeFileCompressionResources(&fcr);
/* compressFilenames() :
 * Compresses each of the numFiles entries of filenameTable. Presumably each one
 * goes to its default output name, or to stdout when forceStdout is set (the
 * branch condition is not visible here). Error flags are OR-ed together. */
974 static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout)
978 for (fileNum=0; fileNum<numFiles; fileNum++) {
979 const char* filename = filenameTable[fileNum];
981 ret |= compressFilename(filename, NULL);
984 ret |= compressFilename(filename, stdoutmark);
991 /*! readU32FromChar() :
992 @return : unsigned integer value read from input in `char` format
993 allows and interprets K, KB, KiB, M, MB and MiB suffix.
994 Will also modify `*stringPtr`, advancing it to position where it stopped reading.
995 Note : function result can overflow if digit string > UINT_MAX (K/M scaling presumably multiplies by 2^10 / 2^20; the shared shift line is not visible here) */
996 static unsigned readU32FromChar(const char** stringPtr)
999 while ((**stringPtr >='0') && (**stringPtr <='9'))
1000 result *= 10, result += **stringPtr - '0', (*stringPtr)++ ;
1001 if ((**stringPtr=='K') || (**stringPtr=='M')) {
1003 if (**stringPtr=='M') result <<= 10;
1005 if (**stringPtr=='i') (*stringPtr)++;
1006 if (**stringPtr=='B') (*stringPtr)++;
/* help() :
 * Prints command-line usage to stdout.
 * NOTE(review): ZSTD_maxCLevel() returns int (it is cast to unsigned elsewhere
 * in this file), but it is printed with %u; %d would match its type. */
1011 static void help(const char* progPath)
1014 PRINT(" %s [options] [file(s)]\n", progPath);
1016 PRINT("Options:\n");
1017 PRINT(" -oFILE : specify the output file name\n");
1018 PRINT(" -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL);
1019 PRINT(" -h : display help/information\n");
1020 PRINT(" -f : force the compression level to stay constant\n");
1021 PRINT(" -c : force write to stdout\n");
1022 PRINT(" -p : hide progress bar\n");
1023 PRINT(" -q : quiet mode -- do not show progress bar or other information\n");
1024 PRINT(" -l# : provide lower bound for compression level -- default 1\n");
1025 PRINT(" -u# : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
1027 /* return 0 if successful, else return error */
/* main() :
 * Parses options and file names.
 * - Validates the [min, max] level bounds and the initial level. An explicitly
 *   provided out-of-range initial level is an error; otherwise the initial level
 *   is clamped to the minimum.
 * - Compresses a single input (stdin by default) to outFilename, or several
 *   inputs to their default names or to stdout.
 * NOTE(review): filenameTable[0] is written before the NULL check on the
 * malloc() result below; the check should come first.
 * NOTE(review): the allocation error message contains the typo "sapce". */
1028 int main(int argCount, const char* argv[])
1030 const char* outFilename = NULL;
1031 const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*));
1032 unsigned filenameIdx = 0;
1033 unsigned forceStdout = 0;
1034 unsigned providedInitialCLevel = 0;
1037 filenameTable[0] = stdinmark;
1038 g_maxCLevel = ZSTD_maxCLevel();
1040 if (filenameTable == NULL) {
1041 DISPLAY("Error: could not allocate sapce for filename table.\n");
1045 for (argNum=1; argNum<argCount; argNum++) {
1046 const char* argument = argv[argNum];
1048 /* command-line options: any argument starting with '-' other than a lone "-" */
1049 if (argument[0]=='-' && strlen(argument) > 1) {
1050 switch (argument[1]) {
1053 outFilename = argument;
1057 g_compressionLevel = readU32FromChar(&argument);
1058 providedInitialCLevel = 1;
1064 g_useProgressBar = 0;
1068 outFilename = stdoutmark;
1071 g_forceCompressionLevel = 1;
1074 g_useProgressBar = 0;
1079 g_minCLevel = readU32FromChar(&argument);
1083 g_maxCLevel = readU32FromChar(&argument);
1086 DISPLAY("Error: invalid argument provided\n");
1093 /* regular files to be compressed */
1094 filenameTable[filenameIdx++] = argument;
1097 /* check initial, max, and min compression levels */
1099 unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel;
1100 unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel;
1101 if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) {
1102 DISPLAY("Error: provided compression level parameters are invalid\n");
1106 else if (initialNotInRange) {
1107 g_compressionLevel = g_minCLevel;
1111 /* error checking with number of files */
1112 if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) {
1113 DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
1118 /* compress files */
1119 if (filenameIdx <= 1) {
1120 ret |= compressFilename(filenameTable[0], outFilename);
1123 ret |= compressFilenames(filenameTable, filenameIdx, forceStdout);
1126 free(filenameTable);