2 rcs_id('$Id: dbaBase.php,v 1.30 2007-07-15 17:39:25 rurban Exp $');
4 require_once('lib/WikiDB/backend.php');
6 // FIXME:padding of data? Is it needed? dba_optimize() seems to do a good
7 // job at packing 'gdbm' (and 'db2') databases.
14 * Values: latestversion . ':' . flags . ':' serialized hash of page meta data
15 * Currently flags = 1 if latest version has empty content.
18 * Index: version:pagename
19 * Value: serialized hash of revision meta data, including:
20 * + quasi-meta-data %content
23 * index: 'o' . pagename
24 * value: serialized list of pages (names) which pagename links to.
25 * index: 'i' . pagename
26 * value: serialized list of pages which link to pagename
29 * Don't keep tables locked the whole time
32 * list of pagenames for get_all_pages
34 * RecentChanges support:
35 * lists of most recent edits (major, minor, either).
38 * Separate hit table, so we don't have to update the whole page entry
39 * each time we get a hit. (Maybe not so important though...).
42 require_once('lib/DbaPartition.php');
44 class WikiDB_backend_dbaBase
45 extends WikiDB_backend
// PHP4-style constructor. Builds one DbaPartition per table on top of the
// same $dba handle: 'p' (pages), 'v' (versions), 'l' (links, wrapped in the
// linktable helper class) and 'd'.
47 function WikiDB_backend_dbaBase (&$dba) {
49 // TODO: page and version tables should be in their own files, probably.
50 // We'll pack them all in one for now (testing).
51 // 2004-07-09 10:07:30 rurban: It's fast enough this way.
52 $this->_pagedb = new DbaPartition($dba, 'p');
53 $this->_versiondb = new DbaPartition($dba, 'v');
54 $linkdbpart = new DbaPartition($dba, 'l');
55 $this->_linkdb = new WikiDB_backend_dbaBase_linktable($linkdbpart);
56 $this->_dbdb = new DbaPartition($dba, 'd');
// Returns the list of column names this backend accepts for sorting:
// 'pagename' and 'mtime'. The author columns are commented out.
59 function sortable_columns() {
60 return array('pagename','mtime'/*,'author_id','author'*/);
68 $this->_db->optimize();
77 // rebuild backlink table
78 $this->_linkdb->rebuild();
83 return $this->_linkdb->check();
// Looks up the page record for $pagename and unserializes its meta-data
// part. The record is split on ':' into at most three fields and only the
// third one (the serialized hash) is used here.
86 function get_pagedata($pagename) {
87 $result = $this->_pagedb->get($pagename);
90 list(,,$packed) = explode(':', $result, 3);
91 $data = unserialize($packed);
// Updates the stored page record of $pagename from the entries of $newdata
// and writes the record back to the page table.
95 function update_pagedata($pagename, $newdata) {
96 $result = $this->_pagedb->get($pagename);
// Record layout is latestversion:flags:serialized-data; the limit of 3 keeps
// any ':' inside the serialized part intact.
98 list($latestversion,$flags,$data) = explode(':', $result, 3);
99 $data = unserialize($data);
// NOTE(review): presumably the fallback when no record exists yet -- confirm.
102 $latestversion = $flags = 0;
106 foreach ($newdata as $key => $val) {
// The record is written back with the version cast to int in front.
112 $this->_pagedb->set($pagename,
113 (int)$latestversion . ':'
// Returns the latest version number of $pagename as an int. The page record
// begins with "latestversion:", so the (int) cast reads just that leading
// number from the raw record string.
118 function get_latest_version($pagename) {
119 return (int) $this->_pagedb->get($pagename);
// Searches downward from $version - 1 for a revision of $pagename that
// exists in the version table (keys have the form "version:pagename").
122 function get_previous_version($pagename, $version) {
123 $versdb = &$this->_versiondb;
// pre-decrement: the given $version itself is never tested
125 while (--$version > 0) {
126 if ($versdb->exists($version . ":$pagename"))
132 //check $want_content
// Fetches the stored meta data of revision $version of $pagename.
// Returns false when the version entry is empty.
133 function get_versiondata($pagename, $version, $want_content=false) {
134 $data = $this->_versiondb->get((int)$version . ":$pagename");
135 if (empty($data)) return false;
137 $data = unserialize($data);
// Collapses '%content' into a boolean "has content" marker.
// NOTE(review): presumably done only when $want_content is false -- confirm.
139 $data['%content'] = !empty($data['%content']);
145 * See ADODB for a better delete_page(), which can be undone and is seen in RecentChanges.
148 //function delete_page($pagename) { $this->purge_page($pagename); }
151 * Completely delete page from the database.
// Completely removes $pagename: every revision entry from the latest
// version down to 1 is set to false, then the page record itself, and
// finally the page's links are cleared via set_links($pagename, false).
153 function purge_page($pagename) {
154 $pagedb = &$this->_pagedb;
155 $versdb = &$this->_versiondb;
157 $version = $this->get_latest_version($pagename);
158 while ($version > 0) {
// post-decrement: the current version is used in the key, then stepped down
159 $versdb->set($version-- . ":$pagename", false);
161 $pagedb->set($pagename, false);
163 $this->set_links($pagename, false);
// Renames $pagename to $to: moves the page record, copies the latest
// revision and re-registers the outgoing links under the new name.
166 function rename_page($pagename, $to) {
167 $result = $this->_pagedb->get($pagename);
169 list($version, $flags, $data) = explode(':', $result, 3);
170 $data = unserialize($data);
// Outgoing links are read before the old page record is deleted.
175 $links = $this->_linkdb->get_links($pagename, false, false);
176 $this->_pagedb->delete($pagename);
177 $data['pagename'] = $to;
178 $this->_pagedb->set($to,
182 // move over the latest version only
// NOTE(review): in the lines shown, older revisions and the old page's link
// entries are not removed -- confirm whether that is intended.
183 $pvdata = $this->get_versiondata($pagename, $version, true);
184 $this->set_versiondata($to, $version, $pvdata);
186 // update links and backlinks
187 $this->_linkdb->set_links($to, $links);
193 * Delete an old revision of a page.
/**
 * Delete an old revision of a page.
 *
 * The version entry is set to false. When the deleted revision was the
 * latest one, the page record is re-pointed at the nearest older revision
 * that still exists (or at version 0 when none is left).
 *
 * @param string $pagename Page whose revision is removed.
 * @param int    $version  Revision to delete; 0 < $version <= latest version.
 */
function delete_versiondata($pagename, $version) {
    $versdb = &$this->_versiondb;

    $latest = $this->get_latest_version($pagename);

    assert($version > 0);
    assert($version <= $latest);

    $versdb->set((int)$version . ":$pagename", false);

    if ($version == $latest) {
        // get_previous_version() is declared as ($pagename, $version). The
        // former one-argument call passed the version number as the page
        // name and left the version undefined, so no older revision could
        // ever be found.
        $previous = $this->get_previous_version($pagename, $version);
        if ($previous > 0) {
            $pvdata = $this->get_versiondata($pagename, $previous);
            $is_empty = empty($pvdata['%content']);
        }
        else {
            // No older revision left: treat the page as having no content.
            $is_empty = true;
        }
        $this->_update_latest_version($pagename, $previous, $is_empty);
    }
}
218 * Create a new revision of a page.
// Stores $data (serialized) as revision $version of $pagename. When $version
// is newer than the recorded latest version, the page record is advanced to
// it and flagged as empty if $data has no '%content'.
220 function set_versiondata($pagename, $version, $data) {
221 $versdb = &$this->_versiondb;
223 $versdb->set((int)$version . ":$pagename", serialize($data));
224 if ($version > $this->get_latest_version($pagename))
225 $this->_update_latest_version($pagename, $version, empty($data['%content']));
// Rewrites the "latestversion:flags:" prefix of the page record for
// $pagename. The serialized meta-data part of the existing record is reused;
// an empty serialized array serves as the fallback value.
228 function _update_latest_version($pagename, $latest, $flags) {
229 $pagedb = &$this->_pagedb;
231 $pdata = $pagedb->get($pagename);
233 list(,,$pagedata) = explode(':',$pdata,3);
235 $pagedata = serialize(array());
// Both numeric fields are cast to int, so false/true flags become 0/1.
237 $pagedb->set($pagename, (int)$latest . ':' . (int)$flags . ":$pagedata");
// Walks all keys of the page table to count pages. Pages listed in $exclude
// are skipped; unless $include_empty is set, pages without a record, with
// latest version 0, or with a non-zero flags field are skipped as well.
240 function numPages($include_empty=false, $exclude='') {
241 $pagedb = &$this->_pagedb;
243 for ($page = $pagedb->firstkey(); $page!== false; $page = $pagedb->nextkey()) {
245 assert(!empty($page));
// NOTE(review): in_array() needs an array haystack. The default '' is
// filtered out by the "$exclude and" guard, but a non-empty string would not
// work here -- confirm that callers pass an array.
248 if ($exclude and in_array($page, $exclude)) continue;
249 if (!$include_empty) {
250 if (!($data = $pagedb->get($page))) continue;
251 list($latestversion,$flags,) = explode(':', $data, 3);
253 if ($latestversion == 0 || $flags != 0)
254 continue; // current content is empty
// Collects page names from the page table and returns them wrapped in a
// WikiDB_backend_dbaBase_pageiter. Sorting is delegated to the iterator via
// the 'sortby' option; the 'limit' option is commented out because offset
// and count are applied while collecting.
261 function get_all_pages($include_empty=false, $sortby='', $limit='', $exclude='') {
262 $pagedb = &$this->_pagedb;
264 if ($limit) // extract from,count from limit
265 list($from,$count) = $this->limit($limit);
266 for ($page = $pagedb->firstkey(); $page!== false; $page = $pagedb->nextkey()) {
268 assert(!empty($page));
// NOTE(review): in_array() needs an array haystack -- confirm that callers
// never pass a non-empty string as $exclude.
271 if ($exclude and in_array($page, $exclude)) continue;
272 if ($limit and $from) {
274 if ($i < $from) continue;
// NOTE(review): with ">" the loop only stops once more than $count pages
// have been collected, which looks like one page too many -- confirm
// whether ">=" was intended.
276 if ($limit and count($pages) > $count) break;
277 if (!$include_empty) {
278 if (!($data = $pagedb->get($page))) continue;
279 list($latestversion,$flags,) = explode(':', $data, 3);
281 if ($latestversion == 0 || $flags != 0)
282 continue; // current content is empty
286 return new WikiDB_backend_dbaBase_pageiter($this, $pages,
287 array('sortby'=>$sortby/*,
288 'limit' =>$limit*/));
// Replaces the outgoing links of $pagename; delegates to the link table,
// which also maintains the backlinks.
291 function set_links($pagename, $links) {
292 $this->_linkdb->set_links($pagename, $links);
// Returns an iterator over the links of $pagename: backlinks when $reversed
// is true, outgoing links otherwise. With $want_relations the number of
// relations counted by the link table is passed on as 'found_relations'.
295 function get_links($pagename, $reversed=true, $include_empty=false,
296 $sortby='', $limit='', $exclude='',
297 $want_relations=false)
299 // optimization: if no relation at all is found, mark it in the iterator.
300 $links = $this->_linkdb->get_links($pagename, $reversed, $want_relations);
302 return new WikiDB_backend_dbaBase_pageiter
304 array('sortby'=>$sortby,
307 'want_relations'=>$want_relations,
308 'found_relations' => $want_relations ? $this->_linkdb->found_relations : 0
315 * @return array of all linkrelations
316 * Faster than the dumb WikiDB method.
// Collects the distinct relation names found in all outgoing-link ('o')
// entries of the link table. An entry with an empty 'linkto' is treated as
// an attribute; $also_attributes / $only_attributes select which kinds are
// included.
318 function list_relations($also_attributes=false, $only_attributes=false, $sorted=true) {
319 $linkdb = &$this->_linkdb;
320 $relations = array();
321 for ($link = $linkdb->_db->firstkey(); $link!== false; $link = $linkdb->_db->nextkey()) {
// keys are 'o' . pagename or 'i' . pagename; only forward links matter here
322 if ($link[0] != 'o') continue;
323 $links = $linkdb->_get_links('o', substr($link,1));
// The inner loop reuses $link; the outer loop reassigns it from nextkey().
324 foreach ($links as $link) { // linkto => page, linkrelation => page
326 and $link['relation']
327 and !in_array($link['relation'], $relations))
329 $is_attribute = empty($link['linkto']); // a relation has both
331 if ($only_attributes or $also_attributes)
332 $relations[] = $link['relation'];
333 } elseif (!$only_attributes) {
334 $relations[] = $link['relation'];
347 * WikiDB_backend_dumb_LinkSearchIter searches over all pages and then all its links.
348 * Since there are fewer links than pages, and we easily get the pagename from the link key,
349 * we iterate here directly over the linkdb and check the pagematch there.
351 * @param $pages object A TextSearchQuery object for the pagename filter.
352 * @param $query object A SearchQuery object (Text or Numeric) for the linkvalues,
353 * linkto, linkfrom (=backlink), relation or attribute values.
354 * @param $linktype string One of the 4 linktypes "linkto", "linkfrom" (=backlink), "relation" or "attribute".
355 * Maybe also "relation+attribute" for the advanced search.
356 * @param $relation object A TextSearchQuery for the linkname or false.
357 * @param $options array Currently ignored. hash of sortby, limit, exclude.
358 * @return object A WikiDB_backend_iterator.
359 * @see WikiDB::linkSearch
// Searches the link table directly. For every link key of the wanted
// direction whose page name matches $pages, the page's attributes, relations
// or plain links are matched against $query (and $relation, if given).
// Matches are collected as pagename/linkname/linkvalue hashes and returned
// in a WikiDB_backend_dbaBase_pageiter.
361 function link_search( $pages, $query, $linktype, $relation=false, $options=array() ) {
362 $linkdb = &$this->_linkdb;
365 $want_relations = false;
366 if ($linktype == 'relation') {
367 $want_relations = true;
368 $field = 'linkrelation';
370 if ($linktype == 'attribute') {
371 $want_relations = true;
372 $field = 'attribute';
374 if ($linktype == 'linkfrom') {
378 for ($link = $linkdb->_db->firstkey(); $link!== false; $link = $linkdb->_db->nextkey()) {
// 'i' keys hold backlinks, 'o' keys hold forward links
379 $type = $reverse ? 'i' : 'o';
380 if ($link[0] != $type) continue;
381 $pagename = substr($link, 1);
382 if (!$pages->match($pagename)) continue;
383 if ($linktype == 'attribute') {
// Attributes are read from the page meta data, not from the link table.
384 $page = $GLOBALS['request']->_dbi->getPage($pagename);
385 $attribs = $page->get('attributes');
387 /* Optimization on expressive searches:
388 for queries with multiple attributes.
389 Just take the defined placeholders from the query(ies)
390 if there are more attributes than query variables.
392 if ($query->getType() != 'text'
394 and ((count($vars = $query->getVars()) > 1)
395 or (count($attribs) > count($vars))))
397 // names must strictly match. no * allowed
398 if (!$query->can_match($attribs)) continue;
399 if (!($result = $query->match($attribs))) continue;
400 foreach ($result as $r) {
401 $r['pagename'] = $pagename;
405 // textsearch or simple value. no strict bind by name needed
406 foreach ($attribs as $attribute => $value) {
407 if ($relation and !$relation->match($attribute)) continue;
408 if (!$query->match($value)) continue;
409 $links[] = array('pagename' => $pagename,
410 'linkname' => $attribute,
411 'linkvalue' => $value);
417 // TODO: honor limits. this can get large.
418 if ($want_relations) {
419 // MAP linkrelation : pagename => thispagename : linkname : linkvalue
420 $_links = $linkdb->_get_links('o', $pagename);
421 foreach ($_links as $link) { // linkto => page, linkrelation => page
422 if (!isset($link['relation']) or !$link['relation']) continue;
423 if ($relation and !$relation->match($link['relation'])) continue;
424 if (!$query->match($link['linkto'])) continue;
425 $links[] = array('pagename' => $pagename,
426 'linkname' => $link['relation'],
427 'linkvalue' => $link['linkto']);
430 $_links = $linkdb->_get_links($reverse ? 'i' : 'o', $pagename);
431 foreach ($_links as $link) { // linkto => page
433 $link = $link['linkto'];
434 if (!$query->match($link)) continue;
435 $links[] = array('pagename' => $pagename,
437 'linkvalue' => $link);
442 $options['want_relations'] = true; // Iter hack to force return of the whole hash
443 return new WikiDB_backend_dbaBase_pageiter($this, $links, $options);
447 * Handle multi-searches for many relations and attributes in one expression.
448 * Bind all required attributes and relations per page together and pass it to one query.
449 * (is_a::city and population < 20000) and (*::city and area > 1000000)
450 * (is_a::city or linkto::CategoryCountry) and population < 20000 and area > 1000000
451 * Note that the 'linkto' and 'linkfrom' links are relations, containing an array.
453 * @param $pages object A TextSearchQuery object for the pagename filter.
454 * @param $query object A SemanticSearchQuery object for the links.
455 * @param $options array Currently ignored. hash of sortby, limit, exclude for the pagelist.
456 * @return object A WikiDB_backend_iterator.
457 * @see WikiDB::linkSearch
// Multi-relation/attribute search: for each page matching $pages, the
// page's attributes and the link lists named in the query are gathered into
// one $pagelinks hash and passed to $query->match(). All results are merged
// and returned in a WikiDB_backend_dbaBase_pageiter.
459 function relation_search( $pages, $query, $options=array() ) {
460 $linkdb = &$this->_linkdb;
462 // We need to detect which attributes and relation names we should look for. NYI
463 $want_attributes = $query->hasAttributes();
464 $want_relation = $query->hasRelations();
465 $linknames = $query->getLinkNames();
466 // create a hash for faster checks
467 $linkcheck = array();
468 foreach ($linknames as $l) $linkcheck[$l] = 1;
470 for ($link = $linkdb->_db->firstkey(); $link!== false; $link = $linkdb->_db->nextkey()) {
// NOTE(review): $reverse is not assigned anywhere in the lines shown for
// this function (link_search has its own local of that name). If it is
// really undefined, this always selects 'o' -- confirm.
471 $type = $reverse ? 'i' : 'o';
472 if ($link[0] != $type) continue;
473 $pagename = substr($link, 1);
474 if (!$pages->match($pagename)) continue;
475 $pagelinks = array();
476 if ($want_attributes) {
477 $page = $GLOBALS['request']->_dbi->getPage($pagename);
478 $attribs = $page->get('attributes');
479 $pagelinks = $attribs;
// NOTE(review): the flag assigned above is $want_relation (singular), while
// $want_relations is tested here. Looks like a typo that would keep this
// branch from running -- confirm.
481 if ($want_relations) {
482 // all links contain arrays of pagenames, just the attributes
483 // are guaranteed to be singular
484 if (isset($linkcheck['linkfrom'])) {
485 $pagelinks['linkfrom'] = $linkdb->_get_links('i', $pagename);
487 $outlinks = $linkdb->_get_links('o', $pagename);
488 $want_to = isset($linkcheck['linkto']);
489 foreach ($outlinks as $link) { // linkto => page, relation => page
491 if ((isset($link['relation'])) and $link['relation']
492 and isset($linkcheck[$link['relation']]))
493 $pagelinks[$link['relation']][] = $link['linkto'];
495 $pagelinks['linkto'][] = is_array($link) ? $link['linkto'] : $link;
498 if ($result = $query->match($pagelinks)) {
499 $links = array_merge($links, $result);
502 $options['want_relations'] = true; // Iter hack to force return of the whole hash
503 return new WikiDB_backend_dbaBase_pageiter($this, $links, $options);
// usort() callback: case-insensitive ascending order of two page names.
507 function WikiDB_backend_dbaBase_sortby_pagename_ASC ($a, $b) {
508 return strcasecmp($a, $b);
// usort() callback: case-insensitive descending order (arguments swapped).
510 function WikiDB_backend_dbaBase_sortby_pagename_DESC ($a, $b) {
511 return strcasecmp($b, $a);
// usort() callback: ascending by the 'mtime' field of the latest revision.
513 function WikiDB_backend_dbaBase_sortby_mtime_ASC ($a, $b) {
514 return WikiDB_backend_dbaBase_sortby_num($a, $b, 'mtime');
// usort() callback: descending by 'mtime' (arguments swapped).
516 function WikiDB_backend_dbaBase_sortby_mtime_DESC ($a, $b) {
517 return WikiDB_backend_dbaBase_sortby_num($b, $a, 'mtime');
// usort() callback: ascending by the 'hits' field.
520 function WikiDB_backend_dbaBase_sortby_hits_ASC ($a, $b) {
521 return WikiDB_backend_dbaBase_sortby_num($a, $b, 'hits');
// usort() callback: descending by 'hits' (arguments swapped).
523 function WikiDB_backend_dbaBase_sortby_hits_DESC ($a, $b) {
524 return WikiDB_backend_dbaBase_sortby_num($b, $a, 'hits');
// Shared comparator for the numeric sort callbacks. Loads the latest
// revision data of both pages through the request's database handle and
// compares the given $field of each.
527 function WikiDB_backend_dbaBase_sortby_num($aname, $bname, $field) {
529 $dbi = $request->getDbh();
530 // fields are stored in versiondata
531 $av = $dbi->_backend->get_latest_version($aname);
532 $bv = $dbi->_backend->get_latest_version($bname);
533 $a = $dbi->_backend->get_versiondata($aname, $av, false);
535 $b = $dbi->_backend->get_versiondata($bname, $bv, false);
// Both fields missing, or identical values, compare as equal.
537 if ((!isset($a[$field]) && !isset($b[$field])) || ($a[$field] === $b[$field])) {
// A page without the field sorts before one that has it.
540 return (!isset($a[$field]) || ($a[$field] < $b[$field])) ? -1 : 1;
544 class WikiDB_backend_dbaBase_pageiter
545 extends WikiDB_backend_iterator
547 // fixed for linkrelations
// PHP4-style constructor. Stores backend and options, then optionally sorts
// and slices $pages before keeping them in $this->_pages. $pages is taken
// by reference, so usort()/array_slice() also change the caller's array.
548 function WikiDB_backend_dbaBase_pageiter(&$backend, &$pages, $options=false) {
549 $this->_backend = $backend;
550 $this->_options = $options;
552 if (!empty($options['sortby'])) {
553 $sortby = WikiDB_backend::sortby($options['sortby'], 'db', array('pagename','mtime'));
// The comparator name is built from the sortby string with spaces turned
// into underscores; a sortby containing "hits " is not sorted here.
554 if ($sortby and !strstr($sortby, "hits ")) { // check for which column to sortby
555 usort($pages, 'WikiDB_backend_dbaBase_sortby_'.str_replace(' ','_',$sortby));
558 if (!empty($options['limit'])) {
559 list($offset,$limit) = WikiDB_backend::limit($options['limit']);
560 $pages = array_slice($pages, $offset, $limit);
562 $this->_pages = $pages;
564 $this->_pages = array();
567 // fixed for relations
569 if ( ! ($page = array_shift($this->_pages)) )
571 if (!empty($this->_options['want_relations'])) {
572 // $linkrelation = $page['linkrelation'];
573 $pagename = $page['pagename'];
574 if (!empty($this->_options['exclude']) and in_array($pagename, $this->_options['exclude']))
575 return $this->next();
578 if (!empty($this->_options['exclude']) and in_array($page, $this->_options['exclude']))
579 return $this->next();
580 return array('pagename' => $page);
584 $this->_pages = array();
588 class WikiDB_backend_dbaBase_linktable
// PHP4-style constructor; receives the database object by reference.
590 function WikiDB_backend_dbaBase_linktable(&$dba) {
594 //TODO: try storing link lists as hashes rather than arrays.
595 // backlink deletion would be faster.
// Reads the link list of $page: backlinks ('i') when $reversed, forward
// links ('o') otherwise. With $want_relations each result is a hash with
// 'pagename' (plus 'linkrelation' when set) and $this->found_relations
// counts the non-empty relations; otherwise a flat list of page names is
// built.
596 function get_links($page, $reversed=true, $want_relations=false) {
597 if ($want_relations) {
598 $this->found_relations = 0;
599 $links = $this->_get_links($reversed ? 'i' : 'o', $page);
600 $linksonly = array();
601 foreach ($links as $link) { // linkto => page, linkrelation => page
602 if (is_array($link) and isset($link['relation'])) {
603 if ($link['relation'])
604 $this->found_relations++;
605 $linksonly[] = array('pagename' => $link['linkto'],
606 'linkrelation' => $link['relation']);
607 } else { // empty relations are stripped
// NOTE(review): this branch is also reached when $link is a plain string
// (not an array); $link['linkto'] is then a string offset, not a page name
// -- confirm whether plain-string entries can occur here.
608 $linksonly[] = array('pagename' => $link['linkto']);
613 $links = $this->_get_links($reversed ? 'i' : 'o', $page);
614 $linksonly = array();
615 foreach ($links as $link) {
// Entries may be hashes (with 'linkto') or plain page-name strings.
616 if (is_array($link)) {
617 $linksonly[] = $link['linkto'];
619 $linksonly[] = $link;
625 // fixed: relations ready
// Stores $links as the forward-link list of $page and brings the backlink
// lists of the affected target pages up to date by walking the old and new
// target lists side by side.
626 function set_links($page, $links) {
628 $oldlinks = $this->get_links($page, false, false);
630 if (!is_array($links)) {
631 assert(empty($links));
634 $this->_set_links('o', $page, $links);
636 /* Now for the backlink update we squash the linkto hashes into a simple array */
638 foreach ($links as $hash) {
639 if (!empty($hash['linkto'])
640 and !in_array($hash['linkto'], $newlinks))
641 // for attributes it's empty
642 $newlinks[] = $hash['linkto'];
644 //$newlinks = array_unique($newlinks);
// Merge-style walk over both lists. NOTE(review): this only works when both
// lists are in the same sorted order -- the sorting is presumably done in
// lines not shown here; confirm.
650 $new = current($newlinks);
651 $old = current($oldlinks);
652 while ($new !== false || $old !== false) {
653 if ($old === false || ($new !== false && $new < $old)) {
654 // $new is a new link (not in $oldlinks).
655 $this->_add_backlink($new, $page);
656 $new = next($newlinks);
658 elseif ($new === false || $old < $new) {
659 // $old is an obsolete link (not in $newlinks).
660 $this->_delete_backlink($old, $page);
661 $old = next($oldlinks);
664 // Unchanged link (in both $newlinks and $oldlinks).
665 assert($new == $old);
666 $new = next($newlinks);
667 $old = next($oldlinks);
673 * Rebuild the back-link index.
675 * This should never be needed, but if the database gets hosed for some reason,
676 * this should put it back into a consistent state.
678 * We assume the forward links in our table are correct, and recalculate
679 * all the backlinks appropriately.
// Rebuilds the backlink index from the forward-link entries: scans all
// keys, reports keys of unknown type with a warning, then re-applies
// set_links() for every collected forward-link ('o') key.
681 function rebuild () {
684 // Delete the backlink tables, make a list of lo.page keys.
// NOTE(review): this loop stops on any falsy key, whereas other key loops in
// this file compare with "!== false" -- confirm a key "0" cannot occur.
686 for ($key = $db->firstkey(); $key; $key = $db->nextkey()) {
689 elseif ($key[0] == 'o')
692 trigger_error("Bad key in linktable: '$key'", E_USER_WARNING);
696 foreach ($okeys as $key) {
// strip the one-character type prefix to get the page name
697 $page = substr($key,1);
698 $links = $this->_get_links('o', $page);
700 $this->set_links($page, $links);
707 // FIXME: check for sortedness and uniqueness in links lists.
709 for ($key = $db->firstkey(); $key; $key = $db->nextkey()) {
710 if (strlen($key) < 1 || ($key[0] != 'i' && $key[0] != 'o')) {
711 $errs[] = "Bad key '$key' in table";
714 $page = substr($key, 1);
715 if ($key[0] == 'o') {
717 foreach($this->_get_links('o', $page) as $link) {
718 $link = $link['linkto'];
719 if (!$this->_has_link('i', $link, $page))
720 $errs[] = "backlink entry missing for link '$page'->'$link'";
724 assert($key[0] == 'i');
726 foreach($this->_get_links('i', $page) as $link) {
727 if (!$this->_has_link('o', $link, $page))
728 $errs[] = "link entry missing for backlink '$page'<-'$link'";
732 //if ($errs) $this->rebuild();
733 return isset($errs) ? $errs : false;
736 /* TODO: Add another lrRelationName key for relations.
737 * lrRelationName: frompage => topage
// Intended to register $linkedfrom in a relation list of $page (see the
// TODO about an 'r' key).
// NOTE(review): the 'r' list is read into $relations but never used. The
// value is appended to $backlinks, which is not initialised in the lines
// shown, and the result is written under the 'i' key, which would overwrite
// the page's backlink list. Looks like an unfinished copy of
// _add_backlink() -- confirm before calling.
740 function _add_relation($page, $linkedfrom) {
741 $relations = $this->_get_links('r', $page);
742 $backlinks[] = $linkedfrom;
744 $this->_set_links('i', $page, $backlinks);
// Appends $linkedfrom to the backlink ('i') list of $page and stores the
// list again.
747 function _add_backlink($page, $linkedfrom) {
748 $backlinks = $this->_get_links('i', $page);
749 $backlinks[] = $linkedfrom;
751 $this->_set_links('i', $page, $backlinks);
/**
 * Remove $linkedfrom from the backlink ('i') list of $page.
 *
 * @param string $page       Page whose backlink list is edited.
 * @param string $linkedfrom Name of the page that no longer links to $page.
 */
function _delete_backlink($page, $linkedfrom) {
    $backlinks = $this->_get_links('i', $page);
    $remaining = array();
    foreach ($backlinks as $backlink) {
        // Page names are compared as exact strings. A loose == treats
        // numeric-looking names such as "10" and "1e1" as equal and would
        // drop the wrong backlink. Non-scalar entries never match and are
        // kept as they are.
        if (is_array($backlink) || (string)$backlink !== (string)$linkedfrom)
            $remaining[] = $backlink;
    }
    // $remaining is freshly indexed, so the stored list has no key gaps
    // (unset() on the original array left holes behind).
    $this->_set_links('i', $page, $remaining);
}
/**
 * Check whether the $which list ('o' forward links, 'i' backlinks) of
 * $page contains $link.
 *
 * Forward-link entries are hashes with a 'linkto' key, backlink entries
 * are plain page-name strings; both shapes are handled. Indexing a plain
 * string with ['linkto'] (as before) never yields the page name, so
 * backlink lookups could not succeed.
 *
 * @param string $which 'o' or 'i'.
 * @param string $page  Page whose list is searched.
 * @param string $link  Page name to look for.
 * @return bool true if $link is in the list.
 */
function _has_link($which, $page, $link) {
    $links = $this->_get_links($which, $page);
    // since links are always sorted, break if >
    // TODO: binary search
    foreach ($links as $l) {
        if (is_array($l)) {
            // attribute entries carry no target page
            if (empty($l['linkto'])) continue;
            $name = $l['linkto'];
        }
        else {
            $name = $l;
        }
        // exact string match; loose == would equate "10" and "1e1"
        if ((string)$name === (string)$link)
            return true;
        if ($name > $link)
            return false;
    }
    return false;
}
// Loads the list stored under the key $which . $page. Returns the
// unserialized value, or an empty array when the stored value is falsy.
776 function _get_links($which, $page) {
777 $data = $this->_db->get($which . $page);
778 return $data ? unserialize($data) : array();
// Writes the list for key $which . $page: either the serialized $links or
// false. NOTE(review): presumably false is written when $links is empty --
// the condition is not among the lines shown; confirm.
781 function _set_links($which, $page, &$links) {
782 $key = $which . $page;
784 $this->_db->set($key, serialize($links));
786 $this->_db->set($key, false);
790 // $Log: not supported by cvs2svn $
791 // Revision 1.29 2007/05/24 18:39:10 rurban
792 // limits for get_all_pages, improved WantedPages
794 // Revision 1.28 2007/01/03 21:26:01 rurban
795 // Fix dba searching for relations. Optimize link_search for strict attribute search. Add relation_search()
797 // Revision 1.27 2007/01/02 13:19:33 rurban
798 // faster list_relations method. new native link_search method. additions to rebuild() (still very slow), fix iterator options (for want_relations and exclude). Clarify API: sortby,limit and exclude are strings
800 // Revision 1.26 2006/12/22 00:27:37 rurban
804 // (c-file-style: "gnu")
809 // c-hanging-comment-ender-p: nil
810 // indent-tabs-mode: nil