1 /* batch_fsync.c --- efficiently fsync multiple targets
3 * ====================================================================
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 * ====================================================================
21 */
23 #include <apr_thread_pool.h>
24 #include <apr_thread_cond.h>
26 #include "batch_fsync.h"
27 #include "svn_pools.h"
29 #include "svn_dirent_uri.h"
30 #include "svn_private_config.h"
32 #include "private/svn_atomic.h"
33 #include "private/svn_dep_compat.h"
34 #include "private/svn_mutex.h"
35 #include "private/svn_subr_private.h"
37 /* Handy macro to check APR function results and turn them into
38 * svn_error_t upon failure.
 *
 * X is evaluated once and stored in a local apr_status_t named status_.
 * The visible return statement hands svn_error_wrap_apr(status_, MSG)
 * back from the *enclosing* function, so the macro is only usable inside
 * functions that can return that error.
 *
 * NOTE(review): this listing shows neither the braces around the
 * expansion nor the test that guards the return; presumably the return
 * is only taken for a non-zero status -- confirm against the full file. */
39 #define WRAP_APR_ERR(x,msg) \
41 apr_status_t status_ = (x); \
43 return svn_error_wrap_apr(status_, msg); \
47 /* A simple SVN-wrapper around the apr_thread_cond_* API */
/* Handle type used by the wrapper functions below.  Two alternative
 * definitions are visible: the native APR condition variable, and a plain
 * int placeholder.
 * NOTE(review): both typedefs cannot be active at the same time; the
 * preprocessor conditional that selects one of them (presumably a check
 * for thread support) is not visible in this listing -- TODO confirm. */
49 typedef apr_thread_cond_t svn_thread_cond__t;
51 typedef int svn_thread_cond__t;
/* Create a condition variable and return it in *COND.
 *
 * Two creation paths are visible:
 *  - apr_thread_cond_create() on RESULT_POOL; a non-success status is
 *    turned into an error with the message "Can't create condition
 *    variable" by WRAP_APR_ERR;
 *  - a zero-filled placeholder of sizeof(**COND) bytes obtained from
 *    apr_pcalloc() on RESULT_POOL.
 *
 * NOTE(review): the conditional choosing between the two paths, the
 * return type and the braces are not visible in this listing; presumably
 * the placeholder path is the fallback for builds without thread
 * support -- confirm against the full file. */
55 svn_thread_cond__create(svn_thread_cond__t **cond,
56 apr_pool_t *result_pool)
60 WRAP_APR_ERR(apr_thread_cond_create(cond, result_pool),
61 _("Can't create condition variable"));
65 *cond = apr_pcalloc(result_pool, sizeof(**cond));
/* Signal COND to all of its waiters by calling
 * apr_thread_cond_broadcast().  A non-success APR status is wrapped into
 * an error with the message "Can't broadcast condition variable" by
 * WRAP_APR_ERR.
 *
 * NOTE(review): the return type, the braces and any surrounding
 * preprocessor conditional are not visible in this listing. */
73 svn_thread_cond__broadcast(svn_thread_cond__t *cond)
77 WRAP_APR_ERR(apr_thread_cond_broadcast(cond),
78 _("Can't broadcast condition variable"));
/* Block on COND by calling apr_thread_cond_wait() with the APR mutex
 * obtained from svn_mutex__get(MUTEX).  A non-success APR status is
 * wrapped into an error with the message "Can't wait on condition
 * variable" by WRAP_APR_ERR, so that a failed wait is not reported as a
 * failed broadcast.
 *
 * NOTE(review): the declaration of the MUTEX parameter, the return type,
 * the braces and any surrounding preprocessor conditional are not visible
 * in this listing. */
86 svn_thread_cond__wait(svn_thread_cond__t *cond,
91 WRAP_APR_ERR(apr_thread_cond_wait(cond, svn_mutex__get(mutex)),
92 _("Can't wait on condition variable"));
99 /* Utility construct: Clients can efficiently wait for the encapsulated
100 * counter to reach a certain value. Currently, only incrementing and
101 * resetting to 0 have been implemented. This whole structure can be
102 * opaque to the API users. */
/* Counter plus the synchronization objects needed to wait on it.
 * NOTE(review): the functions operating on this type access
 * COUNTER->value and COUNTER->mutex, but the declarations of those two
 * members (and the opening brace) are not visible in this listing. */
103 typedef struct waitable_counter_t
105 /* Current value, initialized to 0. */
108 /* Synchronization objects.  COND is broadcast by the increment and
 * reset functions; the wait function blocks on it. */
109 svn_thread_cond__t *cond;
111 } waitable_counter_t;
113 /* Set *COUNTER_P to a new waitable_counter_t instance allocated in
114 * RESULT_POOL. The initial counter value is 0.
 *
 * The struct is zero-filled by apr_pcalloc(); its condition variable and
 * its mutex are both created with RESULT_POOL.  A failure of either step
 * is propagated through SVN_ERR, in which case the assignment to
 * *COUNTER_P below is not reached (assuming SVN_ERR returns on error, as
 * is usual for that macro).
 *
 * NOTE(review): the return type, the braces and the final return are not
 * visible in this listing. */
116 waitable_counter__create(waitable_counter_t **counter_p,
117 apr_pool_t *result_pool)
119 waitable_counter_t *counter = apr_pcalloc(result_pool, sizeof(*counter));
122 SVN_ERR(svn_thread_cond__create(&counter->cond, result_pool));
/* TRUE is the second argument of svn_mutex__init(); presumably it
 * requests a real mutex -- confirm against private/svn_mutex.h. */
123 SVN_ERR(svn_mutex__init(&counter->mutex, TRUE, result_pool));
/* Publish the instance only after all of its parts have been set up. */
125 *counter_p = counter;
130 /* Increment the value in COUNTER by 1.
 *
 * The visible sequence is: lock COUNTER->mutex, broadcast COUNTER->cond
 * while the mutex is still held, then unlock the mutex.
 *
 * NOTE(review): the statement performing the actual increment, the
 * return type and the braces are not visible in this listing.
 * NOTE(review): if the broadcast fails and SVN_ERR returns early (the
 * usual behavior of that macro), the unlock below is skipped and the
 * mutex stays locked -- worth confirming against the full file. */
132 waitable_counter__increment(waitable_counter_t *counter)
134 SVN_ERR(svn_mutex__lock(counter->mutex));
137 SVN_ERR(svn_thread_cond__broadcast(counter->cond));
138 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
143 /* Efficiently wait for COUNTER to assume VALUE.
 *
 * Under COUNTER->mutex, the current value is compared with VALUE for
 * exact equality; the function also blocks on COUNTER->cond via
 * svn_thread_cond__wait() and releases the mutex again afterwards.
 *
 * NOTE(review): the declaration of the VALUE parameter, the loop
 * construct, the statement taken when the values match, the return type
 * and the braces are not visible in this listing.  Presumably DONE is
 * set on a match and the wait happens only otherwise -- confirm against
 * the full file. */
145 waitable_counter__wait_for(waitable_counter_t *counter,
148 svn_boolean_t done = FALSE;
150 /* This loop implicitly handles spurious wake-ups. */
153 SVN_ERR(svn_mutex__lock(counter->mutex));
/* Exact comparison: a counter that is not equal to VALUE does not
 * satisfy this test, whether it is smaller or larger. */
155 if (counter->value == value)
158 SVN_ERR(svn_thread_cond__wait(counter->cond, counter->mutex));
160 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
167 /* Set the value in COUNTER to 0.
 *
 * The visible sequence is: lock COUNTER->mutex, unlock it, then broadcast
 * COUNTER->cond.  Unlike the increment function, the broadcast here is
 * issued after the mutex has been released.
 *
 * NOTE(review): the statement that stores the new value, the return type
 * and the braces are not visible in this listing. */
169 waitable_counter__reset(waitable_counter_t *counter)
171 SVN_ERR(svn_mutex__lock(counter->mutex));
173 SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
175 SVN_ERR(svn_thread_cond__broadcast(counter->cond));
180 /* Entry type for the svn_fs_x__batch_fsync_t collection. There is one
181 * instance per file handle. */
/* Per-file bookkeeping record stored in the batch's hash.
 * NOTE(review): code elsewhere in this file uses TO_SYNC->file,
 * TO_SYNC->pool and TO_SYNC->result, but the declarations of those
 * members, the opening brace and the end of the typedef are not visible
 * in this listing; only the comments describing them remain. */
183 typedef struct to_sync_t
185 /* Open handle of the file / directory to fsync. */
188 /* Pool to use with FILE. It is private to FILE such that it can be
189 * used safely together with FILE in a separate thread. */
192 /* Result of the file operations. */
195 /* Counter to increment when we completed the task.  It is set to the
 * owning batch's counter when the entry is created. */
196 waitable_counter_t *counter;
199 /* The actual collection object.
 * NOTE(review): the hash member that the rest of the file accesses as
 * BATCH->files, and the struct's braces, are not visible in this
 * listing. */
200 struct svn_fs_x__batch_fsync_t
202 /* Maps open file handles: C-string path to to_sync_t *. */
205 /* Counts the number of completed fsync tasks.  It is reset before the
 * tasks are started and waited on afterwards. */
206 waitable_counter_t *counter;
208 /* Perform fsyncs only if this flag has been set. */
209 svn_boolean_t flush_to_disk;
212 /* Data structures for concurrent fsync execution are only available if
213 * we have threading support. */
217 /* Number of microseconds that an unused thread remains in the pool before
218 * it is shut down.
220 * Higher values are useful if clients frequently send small requests and
221 * you want to minimize the latency for those. */
/* Idle limit handed to apr_thread_pool_idle_wait_set() when the thread
 * pool is created: 1000000 microseconds, i.e. one second. */
223 #define THREADPOOL_THREAD_IDLE_LIMIT 1000000
225 /* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
226 * fsync concurrently throughout the process.  Passed as the upper
 * thread limit to apr_thread_pool_create(). */
227 #define MAX_THREADS 16
229 /* Thread pool to execute the fsync tasks.  Initially NULL; assigned by
 * apr_thread_pool_create() in create_thread_pool(). */
230 static apr_thread_pool_t *thread_pool = NULL;
234 /* Keep track of whether we already created the THREAD_POOL.  Used as
 * the once-flag for svn_atomic__init_once() and reset to FALSE by
 * thread_pool_pre_cleanup(). */
235 static svn_atomic_t thread_pool_initialized = FALSE;
237 /* We open non-directory files with these flags: read/write access,
 * APR-level buffering, and creation of the file if it does not exist. */
238 #define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)
242 /* Destructor function that implicitly cleans up any running threads
243 in the THREAD_POOL *once*.
245 Must be run as a pre-cleanup hook. */
/* Pool pre-cleanup callback; it is registered on two pools by
 * create_thread_pool().  DATA is not referenced in the visible lines.
 *
 * The global THREAD_POOL pointer is copied into TP, the once-flag
 * THREAD_POOL_INITIALIZED is reset to FALSE, and the status of
 * apr_thread_pool_destroy(TP) is returned.
 *
 * NOTE(review): the return type, the braces and the statements between
 * the visible lines are not shown in this listing.  Since the callback is
 * registered twice, there is presumably a guard that makes the second
 * invocation a no-op -- confirm against the full file. */
248 thread_pool_pre_cleanup(void *data)
250 apr_thread_pool_t *tp = thread_pool;
255 thread_pool_initialized = FALSE;
257 return apr_thread_pool_destroy(tp);
262 /* Core implementation of svn_fs_x__batch_fsync_init.
 *
 * Creates the global THREAD_POOL with 0 initial and MAX_THREADS maximum
 * threads in a newly created pool, registers thread_pool_pre_cleanup()
 * as pre-cleanup hook on both that pool and OWNING_POOL, and then
 * configures the idle wait time and the queueing threshold.
 * BATON is not referenced in the visible lines.
 *
 * NOTE(review): the return type, the braces and the final return are not
 * visible in this listing. */
264 create_thread_pool(void *baton,
265 apr_pool_t *owning_pool)
268 /* The thread-pool must be allocated from a thread-safe pool.
269 GLOBAL_POOL may be single-threaded, though.
   NOTE(review): nothing named GLOBAL_POOL is visible in this listing;
   the code below creates a pool with a NULL parent instead. */
270 apr_pool_t *pool = svn_pool_create(NULL);
272 /* This thread pool will get cleaned up automatically when GLOBAL_POOL
273 gets cleared. No additional cleanup callback is needed.
   NOTE(review): this remark looks stale -- explicit pre-cleanup
   callbacks are registered a few lines further down. */
274 WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
275 _("Can't create fsync thread pool in FSX"));
277 /* Work around an APR bug: The cleanup must happen in the pre-cleanup
278 hook instead of the normal cleanup hook. Otherwise, the sub-pools
279 containing the thread objects would already be invalid. */
280 apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
281 apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);
/* NOTE(review): the comment starting on the next line has lost its
   terminator in this listing.  As written, it runs on to the end of the
   following comment and swallows the idle_wait_set call in between. */
283 /* let idle threads linger for a while in case more requests are
285 apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);
287 /* don't queue requests unless we reached the worker thread limit */
288 apr_thread_pool_threshold_set(thread_pool, 0);
/* One-time initialization of the fsync thread pool.  The work is routed
 * through svn_atomic__init_once() with THREAD_POOL_INITIALIZED as the
 * once-flag, so repeated calls do not create a second pool.
 *
 * NOTE(review): the remaining arguments of the svn_atomic__init_once()
 * call, the return type and the braces are not visible in this listing.
 * According to the comment on create_thread_pool(), that function is the
 * core implementation invoked here, presumably with OWNING_POOL. */
296 svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
298 /* Protect against multiple calls. */
299 return svn_error_trace(svn_atomic__init_once(&thread_pool_initialized,
304 /* Destructor for svn_fs_x__batch_fsync_t. Releases all global pool memory
305 * and closes all open file handles.
 *
 * Registered as a pool cleanup by svn_fs_x__batch_fsync_create(); DATA is
 * the batch object.  Every entry still stored in BATCH->files has its
 * private pool destroyed.
 *
 * NOTE(review): the return type, the loop condition, the braces and the
 * return statement are not visible in this listing. */
307 fsync_batch_cleanup(void *data)
309 svn_fs_x__batch_fsync_t *batch = data;
310 apr_hash_index_t *hi;
312 /* Close all files (implicitly) and release memory. */
/* The iterator is allocated in the hash's own pool, so no separate
 * scratch pool is needed inside this callback. */
313 for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
315 hi = apr_hash_next(hi))
317 to_sync_t *to_sync = apr_hash_this_val(hi);
318 svn_pool_destroy(to_sync->pool);
/* Create a new, empty batch object in RESULT_POOL.
 *
 * The object is zero-filled, gets an empty path hash and a completion
 * counter (both allocated in RESULT_POOL), and remembers FLUSH_TO_DISK.
 * fsync_batch_cleanup() is registered as a cleanup on RESULT_POOL so
 * that entries still held by the batch are released with the pool.
 * An error from creating the counter is propagated through SVN_ERR.
 *
 * NOTE(review): the return type, the braces, the assignment to *RESULT_P
 * and the final return are not visible in this listing. */
325 svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
326 svn_boolean_t flush_to_disk,
327 apr_pool_t *result_pool)
329 svn_fs_x__batch_fsync_t *result = apr_pcalloc(result_pool, sizeof(*result));
330 result->files = svn_hash__make(result_pool);
331 result->flush_to_disk = flush_to_disk;
333 SVN_ERR(waitable_counter__create(&result->counter, result_pool));
334 apr_pool_cleanup_register(result_pool, result, fsync_batch_cleanup,
335 apr_pool_cleanup_null);
342 /* If BATCH does not contain a handle for PATH, yet, create one with FLAGS
343 * and add it to BATCH. Set *FILE to the open file handle.
344 * Use SCRATCH_POOL for temporaries. */
/* Look up or create the BATCH entry for PATH.
 *
 * Behavior visible below:
 *  - an existing entry is looked up with svn_hash_gets() and its handle
 *    is stored in *FILE;
 *  - when FLAGS contains APR_CREATE, svn_io_check_path() is used to
 *    record in IS_NEW_FILE whether PATH is currently absent
 *    (svn_node_none);
 *  - a new entry gets its own parentless pool that holds both the file
 *    handle and the to_sync_t record, and is stored in BATCH->files
 *    under a copy of PATH made in the hash's own pool;
 *  - svn_fs_x__batch_fsync_new_path() is called for PATH at the end.
 *
 * NOTE(review): this listing omits the return type, the braces, several
 * declarations (PATH, FLAGS, TO_SYNC, POOL, ERR) and the conditions that
 * guard the early return, the error path and the final new_path call --
 * confirm the exact control flow against the full file. */
347 internal_open_file(apr_file_t **file,
348 svn_fs_x__batch_fsync_t *batch,
351 apr_pool_t *scratch_pool)
357 svn_boolean_t is_new_file;
360 /* If we already have a handle for PATH, return that. */
361 to_sync = svn_hash_gets(batch->files, path);
364 *file = to_sync->file;
/* NOTE(review): the comment starting on the next line has lost its
 * terminator in this listing.  As written, it runs on to the end of the
 * "We might actually be about to create a new file" comment and swallows
 * the APR_CREATE test in between. */
368 /* Calling fsync in PATH is going to be expensive in any case, so we can
369 * allow for some extra overhead figuring out whether the file already
370 * exists. If it doesn't, be sure to schedule parent folder updates, if
371 * required on this platform.
373 * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
379 if (flags & APR_CREATE)
381 svn_node_kind_t kind;
382 /* We might actually be about to create a new file.
383 * Check whether the file already exists. */
384 SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
385 is_new_file = kind == svn_node_none;
390 /* To be able to process each file in a separate thread, they must use
391 * separate, thread-safe pools. Allocating a sub-pool from the standard
392 * memory pool achieves exactly that. */
393 pool = svn_pool_create(NULL);
394 err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
/* Presumably the failure path of the open above (the guarding test is
 * not visible here): drop the private pool again and report ERR. */
397 svn_pool_destroy(pool);
398 return svn_error_trace(err);
/* The bookkeeping record lives in the file's private pool, so destroying
 * that pool releases the handle and the record together. */
401 to_sync = apr_pcalloc(pool, sizeof(*to_sync));
402 to_sync->file = *file;
403 to_sync->pool = pool;
404 to_sync->result = SVN_NO_ERROR;
405 to_sync->counter = batch->counter;
/* The key is duplicated into the hash's pool, presumably because PATH
 * may have a shorter lifetime than the batch. */
407 svn_hash_sets(batch->files,
408 apr_pstrdup(apr_hash_pool_get(batch->files), path),
/* NOTE(review): the comment starting on the next line has lost its
 * tail and terminator in this listing. */
411 /* If we just created a new file, schedule any additional necessary fsyncs.
412 * Note that this can only recurse once since the parent folder already
417 SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));
/* Open FILENAME for BATCH and return the handle in *FILE.
 *
 * The handle is obtained through internal_open_file() with FILE_FLAGS,
 * so a handle already registered for FILENAME is reused.  Afterwards the
 * file position is set to offset 0 relative to the start of the file
 * (APR_SET); presumably this is done because a reused handle may be
 * positioned elsewhere.  SCRATCH_POOL is used for temporaries.
 *
 * NOTE(review): the return type, the braces, the last argument line of
 * the internal_open_file() call and the final return are not visible in
 * this listing. */
425 svn_fs_x__batch_fsync_open_file(apr_file_t **file,
426 svn_fs_x__batch_fsync_t *batch,
427 const char *filename,
428 apr_pool_t *scratch_pool)
430 apr_off_t offset = 0;
432 SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
434 SVN_ERR(svn_io_file_seek(*file, APR_SET, &offset, scratch_pool));
/* Schedule the extra fsync that is needed after PATH has been newly
 * created.
 *
 * Two platform variants are visible:
 *  - POSIX: the parent directory of PATH is opened read-only and added
 *    to BATCH;
 *  - otherwise: PATH itself is added to BATCH with FILE_FLAGS, but only
 *    if svn_io_check_path() reports it as a regular file.
 * SCRATCH_POOL is used for temporaries.
 *
 * NOTE(review): the return type, the declarations of PATH and FILE, the
 * braces and the preprocessor conditional that selects between the two
 * variants are not visible in this listing. */
440 svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
442 apr_pool_t *scratch_pool)
448 /* On POSIX, we need to sync the parent directory because it contains
449 * the name for the file / folder given by PATH. */
450 path = svn_dirent_dirname(path, scratch_pool);
451 SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));
455 svn_node_kind_t kind;
457 /* On non-POSIX systems, we assume that sync'ing the given PATH is the
458 * right thing to do. Also, we assume that only files may be sync'ed. */
459 SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
460 if (kind == svn_node_file)
461 SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
469 /* Thread-pool task: Flush the to_sync_t instance given by DATA.
 *
 * The result of svn_io_file_flush_to_disk() is stored in the entry's
 * RESULT member, then the entry's completion counter is incremented so
 * that the thread waiting on that counter can proceed.  TID is not
 * referenced in the visible lines.
 *
 * NOTE(review): the declaration of the DATA parameter, the braces and
 * the return statement are not visible in this listing. */
470 static void * APR_THREAD_FUNC
471 flush_task(apr_thread_t *tid,
474 to_sync_t *to_sync = data;
476 to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
477 (to_sync->file, to_sync->pool));
479 /* As soon as the increment call returns, TO_SYNC may be invalid
480 (the main thread may have woken up and released the struct).
482 Therefore, we cannot chain this error into TO_SYNC->RESULT.
483 OTOH, the main thread will probably deadlock anyway if we got
484 an error here, thus there is no point in trying to tell the
485 main thread what the problem was. */
486 svn_error_clear(waitable_counter__increment(to_sync->counter));
/* Flush and, if BATCH->flush_to_disk is set, fsync every file in BATCH;
 * then close all files and empty the collection.
 *
 * Phases visible below:
 *  1. svn_io_file_flush() on every entry (APR-internal buffers);
 *  2. reset of the completion counter;
 *  3. only if flush_to_disk is set: with more than one entry, each entry
 *     is pushed to THREAD_POOL as a flush_task; otherwise
 *     svn_io_file_flush_to_disk() is called directly;
 *  4. wait on the completion counter;
 *  5. collect the per-entry results, close the files, destroy their
 *     private pools and clear the hash.
 * Errors are accumulated in CHAIN and returned at the end instead of
 * aborting at the first failure.  SCRATCH_POOL holds the hash iterators.
 *
 * NOTE(review): this listing omits the return type, the braces, the loop
 * conditions, the declaration of the task count, several argument lines
 * and the if / else lines around the thread-pool push; it also shows
 * three comments without their terminators.  Confirm the exact control
 * flow against the full file.
 * NOTE(review): TO_SYNC->result is assigned in phase 1 and assigned again
 * in phase 3, and it is only composed into CHAIN when flush_to_disk is
 * set.  As far as the visible lines show, an error from the phase 1
 * flush is therefore neither reported nor cleared. */
492 svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
493 apr_pool_t *scratch_pool)
495 apr_hash_index_t *hi;
497 /* Number of tasks sent to the thread pool. */
500 /* Because we allocated the open files from our global pool, don't bail
501 * out on the first error. Instead, process all files but accumulate
502 * the errors in this chain.
504 svn_error_t *chain = SVN_NO_ERROR;
506 /* First, flush APR-internal buffers. This should minimize / prevent the
507 * introduction of additional meta-data changes during the next phase.
508 * We might otherwise issue redundant fsyncs.
510 for (hi = apr_hash_first(scratch_pool, batch->files);
512 hi = apr_hash_next(hi))
514 to_sync_t *to_sync = apr_hash_this_val(hi);
515 to_sync->result = svn_error_trace(svn_io_file_flush
516 (to_sync->file, to_sync->pool));
519 /* Make sure the task completion counter is set to 0. */
520 chain = svn_error_compose_create(chain,
521 waitable_counter__reset(batch->counter));
523 /* Start the actual fsyncing process. */
524 if (batch->flush_to_disk)
526 for (hi = apr_hash_first(scratch_pool, batch->files);
528 hi = apr_hash_next(hi))
530 to_sync_t *to_sync = apr_hash_this_val(hi);
534 /* Forgot to call _init() or cleaned up the owning pool too early?
536 SVN_ERR_ASSERT(thread_pool);
538 /* If there are multiple fsyncs to perform, run them in parallel.
539 * Otherwise, skip the thread-pool and synchronization overhead. */
540 if (apr_hash_count(batch->files) > 1)
542 apr_status_t status = APR_SUCCESS;
543 status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
546 to_sync->result = svn_error_wrap_apr(status,
547 _("Can't push task"));
556 to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
563 /* Wait for all outstanding flush operations to complete. */
564 chain = svn_error_compose_create(chain,
565 waitable_counter__wait_for(batch->counter,
568 /* Collect the results, close all files and release memory. */
569 for (hi = apr_hash_first(scratch_pool, batch->files);
571 hi = apr_hash_next(hi))
573 to_sync_t *to_sync = apr_hash_this_val(hi);
/* Per-entry results are only added to CHAIN when the fsync phase ran. */
574 if (batch->flush_to_disk)
575 chain = svn_error_compose_create(chain, to_sync->result);
577 chain = svn_error_compose_create(chain,
578 svn_io_file_close(to_sync->file,
/* Destroying the private pool also releases the to_sync_t record, which
 * internal_open_file() allocates in that same pool. */
580 svn_pool_destroy(to_sync->pool);
583 /* Don't process any file / folder twice. */
584 apr_hash_clear(batch->files);
586 /* Report the errors that we encountered. */
587 return svn_error_trace(chain);