]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/subversion/subversion/libsvn_fs_x/batch_fsync.c
MFV r342175:
[FreeBSD/FreeBSD.git] / contrib / subversion / subversion / libsvn_fs_x / batch_fsync.c
1 /* batch_fsync.c --- efficiently fsync multiple targets
2  *
3  * ====================================================================
4  *    Licensed to the Apache Software Foundation (ASF) under one
5  *    or more contributor license agreements.  See the NOTICE file
6  *    distributed with this work for additional information
7  *    regarding copyright ownership.  The ASF licenses this file
8  *    to you under the Apache License, Version 2.0 (the
9  *    "License"); you may not use this file except in compliance
10  *    with the License.  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *    Unless required by applicable law or agreed to in writing,
15  *    software distributed under the License is distributed on an
16  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  *    KIND, either express or implied.  See the License for the
18  *    specific language governing permissions and limitations
19  *    under the License.
20  * ====================================================================
21  */
22
23 #include <apr_thread_pool.h>
24 #include <apr_thread_cond.h>
25
26 #include "batch_fsync.h"
27 #include "svn_pools.h"
28 #include "svn_hash.h"
29 #include "svn_dirent_uri.h"
30 #include "svn_private_config.h"
31
32 #include "private/svn_atomic.h"
33 #include "private/svn_dep_compat.h"
34 #include "private/svn_mutex.h"
35 #include "private/svn_subr_private.h"
36
37 /* Handy macro to check APR function results and turning them into
38  * svn_error_t upon failure. */
39 #define WRAP_APR_ERR(x,msg)                     \
40   {                                             \
41     apr_status_t status_ = (x);                 \
42     if (status_)                                \
43       return svn_error_wrap_apr(status_, msg);  \
44   }
45
46
47 /* A simple SVN-wrapper around the apr_thread_cond_* API */
48 #if APR_HAS_THREADS
49 typedef apr_thread_cond_t svn_thread_cond__t;
50 #else
51 typedef int svn_thread_cond__t;
52 #endif
53
54 static svn_error_t *
55 svn_thread_cond__create(svn_thread_cond__t **cond,
56                         apr_pool_t *result_pool)
57 {
58 #if APR_HAS_THREADS
59
60   WRAP_APR_ERR(apr_thread_cond_create(cond, result_pool),
61                _("Can't create condition variable"));
62
63 #else
64
65   *cond = apr_pcalloc(result_pool, sizeof(**cond));
66
67 #endif
68
69   return SVN_NO_ERROR;
70 }
71
72 static svn_error_t *
73 svn_thread_cond__broadcast(svn_thread_cond__t *cond)
74 {
75 #if APR_HAS_THREADS
76
77   WRAP_APR_ERR(apr_thread_cond_broadcast(cond),
78                _("Can't broadcast condition variable"));
79
80 #endif
81
82   return SVN_NO_ERROR;
83 }
84
85 static svn_error_t *
86 svn_thread_cond__wait(svn_thread_cond__t *cond,
87                       svn_mutex__t *mutex)
88 {
89 #if APR_HAS_THREADS
90
91   WRAP_APR_ERR(apr_thread_cond_wait(cond, svn_mutex__get(mutex)),
92                _("Can't broadcast condition variable"));
93
94 #endif
95
96   return SVN_NO_ERROR;
97 }
98
99 /* Utility construct:  Clients can efficiently wait for the encapsulated
100  * counter to reach a certain value.  Currently, only increments have been
101  * implemented.  This whole structure can be opaque to the API users.
102  */
103 typedef struct waitable_counter_t
104 {
105   /* Current value, initialized to 0. */
106   int value;
107
108   /* Synchronization objects. */
109   svn_thread_cond__t *cond;
110   svn_mutex__t *mutex;
111 } waitable_counter_t;
112
113 /* Set *COUNTER_P to a new waitable_counter_t instance allocated in
114  * RESULT_POOL.  The initial counter value is 0. */
115 static svn_error_t *
116 waitable_counter__create(waitable_counter_t **counter_p,
117                          apr_pool_t *result_pool)
118 {
119   waitable_counter_t *counter = apr_pcalloc(result_pool, sizeof(*counter));
120   counter->value = 0;
121
122   SVN_ERR(svn_thread_cond__create(&counter->cond, result_pool));
123   SVN_ERR(svn_mutex__init(&counter->mutex, TRUE, result_pool));
124
125   *counter_p = counter;
126
127   return SVN_NO_ERROR;
128 }
129
130 /* Increment the value in COUNTER by 1. */
131 static svn_error_t *
132 waitable_counter__increment(waitable_counter_t *counter)
133 {
134   SVN_ERR(svn_mutex__lock(counter->mutex));
135   counter->value++;
136
137   SVN_ERR(svn_thread_cond__broadcast(counter->cond));
138   SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
139
140   return SVN_NO_ERROR;
141 }
142
143 /* Efficiently wait for COUNTER to assume VALUE. */
144 static svn_error_t *
145 waitable_counter__wait_for(waitable_counter_t *counter,
146                            int value)
147 {
148   svn_boolean_t done = FALSE;
149
150   /* This loop implicitly handles spurious wake-ups. */
151   do
152     {
153       SVN_ERR(svn_mutex__lock(counter->mutex));
154
155       if (counter->value == value)
156         done = TRUE;
157       else
158         SVN_ERR(svn_thread_cond__wait(counter->cond, counter->mutex));
159
160       SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
161     }
162   while (!done);
163
164   return SVN_NO_ERROR;
165 }
166
167 /* Set the value in COUNTER to 0. */
168 static svn_error_t *
169 waitable_counter__reset(waitable_counter_t *counter)
170 {
171   SVN_ERR(svn_mutex__lock(counter->mutex));
172   counter->value = 0;
173   SVN_ERR(svn_mutex__unlock(counter->mutex, SVN_NO_ERROR));
174
175   SVN_ERR(svn_thread_cond__broadcast(counter->cond));
176
177   return SVN_NO_ERROR;
178 }
179
180 /* Entry type for the svn_fs_x__batch_fsync_t collection.  There is one
181  * instance per file handle.
182  */
183 typedef struct to_sync_t
184 {
185   /* Open handle of the file / directory to fsync. */
186   apr_file_t *file;
187
188   /* Pool to use with FILE.  It is private to FILE such that it can be
189    * used safely together with FILE in a separate thread. */
190   apr_pool_t *pool;
191
192   /* Result of the file operations. */
193   svn_error_t *result;
194
195   /* Counter to increment when we completed the task. */
196   waitable_counter_t *counter;
197 } to_sync_t;
198
199 /* The actual collection object. */
200 struct svn_fs_x__batch_fsync_t
201 {
202   /* Maps open file handles: C-string path to to_sync_t *. */
203   apr_hash_t *files;
204
205   /* Counts the number of completed fsync tasks. */
206   waitable_counter_t *counter;
207
208   /* Perform fsyncs only if this flag has been set. */
209   svn_boolean_t flush_to_disk;
210 };
211
212 /* Data structures for concurrent fsync execution are only available if
213  * we have threading support.
214  */
215 #if APR_HAS_THREADS
216
217 /* Number of microseconds that an unused thread remains in the pool before
218  * being terminated.
219  *
220  * Higher values are useful if clients frequently send small requests and
221  * you want to minimize the latency for those.
222  */
223 #define THREADPOOL_THREAD_IDLE_LIMIT 1000000
224
225 /* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
226  * fsync concurrently throughout the process. */
227 #define MAX_THREADS 16
228
229 /* Thread pool to execute the fsync tasks. */
230 static apr_thread_pool_t *thread_pool = NULL;
231
232 #endif
233
234 /* Keep track on whether we already created the THREAD_POOL . */
235 static svn_atomic_t thread_pool_initialized = FALSE;
236
237 /* We open non-directory files with these flags. */
238 #define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)
239
240 #if APR_HAS_THREADS
241
242 /* Destructor function that implicitly cleans up any running threads
243    in the TRHEAD_POOL *once*.
244
245    Must be run as a pre-cleanup hook.
246  */
247 static apr_status_t
248 thread_pool_pre_cleanup(void *data)
249 {
250   apr_thread_pool_t *tp = thread_pool;
251   if (!thread_pool)
252     return APR_SUCCESS;
253
254   thread_pool = NULL;
255   thread_pool_initialized = FALSE;
256
257   return apr_thread_pool_destroy(tp);
258 }
259
260 #endif
261
262 /* Core implementation of svn_fs_x__batch_fsync_init. */
263 static svn_error_t *
264 create_thread_pool(void *baton,
265                    apr_pool_t *owning_pool)
266 {
267 #if APR_HAS_THREADS
268   /* The thread-pool must be allocated from a thread-safe pool.
269      GLOBAL_POOL may be single-threaded, though. */
270   apr_pool_t *pool = svn_pool_create(NULL);
271
272   /* This thread pool will get cleaned up automatically when GLOBAL_POOL
273      gets cleared.  No additional cleanup callback is needed. */
274   WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
275                _("Can't create fsync thread pool in FSX"));
276
277   /* Work around an APR bug:  The cleanup must happen in the pre-cleanup
278      hook instead of the normal cleanup hook.  Otherwise, the sub-pools
279      containing the thread objects would already be invalid. */
280   apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
281   apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);
282
283   /* let idle threads linger for a while in case more requests are
284      coming in */
285   apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);
286
287   /* don't queue requests unless we reached the worker thread limit */
288   apr_thread_pool_threshold_set(thread_pool, 0);
289
290 #endif
291
292   return SVN_NO_ERROR;
293 }
294
295 svn_error_t *
296 svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
297 {
298   /* Protect against multiple calls. */
299   return svn_error_trace(svn_atomic__init_once(&thread_pool_initialized,
300                                                create_thread_pool,
301                                                NULL, owning_pool));
302 }
303
304 /* Destructor for svn_fs_x__batch_fsync_t.  Releases all global pool memory
305  * and closes all open file handles. */
306 static apr_status_t
307 fsync_batch_cleanup(void *data)
308 {
309   svn_fs_x__batch_fsync_t *batch = data;
310   apr_hash_index_t *hi;
311
312   /* Close all files (implicitly) and release memory. */
313   for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
314        hi;
315        hi = apr_hash_next(hi))
316     {
317       to_sync_t *to_sync = apr_hash_this_val(hi);
318       svn_pool_destroy(to_sync->pool);
319     }
320
321   return APR_SUCCESS;
322 }
323
324 svn_error_t *
325 svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
326                              svn_boolean_t flush_to_disk,
327                              apr_pool_t *result_pool)
328 {
329   svn_fs_x__batch_fsync_t *result = apr_pcalloc(result_pool, sizeof(*result));
330   result->files = svn_hash__make(result_pool);
331   result->flush_to_disk = flush_to_disk;
332
333   SVN_ERR(waitable_counter__create(&result->counter, result_pool));
334   apr_pool_cleanup_register(result_pool, result, fsync_batch_cleanup,
335                             apr_pool_cleanup_null);
336
337   *result_p = result;
338
339   return SVN_NO_ERROR;
340 }
341
342 /* If BATCH does not contain a handle for PATH, yet, create one with FLAGS
343  * and add it to BATCH.  Set *FILE to the open file handle.
344  * Use SCRATCH_POOL for temporaries.
345  */
346 static svn_error_t *
347 internal_open_file(apr_file_t **file,
348                    svn_fs_x__batch_fsync_t *batch,
349                    const char *path,
350                    apr_int32_t flags,
351                    apr_pool_t *scratch_pool)
352 {
353   svn_error_t *err;
354   apr_pool_t *pool;
355   to_sync_t *to_sync;
356 #ifdef SVN_ON_POSIX
357   svn_boolean_t is_new_file;
358 #endif
359
360   /* If we already have a handle for PATH, return that. */
361   to_sync = svn_hash_gets(batch->files, path);
362   if (to_sync)
363     {
364       *file = to_sync->file;
365       return SVN_NO_ERROR;
366     }
367
368   /* Calling fsync in PATH is going to be expensive in any case, so we can
369    * allow for some extra overhead figuring out whether the file already
370    * exists.  If it doesn't, be sure to schedule parent folder updates, if
371    * required on this platform.
372    *
373    * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
374    * needed at all. */
375
376 #ifdef SVN_ON_POSIX
377
378   is_new_file = FALSE;
379   if (flags & APR_CREATE)
380     {
381       svn_node_kind_t kind;
382       /* We might actually be about to create a new file.
383        * Check whether the file already exists. */
384       SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
385       is_new_file = kind == svn_node_none;
386     }
387
388 #endif
389
390   /* To be able to process each file in a separate thread, they must use
391    * separate, thread-safe pools.  Allocating a sub-pool from the standard
392    * memory pool achieves exactly that. */
393   pool = svn_pool_create(NULL);
394   err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
395   if (err)
396     {
397       svn_pool_destroy(pool);
398       return svn_error_trace(err);
399     }
400
401   to_sync = apr_pcalloc(pool, sizeof(*to_sync));
402   to_sync->file = *file;
403   to_sync->pool = pool;
404   to_sync->result = SVN_NO_ERROR;
405   to_sync->counter = batch->counter;
406
407   svn_hash_sets(batch->files,
408                 apr_pstrdup(apr_hash_pool_get(batch->files), path),
409                 to_sync);
410
411   /* If we just created a new file, schedule any additional necessary fsyncs.
412    * Note that this can only recurse once since the parent folder already
413    * exists on disk. */
414 #ifdef SVN_ON_POSIX
415
416   if (is_new_file)
417     SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));
418
419 #endif
420
421   return SVN_NO_ERROR;
422 }
423
424 svn_error_t *
425 svn_fs_x__batch_fsync_open_file(apr_file_t **file,
426                                 svn_fs_x__batch_fsync_t *batch,
427                                 const char *filename,
428                                 apr_pool_t *scratch_pool)
429 {
430   apr_off_t offset = 0;
431
432   SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
433                              scratch_pool));
434   SVN_ERR(svn_io_file_seek(*file, APR_SET, &offset, scratch_pool));
435
436   return SVN_NO_ERROR;
437 }
438
439 svn_error_t *
440 svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
441                                const char *path,
442                                apr_pool_t *scratch_pool)
443 {
444   apr_file_t *file;
445
446 #ifdef SVN_ON_POSIX
447
448   /* On POSIX, we need to sync the parent directory because it contains
449    * the name for the file / folder given by PATH. */
450   path = svn_dirent_dirname(path, scratch_pool);
451   SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));
452
453 #else
454
455   svn_node_kind_t kind;
456
457   /* On non-POSIX systems, we assume that sync'ing the given PATH is the
458    * right thing to do.  Also, we assume that only files may be sync'ed. */
459   SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
460   if (kind == svn_node_file)
461     SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
462                                scratch_pool));
463
464 #endif
465
466   return SVN_NO_ERROR;
467 }
468
469 /* Thread-pool task Flush the to_sync_t instance given by DATA. */
470 static void * APR_THREAD_FUNC
471 flush_task(apr_thread_t *tid,
472            void *data)
473 {
474   to_sync_t *to_sync = data;
475
476   to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
477                                         (to_sync->file, to_sync->pool));
478
479   /* As soon as the increment call returns, TO_SYNC may be invalid
480      (the main thread may have woken up and released the struct.
481
482      Therefore, we cannot chain this error into TO_SYNC->RESULT.
483      OTOH, the main thread will probably deadlock anyway if we got
484      an error here, thus there is no point in trying to tell the
485      main thread what the problem was. */
486   svn_error_clear(waitable_counter__increment(to_sync->counter));
487
488   return NULL;
489 }
490
491 svn_error_t *
492 svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
493                           apr_pool_t *scratch_pool)
494 {
495   apr_hash_index_t *hi;
496
497   /* Number of tasks sent to the thread pool. */
498   int tasks = 0;
499
500   /* Because we allocated the open files from our global pool, don't bail
501    * out on the first error.  Instead, process all files and but accumulate
502    * the errors in this chain.
503    */
504   svn_error_t *chain = SVN_NO_ERROR;
505
506   /* First, flush APR-internal buffers. This should minimize / prevent the
507    * introduction of additional meta-data changes during the next phase.
508    * We might otherwise issue redundant fsyncs.
509    */
510   for (hi = apr_hash_first(scratch_pool, batch->files);
511        hi;
512        hi = apr_hash_next(hi))
513     {
514       to_sync_t *to_sync = apr_hash_this_val(hi);
515       to_sync->result = svn_error_trace(svn_io_file_flush
516                                            (to_sync->file, to_sync->pool));
517     }
518
519   /* Make sure the task completion counter is set to 0. */
520   chain = svn_error_compose_create(chain,
521                                    waitable_counter__reset(batch->counter));
522
523   /* Start the actual fsyncing process. */
524   if (batch->flush_to_disk)
525     {
526       for (hi = apr_hash_first(scratch_pool, batch->files);
527            hi;
528            hi = apr_hash_next(hi))
529         {
530           to_sync_t *to_sync = apr_hash_this_val(hi);
531
532 #if APR_HAS_THREADS
533
534           /* Forgot to call _init() or cleaned up the owning pool too early?
535            */
536           SVN_ERR_ASSERT(thread_pool);
537
538           /* If there are multiple fsyncs to perform, run them in parallel.
539            * Otherwise, skip the thread-pool and synchronization overhead. */
540           if (apr_hash_count(batch->files) > 1)
541             {
542               apr_status_t status = APR_SUCCESS;
543               status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
544                                             0, NULL);
545               if (status)
546                 to_sync->result = svn_error_wrap_apr(status,
547                                                      _("Can't push task"));
548               else
549                 tasks++;
550             }
551           else
552
553 #endif
554
555             {
556               to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
557                                                   (to_sync->file,
558                                                    to_sync->pool));
559             }
560         }
561     }
562
563   /* Wait for all outstanding flush operations to complete. */
564   chain = svn_error_compose_create(chain,
565                                    waitable_counter__wait_for(batch->counter,
566                                                               tasks));
567
568   /* Collect the results, close all files and release memory. */
569   for (hi = apr_hash_first(scratch_pool, batch->files);
570        hi;
571        hi = apr_hash_next(hi))
572     {
573       to_sync_t *to_sync = apr_hash_this_val(hi);
574       if (batch->flush_to_disk)
575         chain = svn_error_compose_create(chain, to_sync->result);
576
577       chain = svn_error_compose_create(chain,
578                                        svn_io_file_close(to_sync->file,
579                                                          scratch_pool));
580       svn_pool_destroy(to_sync->pool);
581     }
582
583   /* Don't process any file / folder twice. */
584   apr_hash_clear(batch->files);
585
586   /* Report the errors that we encountered. */
587   return svn_error_trace(chain);
588 }