]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/seekable_format/examples/parallel_compression.c
import zstd 1.4.2
[FreeBSD/FreeBSD.git] / contrib / seekable_format / examples / parallel_compression.c
1 /*
2  * Copyright (c) 2017-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9
10 #include <stdlib.h>    // malloc, free, exit, atoi
11 #include <stdio.h>     // fprintf, perror, feof, fopen, etc.
12 #include <string.h>    // strlen, memset, strcat
13 #define ZSTD_STATIC_LINKING_ONLY
14 #include <zstd.h>      // presumes zstd library is installed
15 #include <zstd_errors.h>
16 #if defined(WIN32) || defined(_WIN32)
17 #  include <windows.h>
18 #  define SLEEP(x) Sleep(x)
19 #else
20 #  include <unistd.h>
21 #  define SLEEP(x) usleep(x * 1000)
22 #endif
23
24 #define XXH_NAMESPACE ZSTD_
25 #include "xxhash.h"
26
27 #include "pool.h"      // use zstd thread pool for demo
28
29 #include "zstd_seekable.h"
30
31 static void* malloc_orDie(size_t size)
32 {
33     void* const buff = malloc(size);
34     if (buff) return buff;
35     /* error */
36     perror("malloc:");
37     exit(1);
38 }
39
40 static FILE* fopen_orDie(const char *filename, const char *instruction)
41 {
42     FILE* const inFile = fopen(filename, instruction);
43     if (inFile) return inFile;
44     /* error */
45     perror(filename);
46     exit(3);
47 }
48
49 static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file)
50 {
51     size_t const readSize = fread(buffer, 1, sizeToRead, file);
52     if (readSize == sizeToRead) return readSize;   /* good */
53     if (feof(file)) return readSize;   /* good, reached end of file */
54     /* error */
55     perror("fread");
56     exit(4);
57 }
58
59 static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file)
60 {
61     size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file);
62     if (writtenSize == sizeToWrite) return sizeToWrite;   /* good */
63     /* error */
64     perror("fwrite");
65     exit(5);
66 }
67
68 static size_t fclose_orDie(FILE* file)
69 {
70     if (!fclose(file)) return 0;
71     /* error */
72     perror("fclose");
73     exit(6);
74 }
75
76 static void fseek_orDie(FILE* file, long int offset, int origin)
77 {
78     if (!fseek(file, offset, origin)) {
79         if (!fflush(file)) return;
80     }
81     /* error */
82     perror("fseek");
83     exit(7);
84 }
85
86 static long int ftell_orDie(FILE* file)
87 {
88     long int off = ftell(file);
89     if (off != -1) return off;
90     /* error */
91     perror("ftell");
92     exit(8);
93 }
94
95 struct job {
96     const void* src;
97     size_t srcSize;
98     void* dst;
99     size_t dstSize;
100
101     unsigned checksum;
102
103     int compressionLevel;
104     int done;
105 };
106
107 static void compressFrame(void* opaque)
108 {
109     struct job* job = opaque;
110
111     job->checksum = XXH64(job->src, job->srcSize, 0);
112
113     size_t ret = ZSTD_compress(job->dst, job->dstSize, job->src, job->srcSize, job->compressionLevel);
114     if (ZSTD_isError(ret)) {
115         fprintf(stderr, "ZSTD_compress() error : %s \n", ZSTD_getErrorName(ret));
116         exit(20);
117     }
118
119     job->dstSize = ret;
120     job->done = 1;
121 }
122
123 static void compressFile_orDie(const char* fname, const char* outName, int cLevel, unsigned frameSize, int nbThreads)
124 {
125     POOL_ctx* pool = POOL_create(nbThreads, nbThreads);
126     if (pool == NULL) { fprintf(stderr, "POOL_create() error \n"); exit(9); }
127
128     FILE* const fin  = fopen_orDie(fname, "rb");
129     FILE* const fout = fopen_orDie(outName, "wb");
130
131     if (ZSTD_compressBound(frameSize) > 0xFFFFFFFFU) { fprintf(stderr, "Frame size too large \n"); exit(10); }
132     unsigned dstSize = ZSTD_compressBound(frameSize);
133
134
135     fseek_orDie(fin, 0, SEEK_END);
136     long int length = ftell_orDie(fin);
137     fseek_orDie(fin, 0, SEEK_SET);
138
139     size_t numFrames = (length + frameSize - 1) / frameSize;
140
141     struct job* jobs = malloc_orDie(sizeof(struct job) * numFrames);
142
143     size_t i;
144     for(i = 0; i < numFrames; i++) {
145         void* in = malloc_orDie(frameSize);
146         void* out = malloc_orDie(dstSize);
147
148         size_t inSize = fread_orDie(in, frameSize, fin);
149
150         jobs[i].src = in;
151         jobs[i].srcSize = inSize;
152         jobs[i].dst = out;
153         jobs[i].dstSize = dstSize;
154         jobs[i].compressionLevel = cLevel;
155         jobs[i].done = 0;
156         POOL_add(pool, compressFrame, &jobs[i]);
157     }
158
159     ZSTD_frameLog* fl = ZSTD_seekable_createFrameLog(1);
160     if (fl == NULL) { fprintf(stderr, "ZSTD_seekable_createFrameLog() failed \n"); exit(11); }
161     for (i = 0; i < numFrames; i++) {
162         while (!jobs[i].done) SLEEP(5); /* wake up every 5 milliseconds to check */
163         fwrite_orDie(jobs[i].dst, jobs[i].dstSize, fout);
164         free((void*)jobs[i].src);
165         free(jobs[i].dst);
166
167         size_t ret = ZSTD_seekable_logFrame(fl, jobs[i].dstSize, jobs[i].srcSize, jobs[i].checksum);
168         if (ZSTD_isError(ret)) { fprintf(stderr, "ZSTD_seekable_logFrame() error : %s \n", ZSTD_getErrorName(ret)); }
169     }
170
171     {   unsigned char seekTableBuff[1024];
172         ZSTD_outBuffer out = {seekTableBuff, 1024, 0};
173         while (ZSTD_seekable_writeSeekTable(fl, &out) != 0) {
174             fwrite_orDie(seekTableBuff, out.pos, fout);
175             out.pos = 0;
176         }
177         fwrite_orDie(seekTableBuff, out.pos, fout);
178     }
179
180     ZSTD_seekable_freeFrameLog(fl);
181     free(jobs);
182     fclose_orDie(fout);
183     fclose_orDie(fin);
184 }
185
186 static const char* createOutFilename_orDie(const char* filename)
187 {
188     size_t const inL = strlen(filename);
189     size_t const outL = inL + 5;
190     void* outSpace = malloc_orDie(outL);
191     memset(outSpace, 0, outL);
192     strcat(outSpace, filename);
193     strcat(outSpace, ".zst");
194     return (const char*)outSpace;
195 }
196
197 int main(int argc, const char** argv) {
198     const char* const exeName = argv[0];
199     if (argc!=4) {
200         printf("wrong arguments\n");
201         printf("usage:\n");
202         printf("%s FILE FRAME_SIZE NB_THREADS\n", exeName);
203         return 1;
204     }
205
206     {   const char* const inFileName = argv[1];
207         unsigned const frameSize = (unsigned)atoi(argv[2]);
208         int const nbThreads = atoi(argv[3]);
209
210         const char* const outFileName = createOutFilename_orDie(inFileName);
211         compressFile_orDie(inFileName, outFileName, 5, frameSize, nbThreads);
212     }
213
214     return 0;
215 }