]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/xz/src/liblzma/common/stream_encoder_mt.c
MFC r359201: MFV r359197: xz 5.2.5.
[FreeBSD/FreeBSD.git] / contrib / xz / src / liblzma / common / stream_encoder_mt.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19
20
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given unusually large block size.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24
25
/// States of a worker thread.
///
/// The numeric order of the enumerators is significant: the worker code
/// compares states with "<= THR_FINISH" (there is encoding work to do)
/// and ">= THR_STOP" (abort the current Block), so new states must not
/// be inserted without checking those comparisons.
typedef enum {
        /// Waiting for work.
        THR_IDLE,

        /// Encoding is in progress.
        THR_RUN,

        /// Encoding is in progress but no more input data will
        /// be read.
        THR_FINISH,

        /// The main thread wants the thread to stop whatever it was doing
        /// but not exit.
        THR_STOP,

        /// The main thread wants the thread to exit. Thread cancellation
        /// could be used instead, but since stopping has to be supported
        /// anyway, handling the exit request the same way is simpler.
        THR_EXIT,

} worker_state;
46
// Forward declaration: worker_thread needs a pointer back to the main coder.
typedef struct lzma_stream_coder_s lzma_stream_coder;

/// Per-worker state. One of these exists for every worker thread that
/// has been created; the main thread hands input to the worker through
/// this structure and the worker reports back through it and through
/// the main coder structure.
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
        /// Current state of the worker. Read and written by both the
        /// main thread and the worker while holding "mutex" below.
        worker_state state;

        /// Input buffer of coder->block_size bytes. The main thread will
        /// put new input into this and update in_size accordingly. Once
        /// no more input is coming, state will be set to THR_FINISH.
        uint8_t *in;

        /// Amount of data available in the input buffer. This is modified
        /// only by the main thread.
        size_t in_size;

        /// Output buffer for this thread. This is set by the main
        /// thread every time a new Block is started with this thread
        /// structure.
        lzma_outbuf *outbuf;

        /// Pointer to the main structure is needed when putting this
        /// thread back to the stack of free threads.
        lzma_stream_coder *coder;

        /// The allocator is set by the main thread. Since a copy of the
        /// pointer is kept here, the application must not change the
        /// allocator before calling lzma_end().
        const lzma_allocator *allocator;

        /// Amount of uncompressed data that has already been compressed.
        uint64_t progress_in;

        /// Amount of compressed data that is ready.
        uint64_t progress_out;

        /// Block encoder
        lzma_next_coder block_encoder;

        /// Compression options for this Block
        lzma_block block_options;

        /// Next structure in the stack of free worker threads.
        worker_thread *next;

        /// Held while the main thread and the worker exchange "state"
        /// and "in_size" (and while the worker publishes its progress
        /// counters during encoding).
        mythread_mutex mutex;

        /// Signalled together with "mutex" whenever one side changes
        /// "state" or "in_size" so that the other side can wake up.
        mythread_cond cond;

        /// The ID of this thread is used to join the thread
        /// when it's not needed anymore.
        mythread thread_id;
};
98
99
/// Main (application-side) state of the multithreaded Stream encoder.
struct lzma_stream_coder_s {
        /// Which part of the .xz Stream is currently being produced.
        /// The encoder moves through these in the order listed.
        enum {
                SEQ_STREAM_HEADER,
                SEQ_BLOCK,
                SEQ_INDEX,
                SEQ_STREAM_FOOTER,
        } sequence;

        /// Start a new Block every block_size bytes of input unless
        /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
        size_t block_size;

        /// The filter chain currently in use
        lzma_filter filters[LZMA_FILTERS_MAX + 1];


        /// Index to hold sizes of the Blocks
        lzma_index *index;

        /// Index encoder
        lzma_next_coder index_encoder;


        /// Stream Flags for encoding the Stream Header and Stream Footer.
        lzma_stream_flags stream_flags;

        /// Buffer to hold Stream Header and Stream Footer.
        uint8_t header[LZMA_STREAM_HEADER_SIZE];

        /// Read position in header[]
        size_t header_pos;


        /// Output buffer queue for compressed data
        lzma_outq outq;


        /// Maximum wait time if cannot use all the input and cannot
        /// fill the output buffer. This is in milliseconds.
        /// Zero means that no timeout is used.
        uint32_t timeout;


        /// Error code from a worker thread. Only the first error
        /// reported by any worker is stored here.
        lzma_ret thread_error;

        /// Array of allocated thread-specific structures
        worker_thread *threads;

        /// Number of structures in "threads" above. This is also the
        /// number of threads that will be created at maximum.
        uint32_t threads_max;

        /// Number of thread structures that have been initialized, and
        /// thus the number of worker threads actually created so far.
        uint32_t threads_initialized;

        /// Stack of free threads. When a thread finishes, it puts itself
        /// back into this stack. This starts as empty because threads
        /// are created only when actually needed.
        worker_thread *threads_free;

        /// The most recent worker thread to which the main thread writes
        /// the new input from the application.
        worker_thread *thr;


        /// Amount of uncompressed data in Blocks that have already
        /// been finished.
        uint64_t progress_in;

        /// Amount of compressed data in Stream Header + Blocks that
        /// have already been finished.
        uint64_t progress_out;


        /// Held by workers and the main thread when touching
        /// thread_error, threads_free, and the progress counters above.
        mythread_mutex mutex;

        /// Signalled by a worker when it reports an error or returns
        /// itself to threads_free; the main thread waits on this.
        mythread_cond cond;
};
178
179
180 /// Tell the main thread that something has gone wrong.
181 static void
182 worker_error(worker_thread *thr, lzma_ret ret)
183 {
184         assert(ret != LZMA_OK);
185         assert(ret != LZMA_STREAM_END);
186
187         mythread_sync(thr->coder->mutex) {
188                 if (thr->coder->thread_error == LZMA_OK)
189                         thr->coder->thread_error = ret;
190
191                 mythread_cond_signal(&thr->coder->cond);
192         }
193
194         return;
195 }
196
197
/// Encode a single Block in a worker thread.
///
/// Input arrives incrementally from the main thread via thr->in and
/// thr->in_size; the result is written to thr->outbuf.
///
/// \param      thr     Worker whose Block is to be encoded
/// \param      state   Scratch copy of the worker state; it is refreshed
///                     from thr->state before it is first read
///
/// \return     THR_FINISH if the Block was encoded successfully.
///             THR_STOP if an error occurred (it has then been reported
///             with worker_error()). If the main thread requested a stop
///             or exit while encoding, that requested state (THR_STOP or
///             THR_EXIT) is returned as is.
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
        assert(thr->progress_in == 0);
        assert(thr->progress_out == 0);

        // Set the Block options.
        thr->block_options = (lzma_block){
                .version = 0,
                .check = thr->coder->stream_flags.check,
                .compressed_size = thr->coder->outq.buf_size_max,
                .uncompressed_size = thr->coder->block_size,

                // TODO: To allow changing the filter chain, the filters
                // array must be copied to each worker_thread.
                .filters = thr->coder->filters,
        };

        // Calculate maximum size of the Block Header. This amount is
        // reserved in the beginning of the buffer so that Block Header
        // along with Compressed Size and Uncompressed Size can be
        // written there.
        lzma_ret ret = lzma_block_header_size(&thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Initialize the Block encoder.
        ret = lzma_block_encoder_init(&thr->block_encoder,
                        thr->allocator, &thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        // in_pos: how much of thr->in has been consumed by the encoder.
        // in_size: this thread's last seen copy of thr->in_size.
        size_t in_pos = 0;
        size_t in_size = 0;

        // Skip the space reserved for the Block Header; the compressed
        // data is written right after it.
        thr->outbuf->size = thr->block_options.header_size;
        const size_t out_size = thr->coder->outq.buf_size_max;

        do {
                mythread_sync(thr->mutex) {
                        // Store in_pos and out_pos into *thr so that
                        // an application may read them via
                        // lzma_get_progress() to get progress information.
                        //
                        // NOTE: These aren't updated when the encoding
                        // finishes. Instead, the final values are taken
                        // later from thr->outbuf.
                        thr->progress_in = in_pos;
                        thr->progress_out = thr->outbuf->size;

                        // Sleep until the main thread has provided more
                        // input or has changed the state.
                        while (in_size == thr->in_size
                                        && thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                // Return if we were asked to stop or exit.
                if (state >= THR_STOP)
                        return state;

                lzma_action action = state == THR_FINISH
                                ? LZMA_FINISH : LZMA_RUN;

                // Limit the amount of input given to the Block encoder
                // at once. This way this thread can react fairly quickly
                // if the main thread wants us to stop or exit.
                static const size_t in_chunk_max = 16384;
                size_t in_limit = in_size;
                if (in_size - in_pos > in_chunk_max) {
                        in_limit = in_pos + in_chunk_max;

                        // More input remains after this chunk, so the
                        // encoder must not be told to finish yet.
                        action = LZMA_RUN;
                }

                ret = thr->block_encoder.code(
                                thr->block_encoder.coder, thr->allocator,
                                thr->in, &in_pos, in_limit, thr->outbuf->buf,
                                &thr->outbuf->size, out_size, action);
        } while (ret == LZMA_OK && thr->outbuf->size < out_size);

        switch (ret) {
        case LZMA_STREAM_END:
                assert(state == THR_FINISH);

                // Encode the Block Header. By doing it after
                // the compression, we can store the Compressed Size
                // and Uncompressed Size fields.
                ret = lzma_block_header_encode(&thr->block_options,
                                thr->outbuf->buf);
                if (ret != LZMA_OK) {
                        worker_error(thr, ret);
                        return THR_STOP;
                }

                break;

        case LZMA_OK:
                // The output buffer became full before the encoder
                // finished: the data was incompressible. Encode it using
                // uncompressed LZMA2 chunks.
                //
                // First wait until we have received all the input.
                mythread_sync(thr->mutex) {
                        while (thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                if (state >= THR_STOP)
                        return state;

                // Do the encoding. This takes care of the Block Header too.
                // The output buffer is rewritten from its beginning.
                thr->outbuf->size = 0;
                ret = lzma_block_uncomp_encode(&thr->block_options,
                                thr->in, in_size, thr->outbuf->buf,
                                &thr->outbuf->size, out_size);

                // It shouldn't fail.
                if (ret != LZMA_OK) {
                        worker_error(thr, LZMA_PROG_ERROR);
                        return THR_STOP;
                }

                break;

        default:
                // The Block encoder itself failed.
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Set the size information that will be read by the main thread
        // to write the Index field.
        thr->outbuf->unpadded_size
                        = lzma_block_unpadded_size(&thr->block_options);
        assert(thr->outbuf->unpadded_size != 0);
        thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

        return THR_FINISH;
}
343
344
/// Entry point of a worker thread.
///
/// The thread sleeps until the main thread gives it work, encodes one
/// Block with worker_encode(), hands the result back to the main thread,
/// and repeats. It leaves the loop and releases its own resources when
/// it sees THR_EXIT.
///
/// \param      thr_ptr Pointer to this thread's worker_thread structure
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
        worker_thread *thr = thr_ptr;
        worker_state state = THR_IDLE; // Init to silence a warning

        while (true) {
                // Wait for work.
                mythread_sync(thr->mutex) {
                        while (true) {
                                // The thread is already idle so if we are
                                // requested to stop, just set the state.
                                if (thr->state == THR_STOP) {
                                        thr->state = THR_IDLE;
                                        mythread_cond_signal(&thr->cond);
                                }

                                state = thr->state;
                                if (state != THR_IDLE)
                                        break;

                                mythread_cond_wait(&thr->cond, &thr->mutex);
                        }
                }

                assert(state != THR_IDLE);
                assert(state != THR_STOP);

                // THR_RUN or THR_FINISH: there is a Block to encode.
                // Otherwise the state is THR_EXIT and encoding is skipped.
                if (state <= THR_FINISH)
                        state = worker_encode(thr, state);

                if (state == THR_EXIT)
                        break;

                // Mark the thread as idle unless the main thread has
                // told us to exit. Signal is needed for the case
                // where the main thread is waiting for the threads to stop.
                mythread_sync(thr->mutex) {
                        if (thr->state != THR_EXIT) {
                                thr->state = THR_IDLE;
                                mythread_cond_signal(&thr->cond);
                        }
                }

                mythread_sync(thr->coder->mutex) {
                        // Mark the output buffer as finished if
                        // no errors occurred.
                        thr->outbuf->finished = state == THR_FINISH;

                        // Update the main progress info.
                        //
                        // NOTE(review): this is done even when the Block
                        // was not finished (state != THR_FINISH); whether
                        // outbuf->uncompressed_size holds a meaningful
                        // value in that case depends on how the outbuf
                        // was initialized -- confirm.
                        thr->coder->progress_in
                                        += thr->outbuf->uncompressed_size;
                        thr->coder->progress_out += thr->outbuf->size;
                        thr->progress_in = 0;
                        thr->progress_out = 0;

                        // Return this thread to the stack of free threads.
                        thr->next = thr->coder->threads_free;
                        thr->coder->threads_free = thr;

                        // Wake up the main thread in case it is waiting
                        // for a free worker or for finished output.
                        mythread_cond_signal(&thr->coder->cond);
                }
        }

        // Exiting, free the resources.
        mythread_mutex_destroy(&thr->mutex);
        mythread_cond_destroy(&thr->cond);

        lzma_next_end(&thr->block_encoder, thr->allocator);
        lzma_free(thr->in, thr->allocator);
        return MYTHREAD_RET_VALUE;
}
417
418
419 /// Make the threads stop but not exit. Optionally wait for them to stop.
420 static void
421 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422 {
423         // Tell the threads to stop.
424         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425                 mythread_sync(coder->threads[i].mutex) {
426                         coder->threads[i].state = THR_STOP;
427                         mythread_cond_signal(&coder->threads[i].cond);
428                 }
429         }
430
431         if (!wait_for_threads)
432                 return;
433
434         // Wait for the threads to settle in the idle state.
435         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436                 mythread_sync(coder->threads[i].mutex) {
437                         while (coder->threads[i].state != THR_IDLE)
438                                 mythread_cond_wait(&coder->threads[i].cond,
439                                                 &coder->threads[i].mutex);
440                 }
441         }
442
443         return;
444 }
445
446
447 /// Stop the threads and free the resources associated with them.
448 /// Wait until the threads have exited.
449 static void
450 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
451 {
452         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453                 mythread_sync(coder->threads[i].mutex) {
454                         coder->threads[i].state = THR_EXIT;
455                         mythread_cond_signal(&coder->threads[i].cond);
456                 }
457         }
458
459         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460                 int ret = mythread_join(coder->threads[i].thread_id);
461                 assert(ret == 0);
462                 (void)ret;
463         }
464
465         lzma_free(coder->threads, allocator);
466         return;
467 }
468
469
470 /// Initialize a new worker_thread structure and create a new thread.
471 static lzma_ret
472 initialize_new_thread(lzma_stream_coder *coder,
473                 const lzma_allocator *allocator)
474 {
475         worker_thread *thr = &coder->threads[coder->threads_initialized];
476
477         thr->in = lzma_alloc(coder->block_size, allocator);
478         if (thr->in == NULL)
479                 return LZMA_MEM_ERROR;
480
481         if (mythread_mutex_init(&thr->mutex))
482                 goto error_mutex;
483
484         if (mythread_cond_init(&thr->cond))
485                 goto error_cond;
486
487         thr->state = THR_IDLE;
488         thr->allocator = allocator;
489         thr->coder = coder;
490         thr->progress_in = 0;
491         thr->progress_out = 0;
492         thr->block_encoder = LZMA_NEXT_CODER_INIT;
493
494         if (mythread_create(&thr->thread_id, &worker_start, thr))
495                 goto error_thread;
496
497         ++coder->threads_initialized;
498         coder->thr = thr;
499
500         return LZMA_OK;
501
502 error_thread:
503         mythread_cond_destroy(&thr->cond);
504
505 error_cond:
506         mythread_mutex_destroy(&thr->mutex);
507
508 error_mutex:
509         lzma_free(thr->in, allocator);
510         return LZMA_MEM_ERROR;
511 }
512
513
514 static lzma_ret
515 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
516 {
517         // If there are no free output subqueues, there is no
518         // point to try getting a thread.
519         if (!lzma_outq_has_buf(&coder->outq))
520                 return LZMA_OK;
521
522         // If there is a free structure on the stack, use it.
523         mythread_sync(coder->mutex) {
524                 if (coder->threads_free != NULL) {
525                         coder->thr = coder->threads_free;
526                         coder->threads_free = coder->threads_free->next;
527                 }
528         }
529
530         if (coder->thr == NULL) {
531                 // If there are no uninitialized structures left, return.
532                 if (coder->threads_initialized == coder->threads_max)
533                         return LZMA_OK;
534
535                 // Initialize a new thread.
536                 return_if_error(initialize_new_thread(coder, allocator));
537         }
538
539         // Reset the parts of the thread state that have to be done
540         // in the main thread.
541         mythread_sync(coder->thr->mutex) {
542                 coder->thr->state = THR_RUN;
543                 coder->thr->in_size = 0;
544                 coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545                 mythread_cond_signal(&coder->thr->cond);
546         }
547
548         return LZMA_OK;
549 }
550
551
552 static lzma_ret
553 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554                 const uint8_t *restrict in, size_t *restrict in_pos,
555                 size_t in_size, lzma_action action)
556 {
557         while (*in_pos < in_size
558                         || (coder->thr != NULL && action != LZMA_RUN)) {
559                 if (coder->thr == NULL) {
560                         // Get a new thread.
561                         const lzma_ret ret = get_thread(coder, allocator);
562                         if (coder->thr == NULL)
563                                 return ret;
564                 }
565
566                 // Copy the input data to thread's buffer.
567                 size_t thr_in_size = coder->thr->in_size;
568                 lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569                                 &thr_in_size, coder->block_size);
570
571                 // Tell the Block encoder to finish if
572                 //  - it has got block_size bytes of input; or
573                 //  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574                 //    or LZMA_FULL_BARRIER was used.
575                 //
576                 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577                 const bool finish = thr_in_size == coder->block_size
578                                 || (*in_pos == in_size && action != LZMA_RUN);
579
580                 bool block_error = false;
581
582                 mythread_sync(coder->thr->mutex) {
583                         if (coder->thr->state == THR_IDLE) {
584                                 // Something has gone wrong with the Block
585                                 // encoder. It has set coder->thread_error
586                                 // which we will read a few lines later.
587                                 block_error = true;
588                         } else {
589                                 // Tell the Block encoder its new amount
590                                 // of input and update the state if needed.
591                                 coder->thr->in_size = thr_in_size;
592
593                                 if (finish)
594                                         coder->thr->state = THR_FINISH;
595
596                                 mythread_cond_signal(&coder->thr->cond);
597                         }
598                 }
599
600                 if (block_error) {
601                         lzma_ret ret;
602
603                         mythread_sync(coder->mutex) {
604                                 ret = coder->thread_error;
605                         }
606
607                         return ret;
608                 }
609
610                 if (finish)
611                         coder->thr = NULL;
612         }
613
614         return LZMA_OK;
615 }
616
617
618 /// Wait until more input can be consumed, more output can be read, or
619 /// an optional timeout is reached.
620 static bool
621 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622                 bool *has_blocked, bool has_input)
623 {
624         if (coder->timeout != 0 && !*has_blocked) {
625                 // Every time when stream_encode_mt() is called via
626                 // lzma_code(), *has_blocked starts as false. We set it
627                 // to true here and calculate the absolute time when
628                 // we must return if there's nothing to do.
629                 //
630                 // The idea of *has_blocked is to avoid unneeded calls
631                 // to mythread_condtime_set(), which may do a syscall
632                 // depending on the operating system.
633                 *has_blocked = true;
634                 mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635         }
636
637         bool timed_out = false;
638
639         mythread_sync(coder->mutex) {
640                 // There are four things that we wait. If one of them
641                 // becomes possible, we return.
642                 //  - If there is input left, we need to get a free
643                 //    worker thread and an output buffer for it.
644                 //  - Data ready to be read from the output queue.
645                 //  - A worker thread indicates an error.
646                 //  - Time out occurs.
647                 while ((!has_input || coder->threads_free == NULL
648                                         || !lzma_outq_has_buf(&coder->outq))
649                                 && !lzma_outq_is_readable(&coder->outq)
650                                 && coder->thread_error == LZMA_OK
651                                 && !timed_out) {
652                         if (coder->timeout != 0)
653                                 timed_out = mythread_cond_timedwait(
654                                                 &coder->cond, &coder->mutex,
655                                                 wait_abs) != 0;
656                         else
657                                 mythread_cond_wait(&coder->cond,
658                                                 &coder->mutex);
659                 }
660         }
661
662         return timed_out;
663 }
664
665
666 static lzma_ret
667 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668                 const uint8_t *restrict in, size_t *restrict in_pos,
669                 size_t in_size, uint8_t *restrict out,
670                 size_t *restrict out_pos, size_t out_size, lzma_action action)
671 {
672         lzma_stream_coder *coder = coder_ptr;
673
674         switch (coder->sequence) {
675         case SEQ_STREAM_HEADER:
676                 lzma_bufcpy(coder->header, &coder->header_pos,
677                                 sizeof(coder->header),
678                                 out, out_pos, out_size);
679                 if (coder->header_pos < sizeof(coder->header))
680                         return LZMA_OK;
681
682                 coder->header_pos = 0;
683                 coder->sequence = SEQ_BLOCK;
684
685         // Fall through
686
687         case SEQ_BLOCK: {
688                 // Initialized to silence warnings.
689                 lzma_vli unpadded_size = 0;
690                 lzma_vli uncompressed_size = 0;
691                 lzma_ret ret = LZMA_OK;
692
693                 // These are for wait_for_work().
694                 bool has_blocked = false;
695                 mythread_condtime wait_abs;
696
697                 while (true) {
698                         mythread_sync(coder->mutex) {
699                                 // Check for Block encoder errors.
700                                 ret = coder->thread_error;
701                                 if (ret != LZMA_OK) {
702                                         assert(ret != LZMA_STREAM_END);
703                                         break; // Break out of mythread_sync.
704                                 }
705
706                                 // Try to read compressed data to out[].
707                                 ret = lzma_outq_read(&coder->outq,
708                                                 out, out_pos, out_size,
709                                                 &unpadded_size,
710                                                 &uncompressed_size);
711                         }
712
713                         if (ret == LZMA_STREAM_END) {
714                                 // End of Block. Add it to the Index.
715                                 ret = lzma_index_append(coder->index,
716                                                 allocator, unpadded_size,
717                                                 uncompressed_size);
718
719                                 // If we didn't fill the output buffer yet,
720                                 // try to read more data. Maybe the next
721                                 // outbuf has been finished already too.
722                                 if (*out_pos < out_size)
723                                         continue;
724                         }
725
726                         if (ret != LZMA_OK) {
727                                 // coder->thread_error was set or
728                                 // lzma_index_append() failed.
729                                 threads_stop(coder, false);
730                                 return ret;
731                         }
732
733                         // Try to give uncompressed data to a worker thread.
734                         ret = stream_encode_in(coder, allocator,
735                                         in, in_pos, in_size, action);
736                         if (ret != LZMA_OK) {
737                                 threads_stop(coder, false);
738                                 return ret;
739                         }
740
741                         // See if we should wait or return.
742                         //
743                         // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744                         if (*in_pos == in_size) {
745                                 // LZMA_RUN: More data is probably coming
746                                 // so return to let the caller fill the
747                                 // input buffer.
748                                 if (action == LZMA_RUN)
749                                         return LZMA_OK;
750
751                                 // LZMA_FULL_BARRIER: The same as with
752                                 // LZMA_RUN but tell the caller that the
753                                 // barrier was completed.
754                                 if (action == LZMA_FULL_BARRIER)
755                                         return LZMA_STREAM_END;
756
757                                 // Finishing or flushing isn't completed until
758                                 // all input data has been encoded and copied
759                                 // to the output buffer.
760                                 if (lzma_outq_is_empty(&coder->outq)) {
761                                         // LZMA_FINISH: Continue to encode
762                                         // the Index field.
763                                         if (action == LZMA_FINISH)
764                                                 break;
765
766                                         // LZMA_FULL_FLUSH: Return to tell
767                                         // the caller that flushing was
768                                         // completed.
769                                         if (action == LZMA_FULL_FLUSH)
770                                                 return LZMA_STREAM_END;
771                                 }
772                         }
773
774                         // Return if there is no output space left.
775                         // This check must be done after testing the input
776                         // buffer, because we might want to use a different
777                         // return code.
778                         if (*out_pos == out_size)
779                                 return LZMA_OK;
780
781                         // Neither in nor out has been used completely.
782                         // Wait until there's something we can do.
783                         if (wait_for_work(coder, &wait_abs, &has_blocked,
784                                         *in_pos < in_size))
785                                 return LZMA_TIMED_OUT;
786                 }
787
788                 // All Blocks have been encoded and the threads have stopped.
789                 // Prepare to encode the Index field.
790                 return_if_error(lzma_index_encoder_init(
791                                 &coder->index_encoder, allocator,
792                                 coder->index));
793                 coder->sequence = SEQ_INDEX;
794
795                 // Update the progress info to take the Index and
796                 // Stream Footer into account. Those are very fast to encode
797                 // so in terms of progress information they can be thought
798                 // to be ready to be copied out.
799                 coder->progress_out += lzma_index_size(coder->index)
800                                 + LZMA_STREAM_HEADER_SIZE;
801         }
802
803         // Fall through
804
805         case SEQ_INDEX: {
806                 // Call the Index encoder. It doesn't take any input, so
807                 // those pointers can be NULL.
808                 const lzma_ret ret = coder->index_encoder.code(
809                                 coder->index_encoder.coder, allocator,
810                                 NULL, NULL, 0,
811                                 out, out_pos, out_size, LZMA_RUN);
812                 if (ret != LZMA_STREAM_END)
813                         return ret;
814
815                 // Encode the Stream Footer into coder->buffer.
816                 coder->stream_flags.backward_size
817                                 = lzma_index_size(coder->index);
818                 if (lzma_stream_footer_encode(&coder->stream_flags,
819                                 coder->header) != LZMA_OK)
820                         return LZMA_PROG_ERROR;
821
822                 coder->sequence = SEQ_STREAM_FOOTER;
823         }
824
825         // Fall through
826
827         case SEQ_STREAM_FOOTER:
828                 lzma_bufcpy(coder->header, &coder->header_pos,
829                                 sizeof(coder->header),
830                                 out, out_pos, out_size);
831                 return coder->header_pos < sizeof(coder->header)
832                                 ? LZMA_OK : LZMA_STREAM_END;
833         }
834
835         assert(0);
836         return LZMA_PROG_ERROR;
837 }
838
839
840 static void
841 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
842 {
843         lzma_stream_coder *coder = coder_ptr;
844
845         // Threads must be killed before the output queue can be freed.
846         threads_end(coder, allocator);
847         lzma_outq_end(&coder->outq, allocator);
848
849         for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850                 lzma_free(coder->filters[i].options, allocator);
851
852         lzma_next_end(&coder->index_encoder, allocator);
853         lzma_index_end(coder->index, allocator);
854
855         mythread_cond_destroy(&coder->cond);
856         mythread_mutex_destroy(&coder->mutex);
857
858         lzma_free(coder, allocator);
859         return;
860 }
861
862
863 /// Options handling for lzma_stream_encoder_mt_init() and
864 /// lzma_stream_encoder_mt_memusage()
865 static lzma_ret
866 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867                 const lzma_filter **filters, uint64_t *block_size,
868                 uint64_t *outbuf_size_max)
869 {
870         // Validate some of the options.
871         if (options == NULL)
872                 return LZMA_PROG_ERROR;
873
874         if (options->flags != 0 || options->threads == 0
875                         || options->threads > LZMA_THREADS_MAX)
876                 return LZMA_OPTIONS_ERROR;
877
878         if (options->filters != NULL) {
879                 // Filter chain was given, use it as is.
880                 *filters = options->filters;
881         } else {
882                 // Use a preset.
883                 if (lzma_easy_preset(opt_easy, options->preset))
884                         return LZMA_OPTIONS_ERROR;
885
886                 *filters = opt_easy->filters;
887         }
888
889         // Block size
890         if (options->block_size > 0) {
891                 if (options->block_size > BLOCK_SIZE_MAX)
892                         return LZMA_OPTIONS_ERROR;
893
894                 *block_size = options->block_size;
895         } else {
896                 // Determine the Block size from the filter chain.
897                 *block_size = lzma_mt_block_size(*filters);
898                 if (*block_size == 0)
899                         return LZMA_OPTIONS_ERROR;
900
901                 assert(*block_size <= BLOCK_SIZE_MAX);
902         }
903
904         // Calculate the maximum amount output that a single output buffer
905         // may need to hold. This is the same as the maximum total size of
906         // a Block.
907         *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908         if (*outbuf_size_max == 0)
909                 return LZMA_MEM_ERROR;
910
911         return LZMA_OK;
912 }
913
914
915 static void
916 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917 {
918         lzma_stream_coder *coder = coder_ptr;
919
920         // Lock coder->mutex to prevent finishing threads from moving their
921         // progress info from the worker_thread structure to lzma_stream_coder.
922         mythread_sync(coder->mutex) {
923                 *progress_in = coder->progress_in;
924                 *progress_out = coder->progress_out;
925
926                 for (size_t i = 0; i < coder->threads_initialized; ++i) {
927                         mythread_sync(coder->threads[i].mutex) {
928                                 *progress_in += coder->threads[i].progress_in;
929                                 *progress_out += coder->threads[i]
930                                                 .progress_out;
931                         }
932                 }
933         }
934
935         return;
936 }
937
938
939 static lzma_ret
940 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941                 const lzma_mt *options)
942 {
943         lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944
945         // Get the filter chain.
946         lzma_options_easy easy;
947         const lzma_filter *filters;
948         uint64_t block_size;
949         uint64_t outbuf_size_max;
950         return_if_error(get_options(options, &easy, &filters,
951                         &block_size, &outbuf_size_max));
952
953 #if SIZE_MAX < UINT64_MAX
954         if (block_size > SIZE_MAX)
955                 return LZMA_MEM_ERROR;
956 #endif
957
958         // Validate the filter chain so that we can give an error in this
959         // function instead of delaying it to the first call to lzma_code().
960         // The memory usage calculation verifies the filter chain as
961         // a side effect so we take advantage of that.
962         if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963                 return LZMA_OPTIONS_ERROR;
964
965         // Validate the Check ID.
966         if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967                 return LZMA_PROG_ERROR;
968
969         if (!lzma_check_is_supported(options->check))
970                 return LZMA_UNSUPPORTED_CHECK;
971
972         // Allocate and initialize the base structure if needed.
973         lzma_stream_coder *coder = next->coder;
974         if (coder == NULL) {
975                 coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976                 if (coder == NULL)
977                         return LZMA_MEM_ERROR;
978
979                 next->coder = coder;
980
981                 // For the mutex and condition variable initializations
982                 // the error handling has to be done here because
983                 // stream_encoder_mt_end() doesn't know if they have
984                 // already been initialized or not.
985                 if (mythread_mutex_init(&coder->mutex)) {
986                         lzma_free(coder, allocator);
987                         next->coder = NULL;
988                         return LZMA_MEM_ERROR;
989                 }
990
991                 if (mythread_cond_init(&coder->cond)) {
992                         mythread_mutex_destroy(&coder->mutex);
993                         lzma_free(coder, allocator);
994                         next->coder = NULL;
995                         return LZMA_MEM_ERROR;
996                 }
997
998                 next->code = &stream_encode_mt;
999                 next->end = &stream_encoder_mt_end;
1000                 next->get_progress = &get_progress;
1001 //              next->update = &stream_encoder_mt_update;
1002
1003                 coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004                 coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005                 coder->index = NULL;
1006                 memzero(&coder->outq, sizeof(coder->outq));
1007                 coder->threads = NULL;
1008                 coder->threads_max = 0;
1009                 coder->threads_initialized = 0;
1010         }
1011
1012         // Basic initializations
1013         coder->sequence = SEQ_STREAM_HEADER;
1014         coder->block_size = (size_t)(block_size);
1015         coder->thread_error = LZMA_OK;
1016         coder->thr = NULL;
1017
1018         // Allocate the thread-specific base structures.
1019         assert(options->threads > 0);
1020         if (coder->threads_max != options->threads) {
1021                 threads_end(coder, allocator);
1022
1023                 coder->threads = NULL;
1024                 coder->threads_max = 0;
1025
1026                 coder->threads_initialized = 0;
1027                 coder->threads_free = NULL;
1028
1029                 coder->threads = lzma_alloc(
1030                                 options->threads * sizeof(worker_thread),
1031                                 allocator);
1032                 if (coder->threads == NULL)
1033                         return LZMA_MEM_ERROR;
1034
1035                 coder->threads_max = options->threads;
1036         } else {
1037                 // Reuse the old structures and threads. Tell the running
1038                 // threads to stop and wait until they have stopped.
1039                 threads_stop(coder, true);
1040         }
1041
1042         // Output queue
1043         return_if_error(lzma_outq_init(&coder->outq, allocator,
1044                         outbuf_size_max, options->threads));
1045
1046         // Timeout
1047         coder->timeout = options->timeout;
1048
1049         // Free the old filter chain and copy the new one.
1050         for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051                 lzma_free(coder->filters[i].options, allocator);
1052
1053         return_if_error(lzma_filters_copy(
1054                         filters, coder->filters, allocator));
1055
1056         // Index
1057         lzma_index_end(coder->index, allocator);
1058         coder->index = lzma_index_init(allocator);
1059         if (coder->index == NULL)
1060                 return LZMA_MEM_ERROR;
1061
1062         // Stream Header
1063         coder->stream_flags.version = 0;
1064         coder->stream_flags.check = options->check;
1065         return_if_error(lzma_stream_header_encode(
1066                         &coder->stream_flags, coder->header));
1067
1068         coder->header_pos = 0;
1069
1070         // Progress info
1071         coder->progress_in = 0;
1072         coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073
1074         return LZMA_OK;
1075 }
1076
1077
1078 extern LZMA_API(lzma_ret)
1079 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080 {
1081         lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1082
1083         strm->internal->supported_actions[LZMA_RUN] = true;
1084 //      strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085         strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086         strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087         strm->internal->supported_actions[LZMA_FINISH] = true;
1088
1089         return LZMA_OK;
1090 }
1091
1092
1093 // This function name is a monster but it's consistent with the older
1094 // monster names. :-( 31 chars is the max that C99 requires so in that
1095 // sense it's not too long. ;-)
1096 extern LZMA_API(uint64_t)
1097 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098 {
1099         lzma_options_easy easy;
1100         const lzma_filter *filters;
1101         uint64_t block_size;
1102         uint64_t outbuf_size_max;
1103
1104         if (get_options(options, &easy, &filters, &block_size,
1105                         &outbuf_size_max) != LZMA_OK)
1106                 return UINT64_MAX;
1107
1108         // Memory usage of the input buffers
1109         const uint64_t inbuf_memusage = options->threads * block_size;
1110
1111         // Memory usage of the filter encoders
1112         uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113         if (filters_memusage == UINT64_MAX)
1114                 return UINT64_MAX;
1115
1116         filters_memusage *= options->threads;
1117
1118         // Memory usage of the output queue
1119         const uint64_t outq_memusage = lzma_outq_memusage(
1120                         outbuf_size_max, options->threads);
1121         if (outq_memusage == UINT64_MAX)
1122                 return UINT64_MAX;
1123
1124         // Sum them with overflow checking.
1125         uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126                         + sizeof(lzma_stream_coder)
1127                         + options->threads * sizeof(worker_thread);
1128
1129         if (UINT64_MAX - total_memusage < inbuf_memusage)
1130                 return UINT64_MAX;
1131
1132         total_memusage += inbuf_memusage;
1133
1134         if (UINT64_MAX - total_memusage < filters_memusage)
1135                 return UINT64_MAX;
1136
1137         total_memusage += filters_memusage;
1138
1139         if (UINT64_MAX - total_memusage < outq_memusage)
1140                 return UINT64_MAX;
1141
1142         return total_memusage + outq_memusage;
1143 }