1 ///////////////////////////////////////////////////////////////////////////////
3 /// \file stream_encoder_mt.c
4 /// \brief Multithreaded .xz Stream encoder
6 // Author: Lasse Collin
8 // This file has been put into the public domain.
9 // You can do whatever you want with this file.
11 ///////////////////////////////////////////////////////////////////////////////
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given an unusually large block size.
/// Because the number of threads is also limited to LZMA_THREADS_MAX,
/// a product like threads * block_size cannot exceed UINT64_MAX.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
30 /// Encoding is in progress.
33 /// Encoding is in progress but no more input data will
37 /// The main thread wants the thread to stop whatever it was doing
41 /// The main thread wants the thread to exit. We could use
42 /// cancellation but since the thread is stopped anyway, this is lazier.
47 typedef struct lzma_stream_coder_s lzma_stream_coder;
49 typedef struct worker_thread_s worker_thread;
/// Per-worker state shared between the main thread and one worker thread.
///
/// The fields state and in_size are accessed while holding the worker's
/// own mutex (thr->mutex). The progress counters are written under
/// thr->mutex while a Block is being encoded and are reset to zero under
/// coder->mutex when the Block is done; get_progress() holds both mutexes.
50 struct worker_thread_s {
53 /// Input buffer of coder->block_size bytes. The main thread will
54 /// put new input into this and update in_size accordingly. Once
55 /// no more input is coming, state will be set to THR_FINISH.
58 /// Amount of data available in the input buffer. This is modified
59 /// only by the main thread.
62 /// Output buffer for this thread. This is set by the main
63 /// thread every time a new Block is started with this thread
67 /// Pointer to the main structure is needed when putting this
68 /// thread back to the stack of free threads.
69 lzma_stream_coder *coder;
71 /// The allocator is set by the main thread. Since a copy of the
72 /// pointer is kept here, the application must not change the
73 /// allocator before calling lzma_end().
74 const lzma_allocator *allocator;
76 /// Amount of uncompressed data that has already been compressed.
79 /// Amount of compressed data that is ready.
80 uint64_t progress_out;
/// Block encoder of this thread. It is (re)initialized for every Block
/// in worker_encode() and freed by the thread itself when it exits.
83 lzma_next_coder block_encoder;
85 /// Compression options for this Block
86 lzma_block block_options;
88 /// Next structure in the stack of free worker threads.
94 /// The ID of this thread is used to join the thread
95 /// when it's not needed anymore.
/// State of the multithreaded .xz Stream encoder. A pointer to this is
/// stored in next->coder and passed back as coder_ptr to
/// stream_encode_mt(), stream_encoder_mt_end() and get_progress().
100 struct lzma_stream_coder_s {
108 /// Start a new Block every block_size bytes of input unless
109 /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
112 /// The filter chain currently in use
113 lzma_filter filters[LZMA_FILTERS_MAX + 1];
116 /// Index to hold sizes of the Blocks
/// Encoder for the Index field. It is initialized only after all
/// Blocks have been encoded.
120 lzma_next_coder index_encoder;
123 /// Stream Flags for encoding the Stream Header and Stream Footer.
124 lzma_stream_flags stream_flags;
126 /// Buffer to hold Stream Header and Stream Footer.
127 uint8_t header[LZMA_STREAM_HEADER_SIZE];
129 /// Read position in header[]
133 /// Output buffer queue for compressed data
137 /// Maximum wait time if cannot use all the input and cannot
138 /// fill the output buffer. This is in milliseconds.
142 /// Error code from a worker thread
/// Stays LZMA_OK until a worker reports an error; only the first
/// error is stored (see worker_error()).
143 lzma_ret thread_error;
145 /// Array of allocated thread-specific structures
146 worker_thread *threads;
148 /// Number of structures in "threads" above. This is also the
149 /// number of threads that will be created at maximum.
150 uint32_t threads_max;
152 /// Number of thread structures that have been initialized, and
153 /// thus the number of worker threads actually created so far.
154 uint32_t threads_initialized;
156 /// Stack of free threads. When a thread finishes, it puts itself
157 /// back into this stack. This starts as empty because threads
158 /// are created only when actually needed.
159 worker_thread *threads_free;
161 /// The most recent worker thread to which the main thread writes
162 /// the new input from the application.
166 /// Amount of uncompressed data in Blocks that have already
168 uint64_t progress_in;
170 /// Amount of compressed data in Stream Header + Blocks that
171 /// have already been finished.
/// Once all Blocks are done, stream_encode_mt() also adds the sizes
/// of the Index and the Stream Footer.
172 uint64_t progress_out;
/// Protects thread_error, threads_free, progress_in and progress_out.
/// Workers also set outbuf->finished while holding it, and the main
/// thread reads the output queue under it.
175 mythread_mutex mutex;
180 /// Tell the main thread that something has gone wrong.
///
/// \param thr  Worker thread that hit the error
/// \param ret  Error code; must be neither LZMA_OK nor LZMA_STREAM_END
///
/// Only the first reported error is stored in coder->thread_error.
/// coder->cond is signaled so that a main thread blocked in
/// wait_for_work() notices the error.
182 worker_error(worker_thread *thr, lzma_ret ret)
184 assert(ret != LZMA_OK);
185 assert(ret != LZMA_STREAM_END);
187 mythread_sync(thr->coder->mutex) {
// Keep the first error; errors from other threads are dropped.
188 if (thr->coder->thread_error == LZMA_OK)
189 thr->coder->thread_error = ret;
191 mythread_cond_signal(&thr->coder->cond);
/// Encode one Block from the input that the main thread feeds into
/// thr->in, writing the result into thr->outbuf.
///
/// \param thr    Worker thread whose Block is encoded
/// \param state  State that woke the worker (THR_RUN or THR_FINISH)
///
/// The returned state is consumed by worker_start():
/// - THR_FINISH marks the output buffer as finished;
/// - THR_EXIT makes the thread exit.
/// Errors are reported through worker_error().
199 worker_encode(worker_thread *thr, worker_state state)
201 assert(thr->progress_in == 0);
202 assert(thr->progress_out == 0);
204 // Set the Block options.
205 thr->block_options = (lzma_block){
207 .check = thr->coder->stream_flags.check,
208 .compressed_size = thr->coder->outq.buf_size_max,
209 .uncompressed_size = thr->coder->block_size,
211 // TODO: To allow changing the filter chain, the filters
212 // array must be copied to each worker_thread.
213 .filters = thr->coder->filters,
216 // Calculate maximum size of the Block Header. This amount is
217 // reserved in the beginning of the buffer so that Block Header
218 // along with Compressed Size and Uncompressed Size can be
220 lzma_ret ret = lzma_block_header_size(&thr->block_options);
221 if (ret != LZMA_OK) {
222 worker_error(thr, ret);
226 // Initialize the Block encoder.
227 ret = lzma_block_encoder_init(&thr->block_encoder,
228 thr->allocator, &thr->block_options);
229 if (ret != LZMA_OK) {
230 worker_error(thr, ret);
237 thr->outbuf->size = thr->block_options.header_size;
238 const size_t out_size = thr->coder->outq.buf_size_max;
241 mythread_sync(thr->mutex) {
242 // Store in_pos and out_pos into *thr so that
243 // an application may read them via
244 // lzma_get_progress() to get progress information.
246 // NOTE: These aren't updated when the encoding
247 // finishes. Instead, the final values are taken
248 // later from thr->outbuf.
249 thr->progress_in = in_pos;
250 thr->progress_out = thr->outbuf->size;
// Sleep until the main thread provides more input or
// changes the state (e.g. THR_FINISH or THR_STOP).
252 while (in_size == thr->in_size
253 && thr->state == THR_RUN)
254 mythread_cond_wait(&thr->cond, &thr->mutex);
257 in_size = thr->in_size;
260 // Return if we were asked to stop or exit.
261 if (state >= THR_STOP)
264 lzma_action action = state == THR_FINISH
265 ? LZMA_FINISH : LZMA_RUN;
267 // Limit the amount of input given to the Block encoder
268 // at once. This way this thread can react fairly quickly
269 // if the main thread wants us to stop or exit.
270 static const size_t in_chunk_max = 16384;
271 size_t in_limit = in_size;
272 if (in_size - in_pos > in_chunk_max) {
273 in_limit = in_pos + in_chunk_max;
277 ret = thr->block_encoder.code(
278 thr->block_encoder.coder, thr->allocator,
279 thr->in, &in_pos, in_limit, thr->outbuf->buf,
280 &thr->outbuf->size, out_size, action);
281 } while (ret == LZMA_OK && thr->outbuf->size < out_size);
// Leaving the loop with ret == LZMA_OK means that outbuf is full.
// That case is handled below as incompressible data.
284 case LZMA_STREAM_END:
285 assert(state == THR_FINISH);
287 // Encode the Block Header. By doing it after
288 // the compression, we can store the Compressed Size
289 // and Uncompressed Size fields.
290 ret = lzma_block_header_encode(&thr->block_options,
292 if (ret != LZMA_OK) {
293 worker_error(thr, ret);
300 // The data was incompressible. Encode it using uncompressed
303 // First wait until we have gotten all the input.
304 mythread_sync(thr->mutex) {
305 while (thr->state == THR_RUN)
306 mythread_cond_wait(&thr->cond, &thr->mutex);
309 in_size = thr->in_size;
312 if (state >= THR_STOP)
315 // Do the encoding. This takes care of the Block Header too.
// Discard the partial compressed output; it is replaced entirely.
316 thr->outbuf->size = 0;
317 ret = lzma_block_uncomp_encode(&thr->block_options,
318 thr->in, in_size, thr->outbuf->buf,
319 &thr->outbuf->size, out_size);
321 // It shouldn't fail.
322 if (ret != LZMA_OK) {
323 worker_error(thr, LZMA_PROG_ERROR);
330 worker_error(thr, ret);
334 // Set the size information that will be read by the main thread
335 // to write the Index field.
336 thr->outbuf->unpadded_size
337 = lzma_block_unpadded_size(&thr->block_options);
338 assert(thr->outbuf->unpadded_size != 0);
339 thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
/// Entry point of a worker thread.
///
/// The thread waits on thr->cond until the main thread gives it work
/// and then encodes one Block with worker_encode(). Afterwards it
/// publishes the Block's final sizes into the coder-wide progress
/// counters and pushes itself back onto coder->threads_free. When
/// told to exit (THR_EXIT), it frees its own mutex, condition
/// variable, Block encoder and input buffer before returning.
345 static MYTHREAD_RET_TYPE
346 worker_start(void *thr_ptr)
348 worker_thread *thr = thr_ptr;
349 worker_state state = THR_IDLE; // Init to silence a warning
353 mythread_sync(thr->mutex) {
355 // The thread is already idle so if we are
356 // requested to stop, just set the state.
357 if (thr->state == THR_STOP) {
358 thr->state = THR_IDLE;
359 mythread_cond_signal(&thr->cond);
363 if (state != THR_IDLE)
366 mythread_cond_wait(&thr->cond, &thr->mutex);
370 assert(state != THR_IDLE);
371 assert(state != THR_STOP);
373 if (state <= THR_FINISH)
374 state = worker_encode(thr, state);
376 if (state == THR_EXIT)
379 // Mark the thread as idle unless the main thread has
380 // told us to exit. Signal is needed for the case
381 // where the main thread is waiting for the threads to stop.
382 mythread_sync(thr->mutex) {
383 if (thr->state != THR_EXIT) {
384 thr->state = THR_IDLE;
385 mythread_cond_signal(&thr->cond);
389 mythread_sync(thr->coder->mutex) {
390 // Mark the output buffer as finished if
391 // no errors occurred.
392 thr->outbuf->finished = state == THR_FINISH;
394 // Update the main progress info.
395 thr->coder->progress_in
396 += thr->outbuf->uncompressed_size;
397 thr->coder->progress_out += thr->outbuf->size;
398 thr->progress_in = 0;
399 thr->progress_out = 0;
401 // Return this thread to the stack of free threads.
402 thr->next = thr->coder->threads_free;
403 thr->coder->threads_free = thr;
// Wake the main thread if it is blocked in wait_for_work()
// waiting for a free thread or for readable output.
405 mythread_cond_signal(&thr->coder->cond);
409 // Exiting, free the resources.
410 mythread_mutex_destroy(&thr->mutex);
411 mythread_cond_destroy(&thr->cond);
413 lzma_next_end(&thr->block_encoder, thr->allocator);
414 lzma_free(thr->in, thr->allocator);
415 return MYTHREAD_RET_VALUE;
419 /// Make the threads stop but not exit. Optionally wait for them to stop.
///
/// \param coder             Encoder whose initialized threads are stopped
/// \param wait_for_threads  If true, block until every initialized
///                          worker has returned to THR_IDLE
421 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
423 // Tell the threads to stop.
// An idle worker turns THR_STOP back into THR_IDLE immediately; a busy
// worker does it after worker_encode() returns (see worker_start()).
424 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425 mythread_sync(coder->threads[i].mutex) {
426 coder->threads[i].state = THR_STOP;
427 mythread_cond_signal(&coder->threads[i].cond);
431 if (!wait_for_threads)
434 // Wait for the threads to settle in the idle state.
435 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436 mythread_sync(coder->threads[i].mutex) {
437 while (coder->threads[i].state != THR_IDLE)
438 mythread_cond_wait(&coder->threads[i].cond,
439 &coder->threads[i].mutex);
447 /// Stop the threads and free the resources associated with them.
448 /// Wait until the threads have exited.
///
/// Each worker frees its own mutex, condition variable, Block encoder
/// and input buffer on exit (see worker_start()). This function only
/// signals THR_EXIT, joins the threads and frees the coder->threads array.
450 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
452 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453 mythread_sync(coder->threads[i].mutex) {
454 coder->threads[i].state = THR_EXIT;
455 mythread_cond_signal(&coder->threads[i].cond);
459 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460 int ret = mythread_join(coder->threads[i].thread_id);
465 lzma_free(coder->threads, allocator);
470 /// Initialize a new worker_thread structure and create a new thread.
///
/// The structure used is coder->threads[coder->threads_initialized], so
/// the caller must ensure threads_initialized < threads_max (get_thread()
/// checks this). On success threads_initialized is incremented. The error
/// path at the end releases what was acquired, in reverse order.
472 initialize_new_thread(lzma_stream_coder *coder,
473 const lzma_allocator *allocator)
475 worker_thread *thr = &coder->threads[coder->threads_initialized];
// The input buffer is sized for the block_size in effect right now.
477 thr->in = lzma_alloc(coder->block_size, allocator);
479 return LZMA_MEM_ERROR;
481 if (mythread_mutex_init(&thr->mutex))
484 if (mythread_cond_init(&thr->cond))
// The state must be THR_IDLE before the thread is created, so that
// worker_start() waits for work instead of reading garbage.
487 thr->state = THR_IDLE;
488 thr->allocator = allocator;
490 thr->progress_in = 0;
491 thr->progress_out = 0;
492 thr->block_encoder = LZMA_NEXT_CODER_INIT;
494 if (mythread_create(&thr->thread_id, &worker_start, thr))
497 ++coder->threads_initialized;
503 mythread_cond_destroy(&thr->cond);
506 mythread_mutex_destroy(&thr->mutex);
509 lzma_free(thr->in, allocator);
510 return LZMA_MEM_ERROR;
/// Try to make coder->thr point to a worker that is ready for a new Block.
///
/// A free thread from coder->threads_free is reused if possible. Otherwise
/// a new thread is created if fewer than threads_max exist. If no output
/// buffer or no thread is available, coder->thr stays NULL; callers test
/// coder->thr rather than only the return value.
515 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
517 // If there are no free output subqueues, there is no
518 // point to try getting a thread.
519 if (!lzma_outq_has_buf(&coder->outq))
522 // If there is a free structure on the stack, use it.
523 mythread_sync(coder->mutex) {
524 if (coder->threads_free != NULL) {
525 coder->thr = coder->threads_free;
526 coder->threads_free = coder->threads_free->next;
530 if (coder->thr == NULL) {
531 // If there are no uninitialized structures left, return.
532 if (coder->threads_initialized == coder->threads_max)
535 // Initialize a new thread.
536 return_if_error(initialize_new_thread(coder, allocator));
539 // Reset the parts of the thread state that have to be done
540 // in the main thread.
// Switching to THR_RUN and signaling wakes the worker, which is
// waiting in worker_start(), to start a new Block in this outbuf.
541 mythread_sync(coder->thr->mutex) {
542 coder->thr->state = THR_RUN;
543 coder->thr->in_size = 0;
544 coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545 mythread_cond_signal(&coder->thr->cond);
/// Copy application input from in[] into worker threads' input buffers.
///
/// A worker's Block is told to finish when its buffer holds block_size
/// bytes, or when all input was used and action is not LZMA_RUN.
/// Returns early if no worker is available, and returns the
/// worker-reported error if a worker failed.
553 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554 const uint8_t *restrict in, size_t *restrict in_pos,
555 size_t in_size, lzma_action action)
// Keep going while there is input left, or while the current worker
// still has to be told to finish because of a finish/flush/barrier.
557 while (*in_pos < in_size
558 || (coder->thr != NULL && action != LZMA_RUN)) {
559 if (coder->thr == NULL) {
561 const lzma_ret ret = get_thread(coder, allocator);
562 if (coder->thr == NULL)
566 // Copy the input data to thread's buffer.
567 size_t thr_in_size = coder->thr->in_size;
568 lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569 &thr_in_size, coder->block_size);
571 // Tell the Block encoder to finish if
572 // - it has got block_size bytes of input; or
573 // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574 // or LZMA_FULL_BARRIER was used.
576 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577 const bool finish = thr_in_size == coder->block_size
578 || (*in_pos == in_size && action != LZMA_RUN);
580 bool block_error = false;
582 mythread_sync(coder->thr->mutex) {
583 if (coder->thr->state == THR_IDLE) {
584 // Something has gone wrong with the Block
585 // encoder. It has set coder->thread_error
586 // which we will read a few lines later.
589 // Tell the Block encoder its new amount
590 // of input and update the state if needed.
591 coder->thr->in_size = thr_in_size;
594 coder->thr->state = THR_FINISH;
596 mythread_cond_signal(&coder->thr->cond);
603 mythread_sync(coder->mutex) {
604 ret = coder->thread_error;
618 /// Wait until more input can be consumed, more output can be read, or
619 /// an optional timeout is reached.
///
/// \param coder        Encoder state; coder->cond is waited on
/// \param wait_abs     Absolute deadline, computed on the first blocking
///                     wait of a stream_encode_mt() call if a timeout is set
/// \param has_blocked  Set to true once *wait_abs has been computed
/// \param has_input    Whether the application still has unconsumed input
///
/// stream_encode_mt() treats a true return as a timeout and returns
/// LZMA_TIMED_OUT.
621 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622 bool *has_blocked, bool has_input)
624 if (coder->timeout != 0 && !*has_blocked) {
625 // Every time when stream_encode_mt() is called via
626 // lzma_code(), *has_blocked starts as false. We set it
627 // to true here and calculate the absolute time when
628 // we must return if there's nothing to do.
630 // The idea of *has_blocked is to avoid unneeded calls
631 // to mythread_condtime_set(), which may do a syscall
632 // depending on the operating system.
634 mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
637 bool timed_out = false;
639 mythread_sync(coder->mutex) {
640 // There are four things that we wait for. If one of them
641 // becomes possible, we return.
642 // - If there is input left, we need to get a free
643 // worker thread and an output buffer for it.
644 // - Data ready to be read from the output queue.
645 // - A worker thread indicates an error.
646 // - Time out occurs.
647 while ((!has_input || coder->threads_free == NULL
648 || !lzma_outq_has_buf(&coder->outq))
649 && !lzma_outq_is_readable(&coder->outq)
650 && coder->thread_error == LZMA_OK
652 if (coder->timeout != 0)
653 timed_out = mythread_cond_timedwait(
654 &coder->cond, &coder->mutex,
657 mythread_cond_wait(&coder->cond,
/// Code function of the multithreaded Stream encoder.
///
/// Output order: Stream Header (SEQ_STREAM_HEADER), the Blocks produced by
/// the worker threads in output-queue order (SEQ_BLOCK), the Index
/// (SEQ_INDEX) and finally the Stream Footer (SEQ_STREAM_FOOTER).
/// Returns LZMA_STREAM_END when a LZMA_FULL_BARRIER or LZMA_FULL_FLUSH
/// completes and when the whole Stream has been written. Returns
/// LZMA_TIMED_OUT if wait_for_work() times out.
667 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668 const uint8_t *restrict in, size_t *restrict in_pos,
669 size_t in_size, uint8_t *restrict out,
670 size_t *restrict out_pos, size_t out_size, lzma_action action)
672 lzma_stream_coder *coder = coder_ptr;
674 switch (coder->sequence) {
675 case SEQ_STREAM_HEADER:
676 lzma_bufcpy(coder->header, &coder->header_pos,
677 sizeof(coder->header),
678 out, out_pos, out_size);
679 if (coder->header_pos < sizeof(coder->header))
// Reset so that header[] can be reused for the Stream Footer.
682 coder->header_pos = 0;
683 coder->sequence = SEQ_BLOCK;
688 // Initialized to silence warnings.
689 lzma_vli unpadded_size = 0;
690 lzma_vli uncompressed_size = 0;
691 lzma_ret ret = LZMA_OK;
693 // These are for wait_for_work().
694 bool has_blocked = false;
695 mythread_condtime wait_abs;
698 mythread_sync(coder->mutex) {
699 // Check for Block encoder errors.
700 ret = coder->thread_error;
701 if (ret != LZMA_OK) {
702 assert(ret != LZMA_STREAM_END);
706 // Try to read compressed data to out[].
707 ret = lzma_outq_read(&coder->outq,
708 out, out_pos, out_size,
713 if (ret == LZMA_STREAM_END) {
714 // End of Block. Add it to the Index.
715 ret = lzma_index_append(coder->index,
716 allocator, unpadded_size,
719 // If we didn't fill the output buffer yet,
720 // try to read more data. Maybe the next
721 // outbuf has been finished already too.
722 if (*out_pos < out_size)
726 if (ret != LZMA_OK) {
727 // coder->thread_error was set or
728 // lzma_index_append() failed.
729 threads_stop(coder, false);
733 // Try to give uncompressed data to a worker thread.
734 ret = stream_encode_in(coder, allocator,
735 in, in_pos, in_size, action);
736 if (ret != LZMA_OK) {
737 threads_stop(coder, false);
741 // See if we should wait or return.
743 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744 if (*in_pos == in_size) {
745 // LZMA_RUN: More data is probably coming
746 // so return to let the caller fill the
748 if (action == LZMA_RUN)
751 // LZMA_FULL_BARRIER: The same as with
752 // LZMA_RUN but tell the caller that the
753 // barrier was completed.
754 if (action == LZMA_FULL_BARRIER)
755 return LZMA_STREAM_END;
757 // Finishing or flushing isn't completed until
758 // all input data has been encoded and copied
759 // to the output buffer.
760 if (lzma_outq_is_empty(&coder->outq)) {
761 // LZMA_FINISH: Continue to encode
763 if (action == LZMA_FINISH)
766 // LZMA_FULL_FLUSH: Return to tell
767 // the caller that flushing was
769 if (action == LZMA_FULL_FLUSH)
770 return LZMA_STREAM_END;
774 // Return if there is no output space left.
775 // This check must be done after testing the input
776 // buffer, because we might want to use a different
778 if (*out_pos == out_size)
781 // Neither in nor out has been used completely.
782 // Wait until there's something we can do.
783 if (wait_for_work(coder, &wait_abs, &has_blocked,
785 return LZMA_TIMED_OUT;
788 // All Blocks have been encoded and the threads have stopped.
789 // Prepare to encode the Index field.
790 return_if_error(lzma_index_encoder_init(
791 &coder->index_encoder, allocator,
793 coder->sequence = SEQ_INDEX;
795 // Update the progress info to take the Index and
796 // Stream Footer into account. Those are very fast to encode
797 // so in terms of progress information they can be thought
798 // to be ready to be copied out.
// The Stream Footer is encoded into the same header[] buffer, so its
// size is LZMA_STREAM_HEADER_SIZE too.
799 coder->progress_out += lzma_index_size(coder->index)
800 + LZMA_STREAM_HEADER_SIZE;
806 // Call the Index encoder. It doesn't take any input, so
807 // those pointers can be NULL.
808 const lzma_ret ret = coder->index_encoder.code(
809 coder->index_encoder.coder, allocator,
811 out, out_pos, out_size, LZMA_RUN);
812 if (ret != LZMA_STREAM_END)
815 // Encode the Stream Footer into coder->header.
816 coder->stream_flags.backward_size
817 = lzma_index_size(coder->index);
818 if (lzma_stream_footer_encode(&coder->stream_flags,
819 coder->header) != LZMA_OK)
820 return LZMA_PROG_ERROR;
822 coder->sequence = SEQ_STREAM_FOOTER;
827 case SEQ_STREAM_FOOTER:
828 lzma_bufcpy(coder->header, &coder->header_pos,
829 sizeof(coder->header),
830 out, out_pos, out_size);
831 return coder->header_pos < sizeof(coder->header)
832 ? LZMA_OK : LZMA_STREAM_END;
836 return LZMA_PROG_ERROR;
/// End function of the multithreaded Stream encoder: joins all worker
/// threads, then frees the output queue, the filter options, the Index
/// and its encoder, the coder's mutex and condition variable, and the
/// coder itself.
841 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
843 lzma_stream_coder *coder = coder_ptr;
845 // Threads must be killed before the output queue can be freed.
846 threads_end(coder, allocator);
847 lzma_outq_end(&coder->outq, allocator);
849 for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850 lzma_free(coder->filters[i].options, allocator);
852 lzma_next_end(&coder->index_encoder, allocator);
853 lzma_index_end(coder->index, allocator);
855 mythread_cond_destroy(&coder->cond);
856 mythread_mutex_destroy(&coder->mutex);
858 lzma_free(coder, allocator);
863 /// Options handling for stream_encoder_mt_init() and
864 /// lzma_stream_encoder_mt_memusage()
///
/// \param options          Application-supplied threaded-encoder options
/// \param opt_easy         Storage for the preset filter chain; used only
///                         when options->filters is NULL
/// \param filters          Set to options->filters or to opt_easy->filters,
///                         so *opt_easy must outlive every use of *filters
/// \param block_size       Validated Block size (<= BLOCK_SIZE_MAX)
/// \param outbuf_size_max  Worst-case size of one encoded Block
866 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867 const lzma_filter **filters, uint64_t *block_size,
868 uint64_t *outbuf_size_max)
870 // Validate some of the options.
872 return LZMA_PROG_ERROR;
874 if (options->flags != 0 || options->threads == 0
875 || options->threads > LZMA_THREADS_MAX)
876 return LZMA_OPTIONS_ERROR;
878 if (options->filters != NULL) {
879 // Filter chain was given, use it as is.
880 *filters = options->filters;
883 if (lzma_easy_preset(opt_easy, options->preset))
884 return LZMA_OPTIONS_ERROR;
886 *filters = opt_easy->filters;
890 if (options->block_size > 0) {
891 if (options->block_size > BLOCK_SIZE_MAX)
892 return LZMA_OPTIONS_ERROR;
894 *block_size = options->block_size;
896 // Determine the Block size from the filter chain.
897 *block_size = lzma_mt_block_size(*filters);
898 if (*block_size == 0)
899 return LZMA_OPTIONS_ERROR;
901 assert(*block_size <= BLOCK_SIZE_MAX);
904 // Calculate the maximum amount of output that a single output buffer
905 // may need to hold. This is the same as the maximum total size of
907 *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908 if (*outbuf_size_max == 0)
909 return LZMA_MEM_ERROR;
/// Report encoder progress. The totals are the finished Blocks
/// (coder->progress_in/out, which includes the Stream Header) plus the
/// partial progress of every Block that a worker is still encoding.
916 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
918 lzma_stream_coder *coder = coder_ptr;
920 // Lock coder->mutex to prevent finishing threads from moving their
921 // progress info from the worker_thread structure to lzma_stream_coder.
922 mythread_sync(coder->mutex) {
923 *progress_in = coder->progress_in;
924 *progress_out = coder->progress_out;
926 for (size_t i = 0; i < coder->threads_initialized; ++i) {
927 mythread_sync(coder->threads[i].mutex) {
928 *progress_in += coder->threads[i].progress_in;
929 *progress_out += coder->threads[i]
/// Initialize (or reinitialize) the multithreaded Stream encoder in *next.
///
/// The options are validated here so that errors are reported now instead
/// of on the first lzma_code() call. An existing coder is reused. Its worker
/// threads are kept if options->threads is unchanged; otherwise they are
/// ended and a new thread array is allocated.
940 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941 const lzma_mt *options)
943 lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
945 // Get the filter chain.
// NOTE: filters may point into easy, so easy must stay in scope
// until the chain has been copied with lzma_filters_copy() below.
946 lzma_options_easy easy;
947 const lzma_filter *filters;
949 uint64_t outbuf_size_max;
950 return_if_error(get_options(options, &easy, &filters,
951 &block_size, &outbuf_size_max));
953 #if SIZE_MAX < UINT64_MAX
954 if (block_size > SIZE_MAX)
955 return LZMA_MEM_ERROR;
958 // Validate the filter chain so that we can give an error in this
959 // function instead of delaying it to the first call to lzma_code().
960 // The memory usage calculation verifies the filter chain as
961 // a side effect so we take advantage of that.
962 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963 return LZMA_OPTIONS_ERROR;
965 // Validate the Check ID.
966 if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967 return LZMA_PROG_ERROR;
969 if (!lzma_check_is_supported(options->check))
970 return LZMA_UNSUPPORTED_CHECK;
972 // Allocate and initialize the base structure if needed.
973 lzma_stream_coder *coder = next->coder;
975 coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
977 return LZMA_MEM_ERROR;
981 // For the mutex and condition variable initializations
982 // the error handling has to be done here because
983 // stream_encoder_mt_end() doesn't know if they have
984 // already been initialized or not.
985 if (mythread_mutex_init(&coder->mutex)) {
986 lzma_free(coder, allocator);
988 return LZMA_MEM_ERROR;
991 if (mythread_cond_init(&coder->cond)) {
992 mythread_mutex_destroy(&coder->mutex);
993 lzma_free(coder, allocator);
995 return LZMA_MEM_ERROR;
998 next->code = &stream_encode_mt;
999 next->end = &stream_encoder_mt_end;
1000 next->get_progress = &get_progress;
1001 // next->update = &stream_encoder_mt_update;
1003 coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004 coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005 coder->index = NULL;
1006 memzero(&coder->outq, sizeof(coder->outq));
1007 coder->threads = NULL;
1008 coder->threads_max = 0;
1009 coder->threads_initialized = 0;
1012 // Basic initializations
1013 coder->sequence = SEQ_STREAM_HEADER;
1014 coder->block_size = (size_t)(block_size);
1015 coder->thread_error = LZMA_OK;
1018 // Allocate the thread-specific base structures.
1019 assert(options->threads > 0);
1020 if (coder->threads_max != options->threads) {
1021 threads_end(coder, allocator);
1023 coder->threads = NULL;
1024 coder->threads_max = 0;
1026 coder->threads_initialized = 0;
1027 coder->threads_free = NULL;
1029 coder->threads = lzma_alloc(
1030 options->threads * sizeof(worker_thread),
1032 if (coder->threads == NULL)
1033 return LZMA_MEM_ERROR;
1035 coder->threads_max = options->threads;
1037 // Reuse the old structures and threads. Tell the running
1038 // threads to stop and wait until they have stopped.
// NOTE(review): reused workers keep the thr->in buffer that
// initialize_new_thread() allocated with the coder->block_size in
// effect at that time, but coder->block_size was updated above.
// If the new block size is larger, stream_encode_in() could copy
// past the end of thr->in. Verify, and if so reallocate thr->in
// or recreate the threads when block_size grows.
1039 threads_stop(coder, true);
1043 return_if_error(lzma_outq_init(&coder->outq, allocator,
1044 outbuf_size_max, options->threads));
1047 coder->timeout = options->timeout;
1049 // Free the old filter chain and copy the new one.
// NOTE(review): the old options are freed before the copy. If
// lzma_filters_copy() fails and leaves coder->filters partly
// modified, stream_encoder_mt_end() may free stale pointers again.
// Consider setting coder->filters[0].id = LZMA_VLI_UNKNOWN after
// the loop. Confirm against lzma_filters_copy()'s error behavior.
1050 for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051 lzma_free(coder->filters[i].options, allocator);
1053 return_if_error(lzma_filters_copy(
1054 filters, coder->filters, allocator));
1057 lzma_index_end(coder->index, allocator);
1058 coder->index = lzma_index_init(allocator);
1059 if (coder->index == NULL)
1060 return LZMA_MEM_ERROR;
1063 coder->stream_flags.version = 0;
1064 coder->stream_flags.check = options->check;
1065 return_if_error(lzma_stream_header_encode(
1066 &coder->stream_flags, coder->header));
1068 coder->header_pos = 0;
// The Stream Header counts as already produced output for progress.
1071 coder->progress_in = 0;
1072 coder->progress_out = LZMA_STREAM_HEADER_SIZE;
/// Public API: initialize strm as a multithreaded .xz Stream encoder.
/// Supported actions are LZMA_RUN, LZMA_FULL_FLUSH, LZMA_FULL_BARRIER
/// and LZMA_FINISH.
1078 extern LZMA_API(lzma_ret)
1079 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1081 lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1083 strm->internal->supported_actions[LZMA_RUN] = true;
// LZMA_SYNC_FLUSH is not implemented yet (see the TODOs in
// stream_encode_in() and stream_encode_mt()).
1084 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085 strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086 strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087 strm->internal->supported_actions[LZMA_FINISH] = true;
1093 // This function name is a monster but it's consistent with the older
1094 // monster names. :-( 31 chars is the max that C99 requires so in that
1095 // sense it's not too long. ;-)
/// Estimate the memory usage of a multithreaded encoder with the given
/// options. The estimate sums the base structures, per-thread input
/// buffers, per-thread filter encoders and the output queue. Each addition
/// is checked for uint64_t overflow before it is performed.
1096 extern LZMA_API(uint64_t)
1097 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1099 lzma_options_easy easy;
1100 const lzma_filter *filters;
1101 uint64_t block_size;
1102 uint64_t outbuf_size_max;
1104 if (get_options(options, &easy, &filters, &block_size,
1105 &outbuf_size_max) != LZMA_OK)
1108 // Memory usage of the input buffers
// Cannot overflow: get_options() limits threads to LZMA_THREADS_MAX
// and block_size to BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX).
1109 const uint64_t inbuf_memusage = options->threads * block_size;
1111 // Memory usage of the filter encoders
1112 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113 if (filters_memusage == UINT64_MAX)
// NOTE(review): unlike the sums below, this multiplication is not
// overflow-checked. Presumably one filter chain's memory usage is small
// enough that multiplying by LZMA_THREADS_MAX cannot wrap. Confirm.
1116 filters_memusage *= options->threads;
1118 // Memory usage of the output queue
1119 const uint64_t outq_memusage = lzma_outq_memusage(
1120 outbuf_size_max, options->threads);
1121 if (outq_memusage == UINT64_MAX)
1124 // Sum them with overflow checking.
1125 uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126 + sizeof(lzma_stream_coder)
1127 + options->threads * sizeof(worker_thread);
1129 if (UINT64_MAX - total_memusage < inbuf_memusage)
1132 total_memusage += inbuf_memusage;
1134 if (UINT64_MAX - total_memusage < filters_memusage)
1137 total_memusage += filters_memusage;
1139 if (UINT64_MAX - total_memusage < outq_memusage)
1142 return total_memusage + outq_memusage;