]> CyberLeo.Net >> Repos - FreeBSD/stable/10.git/blob - contrib/xz/src/liblzma/common/stream_encoder_mt.c
MFC: xz 5.2.2.
[FreeBSD/stable/10.git] / contrib / xz / src / liblzma / common / stream_encoder_mt.c
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19
20
/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
///
/// Any value not exceeding this limit can be multiplied by
/// LZMA_THREADS_MAX without overflowing a uint64_t.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24
25
/// State of a worker thread as stored in worker_thread.state.
///
/// NOTE: The order of the enumerators matters. Code in this file compares
/// states with relational operators ("state >= THR_STOP" in
/// worker_encode() and "state <= THR_FINISH" in worker_start()), so
/// THR_STOP and THR_EXIT must stay after the states in which encoding
/// is (or may be) in progress.
typedef enum {
        /// Waiting for work.
        THR_IDLE,

        /// Encoding is in progress.
        THR_RUN,

        /// Encoding is in progress but no more input data will
        /// be read.
        THR_FINISH,

        /// The main thread wants the thread to stop whatever it was doing
        /// but not exit.
        THR_STOP,

        /// The main thread wants the thread to exit. We could use
        /// cancellation but since there's stopped anyway, this is lazier.
        THR_EXIT,

} worker_state;
46
47
/// Per-worker bookkeeping. One structure exists for each worker thread
/// and it is accessed both by the main thread and by the worker itself.
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
        /// Current state of, or pending request to, this worker.
        /// In this file it is read and written while holding "mutex".
        worker_state state;

        /// Input buffer of coder->block_size bytes. The main thread will
        /// put new input into this and update in_size accordingly. Once
        /// no more input is coming, state will be set to THR_FINISH.
        uint8_t *in;

        /// Amount of data available in the input buffer. This is modified
        /// only by the main thread.
        size_t in_size;

        /// Output buffer for this thread. This is set by the main
        /// thread every time a new Block is started with this thread
        /// structure.
        lzma_outbuf *outbuf;

        /// Pointer to the main structure is needed when putting this
        /// thread back to the stack of free threads.
        lzma_coder *coder;

        /// The allocator is set by the main thread. Since a copy of the
        /// pointer is kept here, the application must not change the
        /// allocator before calling lzma_end().
        const lzma_allocator *allocator;

        /// Amount of uncompressed data that has already been compressed.
        uint64_t progress_in;

        /// Amount of compressed data that is ready.
        uint64_t progress_out;

        /// Block encoder
        lzma_next_coder block_encoder;

        /// Compression options for this Block
        lzma_block block_options;

        /// Next structure in the stack of free worker threads.
        worker_thread *next;

        /// Mutex held in this file when "state" and "in_size" are
        /// exchanged between the main thread and the worker, and when
        /// the worker publishes progress_in/progress_out while encoding.
        mythread_mutex mutex;

        /// Condition variable paired with "mutex". It is signaled when
        /// "state" or "in_size" has been changed.
        mythread_cond cond;

        /// The ID of this thread is used to join the thread
        /// when it's not needed anymore.
        mythread thread_id;
};
97
98
/// State of the multithreaded Stream encoder. Worker threads reach this
/// structure through worker_thread.coder.
struct lzma_coder_s {
        /// Which part of the Stream stream_encode_mt() works on next.
        enum {
                SEQ_STREAM_HEADER,
                SEQ_BLOCK,
                SEQ_INDEX,
                SEQ_STREAM_FOOTER,
        } sequence;

        /// Start a new Block every block_size bytes of input unless
        /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
        size_t block_size;

        /// The filter chain currently in use
        lzma_filter filters[LZMA_FILTERS_MAX + 1];


        /// Index to hold sizes of the Blocks
        lzma_index *index;

        /// Index encoder
        lzma_next_coder index_encoder;


        /// Stream Flags for encoding the Stream Header and Stream Footer.
        lzma_stream_flags stream_flags;

        /// Buffer to hold Stream Header and Stream Footer.
        uint8_t header[LZMA_STREAM_HEADER_SIZE];

        /// Read position in header[]
        size_t header_pos;


        /// Output buffer queue for compressed data
        lzma_outq outq;


        /// Maximum wait time if cannot use all the input and cannot
        /// fill the output buffer. This is in milliseconds.
        /// Zero means that wait_for_work() waits without a timeout.
        uint32_t timeout;


        /// Error code from a worker thread. Only the first error is
        /// stored (see worker_error()).
        lzma_ret thread_error;

        /// Array of allocated thread-specific structures
        worker_thread *threads;

        /// Number of structures in "threads" above. This is also the
        /// number of threads that will be created at maximum.
        uint32_t threads_max;

        /// Number of thread structures that have been initialized, and
        /// thus the number of worker threads actually created so far.
        uint32_t threads_initialized;

        /// Stack of free threads. When a thread finishes, it puts itself
        /// back into this stack. This starts as empty because threads
        /// are created only when actually needed.
        worker_thread *threads_free;

        /// The most recent worker thread to which the main thread writes
        /// the new input from the application.
        worker_thread *thr;


        /// Amount of uncompressed data in Blocks that have already
        /// been finished.
        uint64_t progress_in;

        /// Amount of compressed data in Stream Header + Blocks that
        /// have already been finished.
        uint64_t progress_out;


        /// Mutex held in this file when thread_error, threads_free, and
        /// progress_in/progress_out are accessed, and when a worker marks
        /// its output buffer as finished.
        mythread_mutex mutex;

        /// Condition variable paired with "mutex". Workers signal it
        /// when they report an error or return to the free stack; the
        /// main thread waits on it in wait_for_work().
        mythread_cond cond;
};
177
178
179 /// Tell the main thread that something has gone wrong.
180 static void
181 worker_error(worker_thread *thr, lzma_ret ret)
182 {
183         assert(ret != LZMA_OK);
184         assert(ret != LZMA_STREAM_END);
185
186         mythread_sync(thr->coder->mutex) {
187                 if (thr->coder->thread_error == LZMA_OK)
188                         thr->coder->thread_error = ret;
189
190                 mythread_cond_signal(&thr->coder->cond);
191         }
192
193         return;
194 }
195
196
/// Encode one Block in a worker thread.
///
/// Input is consumed from thr->in as the main thread makes it available
/// (thr->in_size) and the result is written to thr->outbuf. If the output
/// buffer fills up before the Block encoder finishes, the whole input is
/// re-encoded with lzma_block_uncomp_encode() instead.
///
/// \param      thr     Worker whose Block is to be encoded
/// \param      state   Overwritten with thr->state before it is first
///                     read, so the value passed in isn't used as such.
///
/// \return     THR_FINISH if the Block was encoded successfully,
///             THR_STOP if an error occurred (it has then been reported
///             with worker_error()), or the value of thr->state if the
///             main thread requested stopping or exiting (>= THR_STOP).
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
        assert(thr->progress_in == 0);
        assert(thr->progress_out == 0);

        // Set the Block options.
        thr->block_options = (lzma_block){
                .version = 0,
                .check = thr->coder->stream_flags.check,
                .compressed_size = thr->coder->outq.buf_size_max,
                .uncompressed_size = thr->coder->block_size,

                // TODO: To allow changing the filter chain, the filters
                // array must be copied to each worker_thread.
                .filters = thr->coder->filters,
        };

        // Calculate maximum size of the Block Header. This amount is
        // reserved in the beginning of the buffer so that Block Header
        // along with Compressed Size and Uncompressed Size can be
        // written there.
        lzma_ret ret = lzma_block_header_size(&thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Initialize the Block encoder.
        ret = lzma_block_encoder_init(&thr->block_encoder,
                        thr->allocator, &thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        // in_pos is how much of thr->in has been given to the encoder.
        // in_size is this thread's local copy of thr->in_size.
        size_t in_pos = 0;
        size_t in_size = 0;

        // Start writing the compressed data after the space reserved
        // for the Block Header.
        thr->outbuf->size = thr->block_options.header_size;
        const size_t out_size = thr->coder->outq.buf_size_max;

        do {
                mythread_sync(thr->mutex) {
                        // Store in_pos and out_pos into *thr so that
                        // an application may read them via
                        // lzma_get_progress() to get progress information.
                        //
                        // NOTE: These aren't updated when the encoding
                        // finishes. Instead, the final values are taken
                        // later from thr->outbuf.
                        thr->progress_in = in_pos;
                        thr->progress_out = thr->outbuf->size;

                        // Wait until there is new input or the state
                        // changes from THR_RUN to something else.
                        while (in_size == thr->in_size
                                        && thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                // Return if we were asked to stop or exit.
                if (state >= THR_STOP)
                        return state;

                lzma_action action = state == THR_FINISH
                                ? LZMA_FINISH : LZMA_RUN;

                // Limit the amount of input given to the Block encoder
                // at once. This way this thread can react fairly quickly
                // if the main thread wants us to stop or exit.
                static const size_t in_chunk_max = 16384;
                size_t in_limit = in_size;
                if (in_size - in_pos > in_chunk_max) {
                        // More input remains after this chunk, so the
                        // encoder must not be told to finish yet.
                        in_limit = in_pos + in_chunk_max;
                        action = LZMA_RUN;
                }

                ret = thr->block_encoder.code(
                                thr->block_encoder.coder, thr->allocator,
                                thr->in, &in_pos, in_limit, thr->outbuf->buf,
                                &thr->outbuf->size, out_size, action);

        // Continue until the encoder finishes or fails, or until
        // the output buffer becomes full.
        } while (ret == LZMA_OK && thr->outbuf->size < out_size);

        switch (ret) {
        case LZMA_STREAM_END:
                assert(state == THR_FINISH);

                // Encode the Block Header. By doing it after
                // the compression, we can store the Compressed Size
                // and Uncompressed Size fields.
                ret = lzma_block_header_encode(&thr->block_options,
                                thr->outbuf->buf);
                if (ret != LZMA_OK) {
                        worker_error(thr, ret);
                        return THR_STOP;
                }

                break;

        case LZMA_OK:
                // The data was incompressible. Encode it using uncompressed
                // LZMA2 chunks.
                //
                // First wait that we have gotten all the input.
                mythread_sync(thr->mutex) {
                        while (thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                if (state >= THR_STOP)
                        return state;

                // Do the encoding. This takes care of the Block Header too.
                // The output written so far is discarded by resetting
                // the output position to zero.
                thr->outbuf->size = 0;
                ret = lzma_block_uncomp_encode(&thr->block_options,
                                thr->in, in_size, thr->outbuf->buf,
                                &thr->outbuf->size, out_size);

                // It shouldn't fail.
                if (ret != LZMA_OK) {
                        worker_error(thr, LZMA_PROG_ERROR);
                        return THR_STOP;
                }

                break;

        default:
                // The Block encoder returned an error.
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Set the size information that will be read by the main thread
        // to write the Index field.
        thr->outbuf->unpadded_size
                        = lzma_block_unpadded_size(&thr->block_options);
        assert(thr->outbuf->unpadded_size != 0);
        thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

        return THR_FINISH;
}
342
343
/// Entry point of a worker thread.
///
/// The thread sleeps until its state is changed from THR_IDLE, encodes
/// one Block with worker_encode(), marks itself idle again, and puts
/// itself back to the coder's stack of free threads. This repeats until
/// THR_EXIT is seen, at which point the thread destroys its own mutex and
/// condition variable and frees its Block encoder and input buffer.
///
/// \param      thr_ptr Pointer to the worker_thread structure of
///                     this thread
///
/// \return     MYTHREAD_RET_VALUE
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
        worker_thread *thr = thr_ptr;
        worker_state state = THR_IDLE; // Init to silence a warning

        while (true) {
                // Wait for work.
                mythread_sync(thr->mutex) {
                        while (true) {
                                // The thread is already idle so if we are
                                // requested to stop, just set the state.
                                if (thr->state == THR_STOP) {
                                        thr->state = THR_IDLE;
                                        mythread_cond_signal(&thr->cond);
                                }

                                state = thr->state;
                                if (state != THR_IDLE)
                                        break;

                                mythread_cond_wait(&thr->cond, &thr->mutex);
                        }
                }

                assert(state != THR_IDLE);
                assert(state != THR_STOP);

                // THR_RUN or THR_FINISH: there is a Block to encode.
                if (state <= THR_FINISH)
                        state = worker_encode(thr, state);

                if (state == THR_EXIT)
                        break;

                // Mark the thread as idle unless the main thread has
                // told us to exit. Signal is needed for the case
                // where the main thread is waiting for the threads to stop.
                mythread_sync(thr->mutex) {
                        if (thr->state != THR_EXIT) {
                                thr->state = THR_IDLE;
                                mythread_cond_signal(&thr->cond);
                        }
                }

                // thr->mutex has been released above before the coder's
                // mutex is locked here.
                mythread_sync(thr->coder->mutex) {
                        // Mark the output buffer as finished if
                        // no errors occurred.
                        thr->outbuf->finished = state == THR_FINISH;

                        // Update the main progress info.
                        thr->coder->progress_in
                                        += thr->outbuf->uncompressed_size;
                        thr->coder->progress_out += thr->outbuf->size;
                        thr->progress_in = 0;
                        thr->progress_out = 0;

                        // Return this thread to the stack of free threads.
                        thr->next = thr->coder->threads_free;
                        thr->coder->threads_free = thr;

                        mythread_cond_signal(&thr->coder->cond);
                }
        }

        // Exiting, free the resources.
        mythread_mutex_destroy(&thr->mutex);
        mythread_cond_destroy(&thr->cond);

        lzma_next_end(&thr->block_encoder, thr->allocator);
        lzma_free(thr->in, thr->allocator);
        return MYTHREAD_RET_VALUE;
}
416
417
418 /// Make the threads stop but not exit. Optionally wait for them to stop.
419 static void
420 threads_stop(lzma_coder *coder, bool wait_for_threads)
421 {
422         // Tell the threads to stop.
423         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
424                 mythread_sync(coder->threads[i].mutex) {
425                         coder->threads[i].state = THR_STOP;
426                         mythread_cond_signal(&coder->threads[i].cond);
427                 }
428         }
429
430         if (!wait_for_threads)
431                 return;
432
433         // Wait for the threads to settle in the idle state.
434         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
435                 mythread_sync(coder->threads[i].mutex) {
436                         while (coder->threads[i].state != THR_IDLE)
437                                 mythread_cond_wait(&coder->threads[i].cond,
438                                                 &coder->threads[i].mutex);
439                 }
440         }
441
442         return;
443 }
444
445
446 /// Stop the threads and free the resources associated with them.
447 /// Wait until the threads have exited.
448 static void
449 threads_end(lzma_coder *coder, const lzma_allocator *allocator)
450 {
451         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
452                 mythread_sync(coder->threads[i].mutex) {
453                         coder->threads[i].state = THR_EXIT;
454                         mythread_cond_signal(&coder->threads[i].cond);
455                 }
456         }
457
458         for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
459                 int ret = mythread_join(coder->threads[i].thread_id);
460                 assert(ret == 0);
461                 (void)ret;
462         }
463
464         lzma_free(coder->threads, allocator);
465         return;
466 }
467
468
469 /// Initialize a new worker_thread structure and create a new thread.
470 static lzma_ret
471 initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
472 {
473         worker_thread *thr = &coder->threads[coder->threads_initialized];
474
475         thr->in = lzma_alloc(coder->block_size, allocator);
476         if (thr->in == NULL)
477                 return LZMA_MEM_ERROR;
478
479         if (mythread_mutex_init(&thr->mutex))
480                 goto error_mutex;
481
482         if (mythread_cond_init(&thr->cond))
483                 goto error_cond;
484
485         thr->state = THR_IDLE;
486         thr->allocator = allocator;
487         thr->coder = coder;
488         thr->progress_in = 0;
489         thr->progress_out = 0;
490         thr->block_encoder = LZMA_NEXT_CODER_INIT;
491
492         if (mythread_create(&thr->thread_id, &worker_start, thr))
493                 goto error_thread;
494
495         ++coder->threads_initialized;
496         coder->thr = thr;
497
498         return LZMA_OK;
499
500 error_thread:
501         mythread_cond_destroy(&thr->cond);
502
503 error_cond:
504         mythread_mutex_destroy(&thr->mutex);
505
506 error_mutex:
507         lzma_free(thr->in, allocator);
508         return LZMA_MEM_ERROR;
509 }
510
511
512 static lzma_ret
513 get_thread(lzma_coder *coder, const lzma_allocator *allocator)
514 {
515         // If there are no free output subqueues, there is no
516         // point to try getting a thread.
517         if (!lzma_outq_has_buf(&coder->outq))
518                 return LZMA_OK;
519
520         // If there is a free structure on the stack, use it.
521         mythread_sync(coder->mutex) {
522                 if (coder->threads_free != NULL) {
523                         coder->thr = coder->threads_free;
524                         coder->threads_free = coder->threads_free->next;
525                 }
526         }
527
528         if (coder->thr == NULL) {
529                 // If there are no uninitialized structures left, return.
530                 if (coder->threads_initialized == coder->threads_max)
531                         return LZMA_OK;
532
533                 // Initialize a new thread.
534                 return_if_error(initialize_new_thread(coder, allocator));
535         }
536
537         // Reset the parts of the thread state that have to be done
538         // in the main thread.
539         mythread_sync(coder->thr->mutex) {
540                 coder->thr->state = THR_RUN;
541                 coder->thr->in_size = 0;
542                 coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
543                 mythread_cond_signal(&coder->thr->cond);
544         }
545
546         return LZMA_OK;
547 }
548
549
550 static lzma_ret
551 stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
552                 const uint8_t *restrict in, size_t *restrict in_pos,
553                 size_t in_size, lzma_action action)
554 {
555         while (*in_pos < in_size
556                         || (coder->thr != NULL && action != LZMA_RUN)) {
557                 if (coder->thr == NULL) {
558                         // Get a new thread.
559                         const lzma_ret ret = get_thread(coder, allocator);
560                         if (coder->thr == NULL)
561                                 return ret;
562                 }
563
564                 // Copy the input data to thread's buffer.
565                 size_t thr_in_size = coder->thr->in_size;
566                 lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
567                                 &thr_in_size, coder->block_size);
568
569                 // Tell the Block encoder to finish if
570                 //  - it has got block_size bytes of input; or
571                 //  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
572                 //    or LZMA_FULL_BARRIER was used.
573                 //
574                 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
575                 const bool finish = thr_in_size == coder->block_size
576                                 || (*in_pos == in_size && action != LZMA_RUN);
577
578                 bool block_error = false;
579
580                 mythread_sync(coder->thr->mutex) {
581                         if (coder->thr->state == THR_IDLE) {
582                                 // Something has gone wrong with the Block
583                                 // encoder. It has set coder->thread_error
584                                 // which we will read a few lines later.
585                                 block_error = true;
586                         } else {
587                                 // Tell the Block encoder its new amount
588                                 // of input and update the state if needed.
589                                 coder->thr->in_size = thr_in_size;
590
591                                 if (finish)
592                                         coder->thr->state = THR_FINISH;
593
594                                 mythread_cond_signal(&coder->thr->cond);
595                         }
596                 }
597
598                 if (block_error) {
599                         lzma_ret ret;
600
601                         mythread_sync(coder->mutex) {
602                                 ret = coder->thread_error;
603                         }
604
605                         return ret;
606                 }
607
608                 if (finish)
609                         coder->thr = NULL;
610         }
611
612         return LZMA_OK;
613 }
614
615
616 /// Wait until more input can be consumed, more output can be read, or
617 /// an optional timeout is reached.
618 static bool
619 wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
620                 bool *has_blocked, bool has_input)
621 {
622         if (coder->timeout != 0 && !*has_blocked) {
623                 // Every time when stream_encode_mt() is called via
624                 // lzma_code(), *has_blocked starts as false. We set it
625                 // to true here and calculate the absolute time when
626                 // we must return if there's nothing to do.
627                 //
628                 // The idea of *has_blocked is to avoid unneeded calls
629                 // to mythread_condtime_set(), which may do a syscall
630                 // depending on the operating system.
631                 *has_blocked = true;
632                 mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
633         }
634
635         bool timed_out = false;
636
637         mythread_sync(coder->mutex) {
638                 // There are four things that we wait. If one of them
639                 // becomes possible, we return.
640                 //  - If there is input left, we need to get a free
641                 //    worker thread and an output buffer for it.
642                 //  - Data ready to be read from the output queue.
643                 //  - A worker thread indicates an error.
644                 //  - Time out occurs.
645                 while ((!has_input || coder->threads_free == NULL
646                                         || !lzma_outq_has_buf(&coder->outq))
647                                 && !lzma_outq_is_readable(&coder->outq)
648                                 && coder->thread_error == LZMA_OK
649                                 && !timed_out) {
650                         if (coder->timeout != 0)
651                                 timed_out = mythread_cond_timedwait(
652                                                 &coder->cond, &coder->mutex,
653                                                 wait_abs) != 0;
654                         else
655                                 mythread_cond_wait(&coder->cond,
656                                                 &coder->mutex);
657                 }
658         }
659
660         return timed_out;
661 }
662
663
664 static lzma_ret
665 stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
666                 const uint8_t *restrict in, size_t *restrict in_pos,
667                 size_t in_size, uint8_t *restrict out,
668                 size_t *restrict out_pos, size_t out_size, lzma_action action)
669 {
670         switch (coder->sequence) {
671         case SEQ_STREAM_HEADER:
672                 lzma_bufcpy(coder->header, &coder->header_pos,
673                                 sizeof(coder->header),
674                                 out, out_pos, out_size);
675                 if (coder->header_pos < sizeof(coder->header))
676                         return LZMA_OK;
677
678                 coder->header_pos = 0;
679                 coder->sequence = SEQ_BLOCK;
680
681         // Fall through
682
683         case SEQ_BLOCK: {
684                 // Initialized to silence warnings.
685                 lzma_vli unpadded_size = 0;
686                 lzma_vli uncompressed_size = 0;
687                 lzma_ret ret = LZMA_OK;
688
689                 // These are for wait_for_work().
690                 bool has_blocked = false;
691                 mythread_condtime wait_abs;
692
693                 while (true) {
694                         mythread_sync(coder->mutex) {
695                                 // Check for Block encoder errors.
696                                 ret = coder->thread_error;
697                                 if (ret != LZMA_OK) {
698                                         assert(ret != LZMA_STREAM_END);
699                                         break;
700                                 }
701
702                                 // Try to read compressed data to out[].
703                                 ret = lzma_outq_read(&coder->outq,
704                                                 out, out_pos, out_size,
705                                                 &unpadded_size,
706                                                 &uncompressed_size);
707                         }
708
709                         if (ret == LZMA_STREAM_END) {
710                                 // End of Block. Add it to the Index.
711                                 ret = lzma_index_append(coder->index,
712                                                 allocator, unpadded_size,
713                                                 uncompressed_size);
714
715                                 // If we didn't fill the output buffer yet,
716                                 // try to read more data. Maybe the next
717                                 // outbuf has been finished already too.
718                                 if (*out_pos < out_size)
719                                         continue;
720                         }
721
722                         if (ret != LZMA_OK) {
723                                 // coder->thread_error was set or
724                                 // lzma_index_append() failed.
725                                 threads_stop(coder, false);
726                                 return ret;
727                         }
728
729                         // Try to give uncompressed data to a worker thread.
730                         ret = stream_encode_in(coder, allocator,
731                                         in, in_pos, in_size, action);
732                         if (ret != LZMA_OK) {
733                                 threads_stop(coder, false);
734                                 return ret;
735                         }
736
737                         // See if we should wait or return.
738                         //
739                         // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
740                         if (*in_pos == in_size) {
741                                 // LZMA_RUN: More data is probably coming
742                                 // so return to let the caller fill the
743                                 // input buffer.
744                                 if (action == LZMA_RUN)
745                                         return LZMA_OK;
746
747                                 // LZMA_FULL_BARRIER: The same as with
748                                 // LZMA_RUN but tell the caller that the
749                                 // barrier was completed.
750                                 if (action == LZMA_FULL_BARRIER)
751                                         return LZMA_STREAM_END;
752
753                                 // Finishing or flushing isn't completed until
754                                 // all input data has been encoded and copied
755                                 // to the output buffer.
756                                 if (lzma_outq_is_empty(&coder->outq)) {
757                                         // LZMA_FINISH: Continue to encode
758                                         // the Index field.
759                                         if (action == LZMA_FINISH)
760                                                 break;
761
762                                         // LZMA_FULL_FLUSH: Return to tell
763                                         // the caller that flushing was
764                                         // completed.
765                                         if (action == LZMA_FULL_FLUSH)
766                                                 return LZMA_STREAM_END;
767                                 }
768                         }
769
770                         // Return if there is no output space left.
771                         // This check must be done after testing the input
772                         // buffer, because we might want to use a different
773                         // return code.
774                         if (*out_pos == out_size)
775                                 return LZMA_OK;
776
777                         // Neither in nor out has been used completely.
778                         // Wait until there's something we can do.
779                         if (wait_for_work(coder, &wait_abs, &has_blocked,
780                                         *in_pos < in_size))
781                                 return LZMA_TIMED_OUT;
782                 }
783
784                 // All Blocks have been encoded and the threads have stopped.
785                 // Prepare to encode the Index field.
786                 return_if_error(lzma_index_encoder_init(
787                                 &coder->index_encoder, allocator,
788                                 coder->index));
789                 coder->sequence = SEQ_INDEX;
790
791                 // Update the progress info to take the Index and
792                 // Stream Footer into account. Those are very fast to encode
793                 // so in terms of progress information they can be thought
794                 // to be ready to be copied out.
795                 coder->progress_out += lzma_index_size(coder->index)
796                                 + LZMA_STREAM_HEADER_SIZE;
797         }
798
799         // Fall through
800
801         case SEQ_INDEX: {
802                 // Call the Index encoder. It doesn't take any input, so
803                 // those pointers can be NULL.
804                 const lzma_ret ret = coder->index_encoder.code(
805                                 coder->index_encoder.coder, allocator,
806                                 NULL, NULL, 0,
807                                 out, out_pos, out_size, LZMA_RUN);
808                 if (ret != LZMA_STREAM_END)
809                         return ret;
810
811                 // Encode the Stream Footer into coder->header.
812                 coder->stream_flags.backward_size
813                                 = lzma_index_size(coder->index);
814                 if (lzma_stream_footer_encode(&coder->stream_flags,
815                                 coder->header) != LZMA_OK)
816                         return LZMA_PROG_ERROR;
817
818                 coder->sequence = SEQ_STREAM_FOOTER;
819         }
820
821         // Fall through
822
823         case SEQ_STREAM_FOOTER:
824                 lzma_bufcpy(coder->header, &coder->header_pos,
825                                 sizeof(coder->header),
826                                 out, out_pos, out_size);
827                 return coder->header_pos < sizeof(coder->header)
828                                 ? LZMA_OK : LZMA_STREAM_END;
829         }
830
831         assert(0);
832         return LZMA_PROG_ERROR;
833 }
834
835
836 static void
837 stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
838 {
839         // Threads must be killed before the output queue can be freed.
840         threads_end(coder, allocator);
841         lzma_outq_end(&coder->outq, allocator);
842
843         for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
844                 lzma_free(coder->filters[i].options, allocator);
845
846         lzma_next_end(&coder->index_encoder, allocator);
847         lzma_index_end(coder->index, allocator);
848
849         mythread_cond_destroy(&coder->cond);
850         mythread_mutex_destroy(&coder->mutex);
851
852         lzma_free(coder, allocator);
853         return;
854 }
855
856
857 /// Options handling for lzma_stream_encoder_mt_init() and
858 /// lzma_stream_encoder_mt_memusage()
859 static lzma_ret
860 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
861                 const lzma_filter **filters, uint64_t *block_size,
862                 uint64_t *outbuf_size_max)
863 {
864         // Validate some of the options.
865         if (options == NULL)
866                 return LZMA_PROG_ERROR;
867
868         if (options->flags != 0 || options->threads == 0
869                         || options->threads > LZMA_THREADS_MAX)
870                 return LZMA_OPTIONS_ERROR;
871
872         if (options->filters != NULL) {
873                 // Filter chain was given, use it as is.
874                 *filters = options->filters;
875         } else {
876                 // Use a preset.
877                 if (lzma_easy_preset(opt_easy, options->preset))
878                         return LZMA_OPTIONS_ERROR;
879
880                 *filters = opt_easy->filters;
881         }
882
883         // Block size
884         if (options->block_size > 0) {
885                 if (options->block_size > BLOCK_SIZE_MAX)
886                         return LZMA_OPTIONS_ERROR;
887
888                 *block_size = options->block_size;
889         } else {
890                 // Determine the Block size from the filter chain.
891                 *block_size = lzma_mt_block_size(*filters);
892                 if (*block_size == 0)
893                         return LZMA_OPTIONS_ERROR;
894
895                 assert(*block_size <= BLOCK_SIZE_MAX);
896         }
897
898         // Calculate the maximum amount output that a single output buffer
899         // may need to hold. This is the same as the maximum total size of
900         // a Block.
901         *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
902         if (*outbuf_size_max == 0)
903                 return LZMA_MEM_ERROR;
904
905         return LZMA_OK;
906 }
907
908
909 static void
910 get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
911 {
912         // Lock coder->mutex to prevent finishing threads from moving their
913         // progress info from the worker_thread structure to lzma_coder.
914         mythread_sync(coder->mutex) {
915                 *progress_in = coder->progress_in;
916                 *progress_out = coder->progress_out;
917
918                 for (size_t i = 0; i < coder->threads_initialized; ++i) {
919                         mythread_sync(coder->threads[i].mutex) {
920                                 *progress_in += coder->threads[i].progress_in;
921                                 *progress_out += coder->threads[i]
922                                                 .progress_out;
923                         }
924                 }
925         }
926
927         return;
928 }
929
930
931 static lzma_ret
932 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
933                 const lzma_mt *options)
934 {
935         lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
936
937         // Get the filter chain.
938         lzma_options_easy easy;
939         const lzma_filter *filters;
940         uint64_t block_size;
941         uint64_t outbuf_size_max;
942         return_if_error(get_options(options, &easy, &filters,
943                         &block_size, &outbuf_size_max));
944
945 #if SIZE_MAX < UINT64_MAX
946         if (block_size > SIZE_MAX)
947                 return LZMA_MEM_ERROR;
948 #endif
949
950         // Validate the filter chain so that we can give an error in this
951         // function instead of delaying it to the first call to lzma_code().
952         // The memory usage calculation verifies the filter chain as
953         // a side effect so we take advatange of that.
954         if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
955                 return LZMA_OPTIONS_ERROR;
956
957         // Validate the Check ID.
958         if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
959                 return LZMA_PROG_ERROR;
960
961         if (!lzma_check_is_supported(options->check))
962                 return LZMA_UNSUPPORTED_CHECK;
963
964         // Allocate and initialize the base structure if needed.
965         if (next->coder == NULL) {
966                 next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
967                 if (next->coder == NULL)
968                         return LZMA_MEM_ERROR;
969
970                 // For the mutex and condition variable initializations
971                 // the error handling has to be done here because
972                 // stream_encoder_mt_end() doesn't know if they have
973                 // already been initialized or not.
974                 if (mythread_mutex_init(&next->coder->mutex)) {
975                         lzma_free(next->coder, allocator);
976                         next->coder = NULL;
977                         return LZMA_MEM_ERROR;
978                 }
979
980                 if (mythread_cond_init(&next->coder->cond)) {
981                         mythread_mutex_destroy(&next->coder->mutex);
982                         lzma_free(next->coder, allocator);
983                         next->coder = NULL;
984                         return LZMA_MEM_ERROR;
985                 }
986
987                 next->code = &stream_encode_mt;
988                 next->end = &stream_encoder_mt_end;
989                 next->get_progress = &get_progress;
990 //              next->update = &stream_encoder_mt_update;
991
992                 next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
993                 next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
994                 next->coder->index = NULL;
995                 memzero(&next->coder->outq, sizeof(next->coder->outq));
996                 next->coder->threads = NULL;
997                 next->coder->threads_max = 0;
998                 next->coder->threads_initialized = 0;
999         }
1000
1001         // Basic initializations
1002         next->coder->sequence = SEQ_STREAM_HEADER;
1003         next->coder->block_size = (size_t)(block_size);
1004         next->coder->thread_error = LZMA_OK;
1005         next->coder->thr = NULL;
1006
1007         // Allocate the thread-specific base structures.
1008         assert(options->threads > 0);
1009         if (next->coder->threads_max != options->threads) {
1010                 threads_end(next->coder, allocator);
1011
1012                 next->coder->threads = NULL;
1013                 next->coder->threads_max = 0;
1014
1015                 next->coder->threads_initialized = 0;
1016                 next->coder->threads_free = NULL;
1017
1018                 next->coder->threads = lzma_alloc(
1019                                 options->threads * sizeof(worker_thread),
1020                                 allocator);
1021                 if (next->coder->threads == NULL)
1022                         return LZMA_MEM_ERROR;
1023
1024                 next->coder->threads_max = options->threads;
1025         } else {
1026                 // Reuse the old structures and threads. Tell the running
1027                 // threads to stop and wait until they have stopped.
1028                 threads_stop(next->coder, true);
1029         }
1030
1031         // Output queue
1032         return_if_error(lzma_outq_init(&next->coder->outq, allocator,
1033                         outbuf_size_max, options->threads));
1034
1035         // Timeout
1036         next->coder->timeout = options->timeout;
1037
1038         // Free the old filter chain and copy the new one.
1039         for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1040                 lzma_free(next->coder->filters[i].options, allocator);
1041
1042         return_if_error(lzma_filters_copy(
1043                         filters, next->coder->filters, allocator));
1044
1045         // Index
1046         lzma_index_end(next->coder->index, allocator);
1047         next->coder->index = lzma_index_init(allocator);
1048         if (next->coder->index == NULL)
1049                 return LZMA_MEM_ERROR;
1050
1051         // Stream Header
1052         next->coder->stream_flags.version = 0;
1053         next->coder->stream_flags.check = options->check;
1054         return_if_error(lzma_stream_header_encode(
1055                         &next->coder->stream_flags, next->coder->header));
1056
1057         next->coder->header_pos = 0;
1058
1059         // Progress info
1060         next->coder->progress_in = 0;
1061         next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1062
1063         return LZMA_OK;
1064 }
1065
1066
1067 extern LZMA_API(lzma_ret)
1068 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1069 {
1070         lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1071
1072         strm->internal->supported_actions[LZMA_RUN] = true;
1073 //      strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1074         strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1075         strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1076         strm->internal->supported_actions[LZMA_FINISH] = true;
1077
1078         return LZMA_OK;
1079 }
1080
1081
1082 // This function name is a monster but it's consistent with the older
1083 // monster names. :-( 31 chars is the max that C99 requires so in that
1084 // sense it's not too long. ;-)
1085 extern LZMA_API(uint64_t)
1086 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1087 {
1088         lzma_options_easy easy;
1089         const lzma_filter *filters;
1090         uint64_t block_size;
1091         uint64_t outbuf_size_max;
1092
1093         if (get_options(options, &easy, &filters, &block_size,
1094                         &outbuf_size_max) != LZMA_OK)
1095                 return UINT64_MAX;
1096
1097         // Memory usage of the input buffers
1098         const uint64_t inbuf_memusage = options->threads * block_size;
1099
1100         // Memory usage of the filter encoders
1101         uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1102         if (filters_memusage == UINT64_MAX)
1103                 return UINT64_MAX;
1104
1105         filters_memusage *= options->threads;
1106
1107         // Memory usage of the output queue
1108         const uint64_t outq_memusage = lzma_outq_memusage(
1109                         outbuf_size_max, options->threads);
1110         if (outq_memusage == UINT64_MAX)
1111                 return UINT64_MAX;
1112
1113         // Sum them with overflow checking.
1114         uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
1115                         + options->threads * sizeof(worker_thread);
1116
1117         if (UINT64_MAX - total_memusage < inbuf_memusage)
1118                 return UINT64_MAX;
1119
1120         total_memusage += inbuf_memusage;
1121
1122         if (UINT64_MAX - total_memusage < filters_memusage)
1123                 return UINT64_MAX;
1124
1125         total_memusage += filters_memusage;
1126
1127         if (UINT64_MAX - total_memusage < outq_memusage)
1128                 return UINT64_MAX;
1129
1130         return total_memusage + outq_memusage;
1131 }