2 * replay.c : entry point for replay RA functions for ra_serf
4 * ====================================================================
5 * Licensed to the Apache Software Foundation (ASF) under one
6 * or more contributor license agreements. See the NOTICE file
7 * distributed with this work for additional information
8 * regarding copyright ownership. The ASF licenses this file
9 * to you under the Apache License, Version 2.0 (the
10 * "License"); you may not use this file except in compliance
11 * with the License. You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing,
16 * software distributed under the License is distributed on an
17 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 * KIND, either express or implied. See the License for the
19 * specific language governing permissions and limitations
21 * ====================================================================
29 #include "svn_pools.h"
34 #include "../libsvn_ra/ra_loader.h"
35 #include "svn_config.h"
36 #include "svn_delta.h"
37 #include "svn_base64.h"
39 #include "svn_private_config.h"
41 #include "private/svn_string_private.h"
47 * This enum represents the current state of our XML parsing.
49 typedef enum replay_state_e {
50 INITIAL = XML_STATE_INITIAL,
53 REPLAY_TARGET_REVISION,
55 REPLAY_OPEN_DIRECTORY,
61 REPLAY_CLOSE_DIRECTORY,
62 REPLAY_CHANGE_DIRECTORY_PROP,
63 REPLAY_CHANGE_FILE_PROP,
64 REPLAY_APPLY_TEXTDELTA
67 #define S_ SVN_XML_NAMESPACE
68 static const svn_ra_serf__xml_transition_t replay_ttable[] = {
69 { INITIAL, S_, "editor-report", REPLAY_REPORT,
70 FALSE, { NULL }, TRUE },
72 /* Replay just throws every operation as an XML element directly
73 in the replay report, so we can't really use the nice exit
74 handling of the transition parser to handle clean callbacks */
76 { REPLAY_REPORT, S_, "target-revision", REPLAY_TARGET_REVISION,
77 FALSE, { "rev", NULL }, TRUE },
79 { REPLAY_REPORT, S_, "open-root", REPLAY_OPEN_ROOT,
80 FALSE, { "rev", NULL }, TRUE },
82 { REPLAY_REPORT, S_, "open-directory", REPLAY_OPEN_DIRECTORY,
83 FALSE, { "name", "rev", NULL }, TRUE },
85 { REPLAY_REPORT, S_, "open-file", REPLAY_OPEN_FILE,
86 FALSE, { "name", "rev", NULL }, TRUE },
88 { REPLAY_REPORT, S_, "add-directory", REPLAY_ADD_DIRECTORY,
89 FALSE, { "name", "?copyfrom-path", "?copyfrom-rev", NULL}, TRUE },
91 { REPLAY_REPORT, S_, "add-file", REPLAY_ADD_FILE,
92 FALSE, { "name", "?copyfrom-path", "?copyfrom-rev", NULL}, TRUE },
94 { REPLAY_REPORT, S_, "delete-entry", REPLAY_DELETE_ENTRY,
95 FALSE, { "name", "rev", NULL }, TRUE },
97 { REPLAY_REPORT, S_, "close-file", REPLAY_CLOSE_FILE,
98 FALSE, { "?checksum", NULL }, TRUE },
100 { REPLAY_REPORT, S_, "close-directory", REPLAY_CLOSE_DIRECTORY,
101 FALSE, { NULL }, TRUE },
103 { REPLAY_REPORT, S_, "change-dir-prop", REPLAY_CHANGE_DIRECTORY_PROP,
104 TRUE, { "name", "?del", NULL }, TRUE },
106 { REPLAY_REPORT, S_, "change-file-prop", REPLAY_CHANGE_FILE_PROP,
107 TRUE, { "name", "?del", NULL }, TRUE },
109 { REPLAY_REPORT, S_, "apply-textdelta", REPLAY_APPLY_TEXTDELTA,
110 FALSE, { "?checksum", NULL }, TRUE },
115 /* Per directory/file state */
116 typedef struct replay_node_t {
117 apr_pool_t *pool; /* pool allocating this node's data */
118 svn_boolean_t file; /* file or dir */
120 void *baton; /* node baton */
121 svn_stream_t *stream; /* stream while handling txdata */
123 struct replay_node_t *parent; /* parent node or NULL */
126 /* Per revision replay report state */
127 typedef struct revision_report_t {
128 apr_pool_t *pool; /* per revision pool */
130 struct replay_node_t *current_node;
131 struct replay_node_t *root_node;
133 /* Are we done fetching this file?
134 Handles book-keeping in multi-report case */
136 int *replay_reports; /* NULL or number of outstanding reports */
138 /* callback to get an editor */
139 svn_ra_replay_revstart_callback_t revstart_func;
140 svn_ra_replay_revfinish_callback_t revfinish_func;
143 /* replay receiver function and baton */
144 const svn_delta_editor_t *editor;
147 /* Path and revision used to filter replayed changes. If
148 INCLUDE_PATH is non-NULL, REVISION is unnecessary and will not be
149 included in the replay REPORT. (Because the REPORT is being
150 aimed at an HTTP v2 revision resource.) */
151 const char *include_path;
152 svn_revnum_t revision;
154 /* Information needed to create the replay report body */
155 svn_revnum_t low_water_mark;
156 svn_boolean_t send_deltas;
158 /* Target and revision to fetch revision properties on */
159 const char *revprop_target;
160 svn_revnum_t revprop_rev;
162 /* Revision properties for this revision. */
163 apr_hash_t *rev_props;
165 /* Handlers for the PROPFIND and REPORT for the current revision. */
166 svn_ra_serf__handler_t *propfind_handler;
167 svn_ra_serf__handler_t *report_handler; /* For done handler */
171 /* Conforms to svn_ra_serf__xml_opened_t */
173 replay_opened(svn_ra_serf__xml_estate_t *xes,
176 const svn_ra_serf__dav_props_t *tag,
177 apr_pool_t *scratch_pool)
179 struct revision_report_t *ctx = baton;
181 if (entered_state == REPLAY_REPORT)
183 /* Before we can continue, we need the revision properties. */
184 SVN_ERR_ASSERT(!ctx->propfind_handler || ctx->propfind_handler->done);
186 svn_ra_serf__keep_only_regular_props(ctx->rev_props, scratch_pool);
188 if (ctx->revstart_func)
190 SVN_ERR(ctx->revstart_func(ctx->revision, ctx->replay_baton,
191 &ctx->editor, &ctx->editor_baton,
196 else if (entered_state == REPLAY_APPLY_TEXTDELTA)
198 struct replay_node_t *node = ctx->current_node;
200 const char *checksum;
201 svn_txdelta_window_handler_t handler;
204 if (! node || ! node->file || node->stream)
205 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
207 /* ### Is there a better way to access a specific attr here? */
208 attrs = svn_ra_serf__xml_gather_since(xes, REPLAY_APPLY_TEXTDELTA);
209 checksum = svn_hash_gets(attrs, "checksum");
211 SVN_ERR(ctx->editor->apply_textdelta(node->baton, checksum, node->pool,
212 &handler, &handler_baton));
214 if (handler != svn_delta_noop_window_handler)
216 node->stream = svn_base64_decode(
217 svn_txdelta_parse_svndiff(handler,
228 /* Conforms to svn_ra_serf__xml_closed_t */
230 replay_closed(svn_ra_serf__xml_estate_t *xes,
233 const svn_string_t *cdata,
235 apr_pool_t *scratch_pool)
237 struct revision_report_t *ctx = baton;
239 if (leaving_state == REPLAY_REPORT)
241 if (ctx->current_node)
242 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
244 if (ctx->revfinish_func)
246 SVN_ERR(ctx->revfinish_func(ctx->revision, ctx->replay_baton,
247 ctx->editor, ctx->editor_baton,
248 ctx->rev_props, scratch_pool));
251 else if (leaving_state == REPLAY_TARGET_REVISION)
253 const char *revstr = svn_hash_gets(attrs, "rev");
256 SVN_ERR(svn_cstring_atoi64(&rev, revstr));
257 SVN_ERR(ctx->editor->set_target_revision(ctx->editor_baton,
261 else if (leaving_state == REPLAY_OPEN_ROOT)
263 const char *revstr = svn_hash_gets(attrs, "rev");
265 apr_pool_t *root_pool = svn_pool_create(ctx->pool);
267 if (ctx->current_node || ctx->root_node)
268 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
270 ctx->root_node = apr_pcalloc(root_pool, sizeof(*ctx->root_node));
271 ctx->root_node->pool = root_pool;
273 ctx->current_node = ctx->root_node;
275 SVN_ERR(svn_cstring_atoi64(&rev, revstr));
276 SVN_ERR(ctx->editor->open_root(ctx->editor_baton, (svn_revnum_t)rev,
278 &ctx->current_node->baton));
280 else if (leaving_state == REPLAY_OPEN_DIRECTORY
281 || leaving_state == REPLAY_OPEN_FILE
282 || leaving_state == REPLAY_ADD_DIRECTORY
283 || leaving_state == REPLAY_ADD_FILE)
285 struct replay_node_t *node;
286 apr_pool_t *node_pool;
287 const char *name = svn_hash_gets(attrs, "name");
291 if (!ctx->current_node || ctx->current_node->file)
292 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
294 node_pool = svn_pool_create(ctx->current_node->pool);
295 node = apr_pcalloc(node_pool, sizeof(*node));
296 node->pool = node_pool;
297 node->parent = ctx->current_node;
299 if (leaving_state == REPLAY_OPEN_DIRECTORY
300 || leaving_state == REPLAY_OPEN_FILE)
302 rev_str = svn_hash_gets(attrs, "rev");
305 rev_str = svn_hash_gets(attrs, "copyfrom-rev");
308 SVN_ERR(svn_cstring_atoi64(&rev, rev_str));
310 rev = SVN_INVALID_REVNUM;
312 switch (leaving_state)
314 case REPLAY_OPEN_DIRECTORY:
316 SVN_ERR(ctx->editor->open_directory(name,
317 ctx->current_node->baton,
322 case REPLAY_OPEN_FILE:
324 SVN_ERR(ctx->editor->open_file(name,
325 ctx->current_node->baton,
330 case REPLAY_ADD_DIRECTORY:
332 SVN_ERR(ctx->editor->add_directory(
334 ctx->current_node->baton,
335 SVN_IS_VALID_REVNUM(rev)
336 ? svn_hash_gets(attrs, "copyfrom-path")
342 case REPLAY_ADD_FILE:
344 SVN_ERR(ctx->editor->add_file(
346 ctx->current_node->baton,
347 SVN_IS_VALID_REVNUM(rev)
348 ? svn_hash_gets(attrs, "copyfrom-path")
354 /* default: unreachable */
356 ctx->current_node = node;
358 else if (leaving_state == REPLAY_CLOSE_FILE)
360 struct replay_node_t *node = ctx->current_node;
362 if (! node || ! node->file)
363 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
365 SVN_ERR(ctx->editor->close_file(node->baton,
366 svn_hash_gets(attrs, "checksum"),
368 ctx->current_node = node->parent;
369 svn_pool_destroy(node->pool);
371 else if (leaving_state == REPLAY_CLOSE_DIRECTORY)
373 struct replay_node_t *node = ctx->current_node;
375 if (! node || node->file)
376 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
378 SVN_ERR(ctx->editor->close_directory(node->baton, node->pool));
379 ctx->current_node = node->parent;
380 svn_pool_destroy(node->pool);
382 else if (leaving_state == REPLAY_DELETE_ENTRY)
384 struct replay_node_t *parent_node = ctx->current_node;
385 const char *name = svn_hash_gets(attrs, "name");
386 const char *revstr = svn_hash_gets(attrs, "rev");
389 if (! parent_node || parent_node->file)
390 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
392 SVN_ERR(svn_cstring_atoi64(&rev, revstr));
393 SVN_ERR(ctx->editor->delete_entry(name,
398 else if (leaving_state == REPLAY_CHANGE_FILE_PROP
399 || leaving_state == REPLAY_CHANGE_DIRECTORY_PROP)
401 struct replay_node_t *node = ctx->current_node;
403 const svn_string_t *value;
405 if (! node || node->file != (leaving_state == REPLAY_CHANGE_FILE_PROP))
406 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
408 name = svn_hash_gets(attrs, "name");
410 if (svn_hash_gets(attrs, "del"))
413 value = svn_base64_decode_string(cdata, scratch_pool);
417 SVN_ERR(ctx->editor->change_file_prop(node->baton, name, value,
422 SVN_ERR(ctx->editor->change_dir_prop(node->baton, name, value,
426 else if (leaving_state == REPLAY_APPLY_TEXTDELTA)
428 struct replay_node_t *node = ctx->current_node;
430 if (! node || ! node->file || ! node->stream)
431 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
433 SVN_ERR(svn_stream_close(node->stream));
440 /* Conforms to svn_ra_serf__xml_cdata_t */
442 replay_cdata(svn_ra_serf__xml_estate_t *xes,
447 apr_pool_t *scratch_pool)
449 struct revision_report_t *ctx = baton;
451 if (current_state == REPLAY_APPLY_TEXTDELTA)
453 struct replay_node_t *node = ctx->current_node;
455 if (! node || ! node->file)
456 return svn_error_create(SVN_ERR_XML_MALFORMED, NULL, NULL);
460 apr_size_t written = len;
462 SVN_ERR(svn_stream_write(node->stream, data, &written));
464 return svn_error_create(SVN_ERR_STREAM_UNEXPECTED_EOF, NULL,
465 _("Error writing stream: unexpected EOF"));
472 /* Implements svn_ra_serf__request_body_delegate_t */
474 create_replay_body(serf_bucket_t **bkt,
476 serf_bucket_alloc_t *alloc,
477 apr_pool_t *pool /* request pool */,
478 apr_pool_t *scratch_pool)
480 struct revision_report_t *ctx = baton;
481 serf_bucket_t *body_bkt;
483 body_bkt = serf_bucket_aggregate_create(alloc);
485 svn_ra_serf__add_open_tag_buckets(body_bkt, alloc,
487 "xmlns:S", SVN_XML_NAMESPACE,
490 /* If we have a non-NULL include path, we add it to the body and
491 omit the revision; otherwise, the reverse. */
492 if (ctx->include_path)
494 svn_ra_serf__add_tag_buckets(body_bkt,
501 svn_ra_serf__add_tag_buckets(body_bkt,
503 apr_ltoa(pool, ctx->revision),
506 svn_ra_serf__add_tag_buckets(body_bkt,
508 apr_ltoa(pool, ctx->low_water_mark),
511 svn_ra_serf__add_tag_buckets(body_bkt,
513 apr_ltoa(pool, ctx->send_deltas),
516 svn_ra_serf__add_close_tag_buckets(body_bkt, alloc, "S:replay-report");
523 svn_ra_serf__replay(svn_ra_session_t *ra_session,
524 svn_revnum_t revision,
525 svn_revnum_t low_water_mark,
526 svn_boolean_t send_deltas,
527 const svn_delta_editor_t *editor,
529 apr_pool_t *scratch_pool)
531 struct revision_report_t ctx = { NULL };
532 svn_ra_serf__session_t *session = ra_session->priv;
533 svn_ra_serf__handler_t *handler;
534 svn_ra_serf__xml_context_t *xmlctx;
535 const char *report_target;
537 SVN_ERR(svn_ra_serf__report_resource(&report_target, session,
540 ctx.pool = svn_pool_create(scratch_pool);
542 ctx.editor_baton = edit_baton;
544 ctx.revision = revision;
545 ctx.low_water_mark = low_water_mark;
546 ctx.send_deltas = send_deltas;
547 ctx.rev_props = apr_hash_make(scratch_pool);
549 xmlctx = svn_ra_serf__xml_context_create(replay_ttable,
550 replay_opened, replay_closed,
555 handler = svn_ra_serf__create_expat_handler(session, xmlctx, NULL,
558 handler->method = "REPORT";
559 handler->path = session->session_url.path;
560 handler->body_delegate = create_replay_body;
561 handler->body_delegate_baton = &ctx;
562 handler->body_type = "text/xml";
564 /* Not setting up done handler as we don't use a global context */
566 SVN_ERR(svn_ra_serf__context_run_one(handler, scratch_pool));
568 return svn_error_trace(
569 svn_ra_serf__error_on_status(handler->sline,
574 /* The maximum number of outstanding requests at any time. When this
575 * number is reached, ra_serf will stop sending requests until
576 * responses on the previous requests are received and handled.
578 * Some observations about serf which lead us to the current value.
579 * ----------------------------------------------------------------
581 * We aim to keep serf's outgoing queue filled with enough requests so
582 * the network bandwidth and server capacity are used
583 * optimally. Originally we used 5 as the max. number of outstanding
584 * requests, but this turned out to be too low.
586 * Serf doesn't exit out of the svn_ra_serf__context_run_wait loop as long as
587 * it has data to send or receive. With small responses (revs of a few
588 * kB), serf doesn't come out of this loop at all. So with
589 * MAX_OUTSTANDING_REQUESTS set to a low number, there's a big chance
590 * that serf handles those requests completely in its internal loop,
591 * and only then gives us a chance to create new requests. This
592 * results in hiccups, slowing down the whole process.
594 * With a larger MAX_OUTSTANDING_REQUESTS, like 100 or more, there's
595 * more chance that serf can come out of its internal loop so we can
596 * replenish the outgoing request queue. There's no real disadvantage
597 * of using a large number here, besides the memory used to store the
598 * message, parser and handler objects (approx. 250 bytes).
600 * In my test setup peak performance was reached at max. 30-35
601 * requests. So I added a small margin and chose 50.
603 #define MAX_OUTSTANDING_REQUESTS 50
605 /* Implements svn_ra_serf__response_done_delegate_t for svn_ra_serf__replay_range */
607 replay_done(serf_request_t *request,
609 apr_pool_t *scratch_pool)
611 struct revision_report_t *ctx = baton;
612 svn_ra_serf__handler_t *handler = ctx->report_handler;
614 if (handler->server_error)
615 return svn_ra_serf__server_error_create(handler, scratch_pool);
616 else if (handler->sline.code != 200)
617 return svn_error_trace(svn_ra_serf__unexpected_status(handler));
619 *ctx->done = TRUE; /* Breaks out svn_ra_serf__context_run_wait */
621 /* Are we replaying multiple revisions? */
622 if (ctx->replay_reports)
624 (*ctx->replay_reports)--;
627 svn_pool_destroy(ctx->pool); /* Destroys handler and request! */
633 svn_ra_serf__replay_range(svn_ra_session_t *ra_session,
634 svn_revnum_t start_revision,
635 svn_revnum_t end_revision,
636 svn_revnum_t low_water_mark,
637 svn_boolean_t send_deltas,
638 svn_ra_replay_revstart_callback_t revstart_func,
639 svn_ra_replay_revfinish_callback_t revfinish_func,
641 apr_pool_t *scratch_pool)
643 svn_ra_serf__session_t *session = ra_session->priv;
644 svn_revnum_t rev = start_revision;
645 const char *report_target;
646 int active_reports = 0;
647 const char *include_path;
650 SVN_ERR(svn_ra_serf__report_resource(&report_target, session,
653 /* Prior to 1.8, mod_dav_svn expected to get replay REPORT requests
654 aimed at the session URL. But that's incorrect -- these reports
655 aren't about specific resources -- they are about revisions. The
656 path-based filtering offered by this API is just that: a filter
657 applied to the full set of changes made in the revision. As
658 such, the correct target for these REPORT requests is the "me
659 resource" (or, pre-http-v2, the default VCC).
661 Our server should have told us if it supported this protocol
662 correction. If so, we aimed our report at the correct resource
663 and include the filtering path as metadata within the report
664 body. Otherwise, we fall back to the pre-1.8 behavior and just
668 http://subversion.tigris.org/issues/show_bug.cgi?id=4287
670 if (session->supports_rev_rsrc_replay)
672 SVN_ERR(svn_ra_serf__get_relative_path(&include_path,
673 session->session_url.path,
674 session, scratch_pool));
681 while (active_reports || rev <= end_revision)
683 if (session->cancel_func)
684 SVN_ERR(session->cancel_func(session->cancel_baton));
686 /* Send pending requests, if any. Limit the number of outstanding
687 requests to MAX_OUTSTANDING_REQUESTS. */
688 if (rev <= end_revision && active_reports < MAX_OUTSTANDING_REQUESTS)
690 struct revision_report_t *rev_ctx;
691 svn_ra_serf__handler_t *handler;
692 apr_pool_t *rev_pool = svn_pool_create(scratch_pool);
693 svn_ra_serf__xml_context_t *xmlctx;
694 const char *replay_target;
696 rev_ctx = apr_pcalloc(rev_pool, sizeof(*rev_ctx));
697 rev_ctx->pool = rev_pool;
698 rev_ctx->revstart_func = revstart_func;
699 rev_ctx->revfinish_func = revfinish_func;
700 rev_ctx->replay_baton = replay_baton;
701 rev_ctx->done = &done;
702 rev_ctx->replay_reports = &active_reports;
703 rev_ctx->include_path = include_path;
704 rev_ctx->revision = rev;
705 rev_ctx->low_water_mark = low_water_mark;
706 rev_ctx->send_deltas = send_deltas;
708 /* Request all properties of a certain revision. */
709 rev_ctx->rev_props = apr_hash_make(rev_ctx->pool);
711 if (SVN_RA_SERF__HAVE_HTTPV2_SUPPORT(session))
713 rev_ctx->revprop_target = apr_psprintf(rev_pool, "%s/%ld",
714 session->rev_stub, rev);
715 rev_ctx->revprop_rev = SVN_INVALID_REVNUM;
719 rev_ctx->revprop_target = report_target;
720 rev_ctx->revprop_rev = rev;
723 SVN_ERR(svn_ra_serf__create_propfind_handler(
724 &rev_ctx->propfind_handler,
726 rev_ctx->revprop_target,
727 rev_ctx->revprop_rev,
729 svn_ra_serf__deliver_svn_props,
733 /* Spin up the serf request for the PROPFIND. */
734 svn_ra_serf__request_create(rev_ctx->propfind_handler);
736 /* Send the replay REPORT request. */
737 if (session->supports_rev_rsrc_replay)
739 replay_target = apr_psprintf(rev_pool, "%s/%ld",
740 session->rev_stub, rev);
744 replay_target = session->session_url.path;
747 xmlctx = svn_ra_serf__xml_context_create(replay_ttable,
748 replay_opened, replay_closed,
749 replay_cdata, rev_ctx,
752 handler = svn_ra_serf__create_expat_handler(session, xmlctx, NULL,
755 handler->method = "REPORT";
756 handler->path = replay_target;
757 handler->body_delegate = create_replay_body;
758 handler->body_delegate_baton = rev_ctx;
759 handler->body_type = "text/xml";
761 handler->done_delegate = replay_done;
762 handler->done_delegate_baton = rev_ctx;
764 rev_ctx->report_handler = handler;
765 svn_ra_serf__request_create(handler);
771 /* Run the serf loop. */
773 SVN_ERR(svn_ra_serf__context_run_wait(&done, session, scratch_pool));
775 /* The done handler of reports decrements active_reports when a report
776 is done. This same handler reports (fatal) report errors, so we can
782 #undef MAX_OUTSTANDING_REQUESTS