如何使用SQL修复数据库中损坏的MPTT树(嵌套集)?

dec*_*eze 19 mysql sql mptt nested-sets

我存储在MySQL的超过10万条记录使用MPTT树lft,rghtparent_id列.现在左/右值已损坏,而父ID仍然完好无损.它需要大量的查询才能在应用程序层中修复它.是否有一种很好的方法来减轻数据库的负担并让它仅使用SQL重新计算左/右值?


为了澄清,我需要重新计算嵌套集的数值lft/rght值,而不是相邻记录的id.

嵌套集http://dev.mysql.com/tech-resources/articles/hierarchical-data-4.png

dec*_*eze 20

以下是我根据@ Lieven的回答改编的内容,结合此处的反馈以获得更好的性能:

DROP PROCEDURE IF EXISTS tree_recover;

DELIMITER //

CREATE PROCEDURE tree_recover ()
MODIFIES SQL DATA
BEGIN

    DECLARE currentId, currentParentId  CHAR(36);
    DECLARE currentLeft                 INT;
    DECLARE startId                     INT DEFAULT 1;

    # Determines the max size for MEMORY tables.
    SET max_heap_table_size = 1024 * 1024 * 512;

    START TRANSACTION;

    # Temporary MEMORY table to do all the heavy lifting in,
    # otherwise performance is simply abysmal.
    CREATE TABLE `tmp_tree` (
        `id`        char(36) NOT NULL DEFAULT '',
        `parent_id` char(36)          DEFAULT NULL,
        `lft`       int(11)  unsigned DEFAULT NULL,
        `rght`      int(11)  unsigned DEFAULT NULL,
        PRIMARY KEY      (`id`),
        INDEX USING HASH (`parent_id`),
        INDEX USING HASH (`lft`),
        INDEX USING HASH (`rght`)
    ) ENGINE = MEMORY
    SELECT `id`,
           `parent_id`,
           `lft`,
           `rght`
    FROM   `tree`;

    # Leveling the playing field.
    UPDATE  `tmp_tree`
    SET     `lft`  = NULL,
            `rght` = NULL;

    # Establishing starting numbers for all root elements.
    WHILE EXISTS (SELECT * FROM `tmp_tree` WHERE `parent_id` IS NULL AND `lft` IS NULL AND `rght` IS NULL LIMIT 1) DO

        UPDATE `tmp_tree`
        SET    `lft`  = startId,
               `rght` = startId + 1
        WHERE  `parent_id` IS NULL
          AND  `lft`       IS NULL
          AND  `rght`      IS NULL
        LIMIT  1;

        SET startId = startId + 2;

    END WHILE;

    # Switching the indexes for the lft/rght columns to B-Trees to speed up the next section, which uses range queries.
    DROP INDEX `lft`  ON `tmp_tree`;
    DROP INDEX `rght` ON `tmp_tree`;
    CREATE INDEX `lft`  USING BTREE ON `tmp_tree` (`lft`);
    CREATE INDEX `rght` USING BTREE ON `tmp_tree` (`rght`);

    # Numbering all child elements
    WHILE EXISTS (SELECT * FROM `tmp_tree` WHERE `lft` IS NULL LIMIT 1) DO

        # Picking an unprocessed element which has a processed parent.
        SELECT     `tmp_tree`.`id`
          INTO     currentId
        FROM       `tmp_tree`
        INNER JOIN `tmp_tree` AS `parents`
                ON `tmp_tree`.`parent_id` = `parents`.`id`
        WHERE      `tmp_tree`.`lft` IS NULL
          AND      `parents`.`lft`  IS NOT NULL
        LIMIT      1;

        # Finding the element's parent.
        SELECT  `parent_id`
          INTO  currentParentId
        FROM    `tmp_tree`
        WHERE   `id` = currentId;

        # Finding the parent's lft value.
        SELECT  `lft`
          INTO  currentLeft
        FROM    `tmp_tree`
        WHERE   `id` = currentParentId;

        # Shifting all elements to the right of the current element 2 to the right.
        UPDATE `tmp_tree`
        SET    `rght` = `rght` + 2
        WHERE  `rght` > currentLeft;

        UPDATE `tmp_tree`
        SET    `lft` = `lft` + 2
        WHERE  `lft` > currentLeft;

        # Setting lft and rght values for current element.
        UPDATE `tmp_tree`
        SET    `lft`  = currentLeft + 1,
               `rght` = currentLeft + 2
        WHERE  `id`   = currentId;

    END WHILE;

    # Writing calculated values back to physical table.
    UPDATE `tree`, `tmp_tree`
    SET    `tree`.`lft`  = `tmp_tree`.`lft`,
           `tree`.`rght` = `tmp_tree`.`rght`
    WHERE  `tree`.`id`   = `tmp_tree`.`id`;

    COMMIT;

    DROP TABLE `tmp_tree`;

END//

DELIMITER ;
Run Code Online (Sandbox Code Playgroud)

适用于一些测试数据,但它仍在我的100,000记录树上运行,所以我还不能给出任何最终判断.直接在物理表上工作的天真脚本具有极差的性能,运行至少几小时,更可能是几天.切换到临时MEMORY表将时间缩短到大约一个小时,选择正确的索引将其缩短到10分钟.


Lie*_*ers 8

使用SQL Server,以下脚本似乎对我有用.

输出测试文件

category_id name                 parent      lft         rgt         lftcalc     rgtcalc
----------- -------------------- ----------- ----------- ----------- ----------- -----------
1           ELECTRONICS          NULL        1           20          1           20
2           TELEVISIONS          1           2           9           2           9
3           TUBE                 2           3           4           3           4
4           LCD                  2           5           6           5           6
5           PLASMA               2           7           8           7           8
6           PORTABLE ELECTRONICS 1           10          19          10          19
7           MP3 PLAYERS          6           11          14          11          14
8           FLASH                7           12          13          12          13
9           CD PLAYERS           6           15          16          15          16
10          2 WAY RADIOS         6           17          18          17          18
Run Code Online (Sandbox Code Playgroud)

脚本

SET NOCOUNT ON
GO

DECLARE @nested_category TABLE (
 category_id INT PRIMARY KEY,
 name VARCHAR(20) NOT NULL,
 parent INT,
 lft INT,
 rgt INT
);

DECLARE @current_Category_ID INTEGER
DECLARE @current_parent INTEGER
DECLARE @SafeGuard INTEGER
DECLARE @myLeft INTEGER
SET @SafeGuard = 100

INSERT INTO @nested_category 
SELECT           1,'ELECTRONICS',NULL,NULL,NULL
UNION ALL SELECT 2,'TELEVISIONS',1,NULL,NULL
UNION ALL SELECT 3,'TUBE',2,NULL,NULL
UNION ALL SELECT 4,'LCD',2,NULL,NULL
UNION ALL SELECT 5,'PLASMA',2,NULL,NULL
UNION ALL SELECT 6,'PORTABLE ELECTRONICS',1,NULL,NULL
UNION ALL SELECT 7,'MP3 PLAYERS',6,NULL,NULL
UNION ALL SELECT 8,'FLASH',7,NULL,NULL
UNION ALL SELECT 9,'CD PLAYERS',6,NULL,NULL
UNION ALL SELECT 10,'2 WAY RADIOS',6,NULL,NULL

/* Initialize */
UPDATE  @nested_category 
SET     lft = 1
        , rgt = 2
WHERE   parent IS NULL

UPDATE  @nested_category 
SET     lft = NULL
        , rgt = NULL
WHERE   parent IS NOT NULL

WHILE EXISTS (SELECT * FROM @nested_category WHERE lft IS NULL) AND @SafeGuard > 0
BEGIN
  SELECT  @current_Category_ID = MAX(nc.category_id)
  FROM    @nested_category nc
          INNER JOIN @nested_category nc2 ON nc2.category_id = nc.parent
  WHERE   nc.lft IS NULL
          AND nc2.lft IS NOT NULL

  SELECT  @current_parent = parent
  FROM    @nested_category
  WHERE   category_id = @current_category_id

  SELECT  @myLeft = lft
  FROM    @nested_category
  WHERE   category_id = @current_parent

  UPDATE @nested_category SET rgt = rgt + 2 WHERE rgt > @myLeft;
  UPDATE @nested_category SET lft = lft + 2 WHERE lft > @myLeft;
  UPDATE @nested_category SET lft = @myLeft + 1, rgt = @myLeft + 2 WHERE category_id = @current_category_id

  SET @SafeGuard = @SafeGuard - 1
END

SELECT * FROM @nested_category ORDER BY category_id

SELECT  COUNT(node.name), node.name, MIN(node.lft)
FROM    @nested_category AS node,
        @nested_category AS parent
WHERE   node.lft BETWEEN parent.lft AND parent.rgt
GROUP BY 
        node.name
ORDER BY
        3, 1
Run Code Online (Sandbox Code Playgroud)

Testscript ##

SET NOCOUNT ON
GO

DECLARE @nested_category TABLE (
 category_id INT PRIMARY KEY,
 name VARCHAR(20) NOT NULL,
 parent INT,
 lft INT,
 rgt INT, 
 lftcalc INT,
 rgtcalc INT
);

INSERT INTO @nested_category 
SELECT           1,'ELECTRONICS',NULL,1,20,NULL,NULL
UNION ALL SELECT 2,'TELEVISIONS',1,2,9,NULL,NULL
UNION ALL SELECT 3,'TUBE',2,3,4,NULL,NULL
UNION ALL SELECT 4,'LCD',2,5,6,NULL,NULL
UNION ALL SELECT 5,'PLASMA',2,7,8,NULL,NULL
UNION ALL SELECT 6,'PORTABLE ELECTRONICS',1,10,19,NULL,NULL
UNION ALL SELECT 7,'MP3 PLAYERS',6,11,14,NULL,NULL
UNION ALL SELECT 8,'FLASH',7,12,13,NULL,NULL
UNION ALL SELECT 9,'CD PLAYERS',6,15,16,NULL,NULL
UNION ALL SELECT 10,'2 WAY RADIOS',6,17,18,NULL,NULL

/* Initialize */
UPDATE  @nested_category 
SET     lftcalc = 1
        , rgtcalc = 2
WHERE   parent IS NULL

DECLARE @current_Category_ID INTEGER
DECLARE @current_parent INTEGER
DECLARE @SafeGuard INTEGER
DECLARE @myRight INTEGER
DECLARE @myLeft INTEGER
SET @SafeGuard = 100
WHILE EXISTS (SELECT * FROM @nested_category WHERE lftcalc IS NULL) AND @SafeGuard > 0
BEGIN
  SELECT  @current_Category_ID = MAX(nc.category_id)
  FROM    @nested_category nc
          INNER JOIN @nested_category nc2 ON nc2.category_id = nc.parent
  WHERE   nc.lftcalc IS NULL
          AND nc2.lftcalc IS NOT NULL

  SELECT  @current_parent = parent
  FROM    @nested_category
  WHERE   category_id = @current_category_id

  SELECT  @myLeft = lftcalc
  FROM    @nested_category
  WHERE   category_id = @current_parent

  UPDATE @nested_category SET rgtcalc = rgtcalc + 2 WHERE rgtcalc > @myLeft;
  UPDATE @nested_category SET lftcalc = lftcalc + 2 WHERE lftcalc > @myLeft;
  UPDATE @nested_category SET lftcalc = @myLeft + 1, rgtcalc = @myLeft + 2 WHERE category_id = @current_category_id

  SELECT * FROM @nested_category WHERE category_id = @current_parent
  SELECT * FROM @nested_category ORDER BY category_id
  SET @SafeGuard = @SafeGuard - 1
END

SELECT * FROM @nested_category ORDER BY category_id

SELECT  COUNT(node.name), node.name, MIN(node.lft)
FROM    @nested_category AS node,
        @nested_category AS parent
WHERE   node.lft BETWEEN parent.lft AND parent.rgt
GROUP BY 
        node.name
ORDER BY
        3, 1
Run Code Online (Sandbox Code Playgroud)

  • 好吧,看下面我改编的版本...... :) (2认同)

小智 5

你救救我吧!!!我使用混合树模型,所以当这一天到来时,我的树(30000+)被损坏了。我从你们的技术中学习,但不是恢复,只是完全重建,丢失了所有排序和反向树...我认为,需要记住旧的 cat_left...只是为了可能的完整性。所以,它可能看起来像……

如果存在则删除程序tree_recover;

分隔符|

创建过程tree_recover()
修改 SQL 数据
开始

    声明 currentId、currentParentId CHAR(36);
    声明 currentLeft INT;
    声明 startId INT 默认值 1;

    # 确定 MEMORY 表的最大大小。
    设置最大堆表大小 = 1024 * 1024 * 512;

    开始交易;

    # 临时 MEMORY 表来完成所有繁重的工作,
    # 否则性能简直糟糕透顶。
    如果存在`tmp_cat`则删除表;
    创建表 `tmp_cat` (
        `cat_id` char(36) NOT NULL DEFAULT '',
        `cat_parent` char(36) 默认为 NULL,
        `cat_left` int(11) 无符号默认为 NULL,
        `cat_right` int(11) 无符号默认为 NULL,
        `cat_left_old` int(11) 无符号默认为 NULL,
        主键(`cat_id`),
        使用哈希索引(`cat_parent`),
        使用哈希索引(`cat_left`),
        使用哈希索引(`cat_right`),
    使用哈希索引(`cat_left_old`)
    ) 引擎 = 内存
    选择“cat_id”,
           `猫_父母`,
           `cat_left`,
           `cat_right`,
       `cat_left` 为 cat_left_old
    来自“目录”;

    # 公平竞争。
    更新`tmp_cat`
    设置“cat_left”= NULL,
            `cat_right` = NULL;

    # 为所有根元素建立起始编号。
    WHILE EXISTS (SELECT * FROM `tmp_cat` WHERE `cat_parent` IS NULL AND `cat_left` IS NULL AND `cat_right` IS NULL ORDER BY cat_left_old LIMIT 1) DO

        更新`tmp_cat`
        SET `cat_left` = startId,
               `cat_right` = 起始 ID + 1
        其中“cat_parent”为空
          并且“cat_left”为空
          并且“cat_right”为空
        限制 1;

        SET 起始Id = 起始Id + 2;

    结束同时;

    # 将 cat_left/rght 列的索引切换到 B 树以加快下一部分的速度,该部分使用范围查询。
    将索引 `cat_left` 删除到 `tmp_cat` 上;
    将索引 `cat_right` 删除到 `tmp_cat` 上;
    将索引 `cat_left_old` 删除到 `tmp_cat` 上;

    在 `tmp_cat` 上使用 BTREE 创建索引 `cat_left` (`cat_left`);
    在 `tmp_cat` 上使用 BTREE 创建索引 `cat_right` (`cat_right`);
    在 `tmp_cat` 上使用 BTREE 创建索引 `cat_left_old` (`cat_left_old`);

    # 对所有子元素进行编号
    WHILE EXISTS (SELECT * FROM `tmp_cat` WHERE `cat_left` IS NULL ORDER BY cat_left_old LIMIT 1) DO

        # 选择一个具有已处理父元素的未处理元素。
        选择 `tmp_cat`.`cat_id`
          INTO 当前 ID
        来自“tmp_cat”
        内连接“tmp_cat”作为“父母”
                ON `tmp_cat`.`cat_parent` = `parents`.`cat_id`
        其中 `tmp_cat`.`cat_left` 为 NULL
          并且“父母”。“cat_left”不为空
    按 `tmp_cat`.cat_left_old DESC 排序
        限制 1;

        # 查找元素的父元素。
        选择“cat_parent”
          INTO 当前父 ID
        来自“tmp_cat”
        WHERE `cat_id` = currentId;

        # 查找父级的 cat_left 值。
        选择“cat_left”
          INTO 当前左
        来自“tmp_cat”
        WHERE `cat_id` = currentParentId;

        # 将当前元素2右侧的所有元素向右移动。
        更新`tmp_cat`
        SET `cat_right` = `cat_right` + 2
        其中 `cat_right` > currentLeft;

        更新`tmp_cat`
        SET `cat_left` = `cat_left` + 2
        WHERE `cat_left` > currentLeft;

        # 设置当前元素的 cat_left 和 rght 值。
        更新`tmp_cat`
        SET `cat_left` = currentLeft + 1,
               `cat_right` = currentLeft + 2
        WHERE `cat_id` = currentId;

    结束同时;

    # 将计算值写回物理表。
    更新“目录”、“tmp_cat”
    SET `catalog`.`cat_left` = `tmp_cat`.`cat_left`,
           `catalog`.`cat_right` = `tmp_cat`.`cat_right`
    WHERE `catalog`.`cat_id` = `tmp_cat`.`cat_id`;

    犯罪;

    如果存在`tmp_cat`则删除表;

结束|